1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 package org.opencastproject.db;
23
24 import org.opencastproject.util.function.ThrowingConsumer;
25 import org.opencastproject.util.function.ThrowingFunction;
26
27 import java.util.Random;
28 import java.util.function.Consumer;
29 import java.util.function.Function;
30
31 import javax.persistence.EntityManager;
32 import javax.persistence.EntityManagerFactory;
33 import javax.persistence.EntityTransaction;
34
35 public class DBSessionImpl implements DBSession {
36 private static final Random RAND = new Random();
37
38 private EntityManagerFactory emf;
39 private int maxTransactionRetries = DBSessionFactoryImpl.DEFAULT_MAX_TRANSACTION_RETRIES;
40
41 private final ThreadLocal<EntityManager> entityManagerStore = new ThreadLocal<>();
42
43 public DBSessionImpl(EntityManagerFactory emf) {
44 this.emf = emf;
45 }
46
47 @Override
48 public void exec(Consumer<EntityManager> fn) {
49 exec(em -> {
50 fn.accept(em);
51 return null;
52 });
53 }
54
55 @Override
56 public <E extends Throwable> void execChecked(ThrowingConsumer<EntityManager, E> fn) throws E {
57 execChecked(em -> {
58 fn.accept(em);
59 return null;
60 });
61 }
62
63 @Override
64 public <T> T exec(Function<EntityManager, T> fn) {
65 try {
66 return execChecked(fn::apply);
67 } catch (RuntimeException e) {
68 throw e;
69 } catch (Exception e) {
70 throw new RuntimeException(e);
71 }
72 }
73
74 @Override
75 public <T, E extends Throwable> T execChecked(ThrowingFunction<EntityManager, T, E> fn) throws E {
76 EntityManager em = null;
77 try {
78 em = emf.createEntityManager();
79 return fn.apply(em);
80 } finally {
81 if (em != null && em.isOpen()) {
82 em.close();
83 }
84 }
85 }
86
87 @Override
88 public void execTx(Consumer<EntityManager> fn) {
89 execTx(maxTransactionRetries, fn);
90 }
91
92 @Override
93 public <E extends Throwable> void execTxChecked(ThrowingConsumer<EntityManager, E> fn) throws E {
94 execTxChecked(maxTransactionRetries, fn);
95 }
96
97 @Override
98 public void execTx(int maxTransactionRetries, Consumer<EntityManager> fn) {
99 execTx(maxTransactionRetries, em -> {
100 fn.accept(em);
101 return null;
102 });
103 }
104
105 @Override
106 public <E extends Throwable> void execTxChecked(int maxTransactionRetries, ThrowingConsumer<EntityManager, E> fn)
107 throws E {
108 execTxChecked(maxTransactionRetries, em -> {
109 fn.accept(em);
110 return null;
111 });
112 }
113
114 @Override
115 public <T> T execTx(Function<EntityManager, T> fn) {
116 return execTx(maxTransactionRetries, fn);
117 }
118
119 @Override
120 public <T, E extends Throwable> T execTxChecked(ThrowingFunction<EntityManager, T, E> fn) throws E {
121 return execTxChecked(maxTransactionRetries, fn);
122 }
123
124 @Override
125 public <T> T execTx(int maxTransactionRetries, Function<EntityManager, T> fn) {
126 try {
127 return execTxChecked(maxTransactionRetries, fn::apply);
128 } catch (RuntimeException e) {
129 throw e;
130 } catch (Exception e) {
131 throw new RuntimeException(e);
132 }
133 }
134
135 @Override
136 public <T, E extends Throwable> T execTxChecked(int maxTransactionRetries, ThrowingFunction<EntityManager, T, E> fn)
137 throws E {
138 EntityManager em = entityManagerStore.get();
139
140 if (em != null) {
141
142 return fn.apply(em);
143 }
144
145 EntityTransaction tx = null;
146 RuntimeException ex = null;
147
148 for (int attempt = 0; attempt < maxTransactionRetries; attempt++) {
149 try {
150 em = emf.createEntityManager();
151 entityManagerStore.set(em);
152 tx = em.getTransaction();
153 tx.begin();
154 T res = fn.apply(em);
155 tx.commit();
156 return res;
157 } catch (RuntimeException e) {
158
159 ex = e;
160
161 if (tx != null && tx.isActive()) {
162 tx.rollback();
163 }
164
165
166 if (!DBUtils.isTransactionException(e)) {
167 throw e;
168 }
169 } finally {
170 if (em != null && em.isOpen()) {
171 em.close();
172 }
173 entityManagerStore.remove();
174 }
175
176
177 int sleepMillis = (int) (Math.pow(2, attempt) * 100) + RAND.nextInt(100);
178 try {
179 Thread.sleep(sleepMillis);
180 } catch (InterruptedException ignore) {
181 }
182 }
183
184
185 throw ex;
186 }
187
188 @Override
189 public void close() {
190 if (emf.isOpen()) {
191 emf.close();
192 }
193 }
194
195 public EntityManagerFactory getEntityManagerFactory() {
196 return emf;
197 }
198
199 public void setEntityManagerFactory(EntityManagerFactory emf) {
200 this.emf = emf;
201 }
202
203 public int getMaxTransactionRetries() {
204 return maxTransactionRetries;
205 }
206
207 public void setMaxTransactionRetries(int maxTransactionRetries) {
208 this.maxTransactionRetries = maxTransactionRetries;
209 }
210 }