1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 package org.opencastproject.db;
23
24 import org.opencastproject.util.function.ThrowingConsumer;
25 import org.opencastproject.util.function.ThrowingFunction;
26
27 import java.util.Random;
28 import java.util.function.Consumer;
29 import java.util.function.Function;
30
31 import javax.persistence.EntityManager;
32 import javax.persistence.EntityManagerFactory;
33 import javax.persistence.EntityTransaction;
34
35 public class DBSessionImpl implements DBSession {
36 private static final Random RAND = new Random();
37
38 private EntityManagerFactory emf;
39 private int maxTransactionRetries = DBSessionFactoryImpl.DEFAULT_MAX_TRANSACTION_RETRIES;
40
41 private final ThreadLocal<EntityManager> entityManagerStore = new ThreadLocal<>();
42
43 public DBSessionImpl(EntityManagerFactory emf) {
44 this.emf = emf;
45 }
46
47 @Override
48 public void exec(Consumer<EntityManager> fn) {
49 exec(em -> {
50 fn.accept(em);
51 return null;
52 });
53 }
54
55 @Override
56 public <E extends Throwable> void execChecked(ThrowingConsumer<EntityManager, E> fn) throws E {
57 execChecked(em -> {
58 fn.accept(em);
59 return null;
60 });
61 }
62
63 @Override
64 public <T> T exec(Function<EntityManager, T> fn) {
65 try {
66 return execChecked(fn::apply);
67 } catch (RuntimeException e) {
68 throw e;
69 } catch (Exception e) {
70 throw new RuntimeException(e);
71 }
72 }
73
74 @Override
75 public <T, E extends Throwable> T execChecked(ThrowingFunction<EntityManager, T, E> fn) throws E {
76 EntityManager em = null;
77 try {
78 em = emf.createEntityManager();
79 return fn.apply(em);
80 } finally {
81 if (em != null && em.isOpen()) {
82 em.close();
83 }
84 }
85 }
86
87 @Override
88 public void execTx(Consumer<EntityManager> fn) {
89 execTx(maxTransactionRetries, fn);
90 }
91
92 @Override
93 public <E extends Throwable> void execTxChecked(ThrowingConsumer<EntityManager, E> fn) throws E {
94 execTxChecked(maxTransactionRetries, fn);
95 }
96
97 @Override
98 public void execTx(int maxTransactionRetries, Consumer<EntityManager> fn) {
99 execTx(maxTransactionRetries, em -> {
100 fn.accept(em);
101 return null;
102 });
103 }
104
105 @Override
106 public <E extends Throwable> void execTxChecked(int maxTransactionRetries, ThrowingConsumer<EntityManager, E> fn) throws E {
107 execTxChecked(maxTransactionRetries, em -> {
108 fn.accept(em);
109 return null;
110 });
111 }
112
113 @Override
114 public <T> T execTx(Function<EntityManager, T> fn) {
115 return execTx(maxTransactionRetries, fn);
116 }
117
118 @Override
119 public <T, E extends Throwable> T execTxChecked(ThrowingFunction<EntityManager, T, E> fn) throws E {
120 return execTxChecked(maxTransactionRetries, fn);
121 }
122
123 @Override
124 public <T> T execTx(int maxTransactionRetries, Function<EntityManager, T> fn) {
125 try {
126 return execTxChecked(maxTransactionRetries, fn::apply);
127 } catch (RuntimeException e) {
128 throw e;
129 } catch (Exception e) {
130 throw new RuntimeException(e);
131 }
132 }
133
134 @Override
135 public <T, E extends Throwable> T execTxChecked(int maxTransactionRetries, ThrowingFunction<EntityManager, T, E> fn) throws E {
136 EntityManager em = entityManagerStore.get();
137
138 if (em != null) {
139
140 return fn.apply(em);
141 }
142
143 EntityTransaction tx = null;
144 RuntimeException ex = null;
145
146 for (int attempt = 0; attempt < maxTransactionRetries; attempt++) {
147 try {
148 em = emf.createEntityManager();
149 entityManagerStore.set(em);
150 tx = em.getTransaction();
151 tx.begin();
152 T res = fn.apply(em);
153 tx.commit();
154 return res;
155 } catch (RuntimeException e) {
156
157 ex = e;
158
159 if (tx != null && tx.isActive()) {
160 tx.rollback();
161 }
162
163
164 if (!DBUtils.isTransactionException(e)) {
165 throw e;
166 }
167 } finally {
168 if (em != null && em.isOpen()) {
169 em.close();
170 }
171 entityManagerStore.remove();
172 }
173
174
175 int sleepMillis = (int) (Math.pow(2, attempt) * 100) + RAND.nextInt(100);
176 try {
177 Thread.sleep(sleepMillis);
178 } catch (InterruptedException ignore) {
179 }
180 }
181
182
183 throw ex;
184 }
185
186 @Override
187 public void close() {
188 if (emf.isOpen()) {
189 emf.close();
190 }
191 }
192
193 public EntityManagerFactory getEntityManagerFactory() {
194 return emf;
195 }
196
197 public void setEntityManagerFactory(EntityManagerFactory emf) {
198 this.emf = emf;
199 }
200
201 public int getMaxTransactionRetries() {
202 return maxTransactionRetries;
203 }
204
205 public void setMaxTransactionRetries(int maxTransactionRetries) {
206 this.maxTransactionRetries = maxTransactionRetries;
207 }
208 }