View Javadoc
1   /*
2    * Licensed to The Apereo Foundation under one or more contributor license
3    * agreements. See the NOTICE file distributed with this work for additional
4    * information regarding copyright ownership.
5    *
6    *
7    * The Apereo Foundation licenses this file to you under the Educational
8    * Community License, Version 2.0 (the "License"); you may not use this file
9    * except in compliance with the License. You may obtain a copy of the License
10   * at:
11   *
12   *   http://opensource.org/licenses/ecl2.txt
13   *
14   * Unless required by applicable law or agreed to in writing, software
15   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
16   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the
17   * License for the specific language governing permissions and limitations under
18   * the License.
19   *
20   */
21  
22  package org.opencastproject.db;
23  
24  import org.opencastproject.util.function.ThrowingConsumer;
25  import org.opencastproject.util.function.ThrowingFunction;
26  
27  import java.util.Random;
28  import java.util.function.Consumer;
29  import java.util.function.Function;
30  
31  import javax.persistence.EntityManager;
32  import javax.persistence.EntityManagerFactory;
33  import javax.persistence.EntityTransaction;
34  
35  public class DBSessionImpl implements DBSession {
36    private static final Random RAND = new Random();
37  
38    private EntityManagerFactory emf;
39    private int maxTransactionRetries = DBSessionFactoryImpl.DEFAULT_MAX_TRANSACTION_RETRIES;
40  
41    private final ThreadLocal<EntityManager> entityManagerStore = new ThreadLocal<>();
42  
43    public DBSessionImpl(EntityManagerFactory emf) {
44      this.emf = emf;
45    }
46  
47    @Override
48    public void exec(Consumer<EntityManager> fn) {
49      exec(em -> {
50        fn.accept(em);
51        return null;
52      });
53    }
54  
55    @Override
56    public <E extends Throwable> void execChecked(ThrowingConsumer<EntityManager, E> fn) throws E {
57      execChecked(em -> {
58        fn.accept(em);
59        return null;
60      });
61    }
62  
63    @Override
64    public <T> T exec(Function<EntityManager, T> fn) {
65      try {
66        return execChecked(fn::apply);
67      } catch (RuntimeException e) {
68        throw e;
69      } catch (Exception e) {
70        throw new RuntimeException(e);
71      }
72    }
73  
74    @Override
75    public <T, E extends Throwable> T execChecked(ThrowingFunction<EntityManager, T, E> fn) throws E {
76      EntityManager em = null;
77      try {
78        em = emf.createEntityManager();
79        return fn.apply(em);
80      } finally {
81        if (em != null && em.isOpen()) {
82          em.close();
83        }
84      }
85    }
86  
87    @Override
88    public void execTx(Consumer<EntityManager> fn) {
89      execTx(maxTransactionRetries, fn);
90    }
91  
92    @Override
93    public <E extends Throwable> void execTxChecked(ThrowingConsumer<EntityManager, E> fn) throws E {
94      execTxChecked(maxTransactionRetries, fn);
95    }
96  
97    @Override
98    public void execTx(int maxTransactionRetries, Consumer<EntityManager> fn) {
99      execTx(maxTransactionRetries, em -> {
100       fn.accept(em);
101       return null;
102     });
103   }
104 
105   @Override
106   public <E extends Throwable> void execTxChecked(int maxTransactionRetries, ThrowingConsumer<EntityManager, E> fn)
107           throws E {
108     execTxChecked(maxTransactionRetries, em -> {
109       fn.accept(em);
110       return null;
111     });
112   }
113 
114   @Override
115   public <T> T execTx(Function<EntityManager, T> fn) {
116     return execTx(maxTransactionRetries, fn);
117   }
118 
119   @Override
120   public <T, E extends Throwable> T execTxChecked(ThrowingFunction<EntityManager, T, E> fn) throws E {
121     return execTxChecked(maxTransactionRetries, fn);
122   }
123 
124   @Override
125   public <T> T execTx(int maxTransactionRetries, Function<EntityManager, T> fn) {
126     try {
127       return execTxChecked(maxTransactionRetries, fn::apply);
128     } catch (RuntimeException e) {
129       throw e;
130     } catch (Exception e) {
131       throw new RuntimeException(e);
132     }
133   }
134 
135   @Override
136   public <T, E extends Throwable> T execTxChecked(int maxTransactionRetries, ThrowingFunction<EntityManager, T, E> fn)
137           throws E {
138     EntityManager em = entityManagerStore.get();
139 
140     if (em != null) {
141       // We are already in a transaction. Opening another one can lead to deadlocks.
142       return fn.apply(em);
143     }
144 
145     EntityTransaction tx = null;
146     RuntimeException ex = null;
147 
148     for (int attempt = 0; attempt < maxTransactionRetries; attempt++) {
149       try {
150         em = emf.createEntityManager();
151         entityManagerStore.set(em);
152         tx = em.getTransaction();
153         tx.begin();
154         T res = fn.apply(em);
155         tx.commit();
156         return res;
157       } catch (RuntimeException e) { // we only catch RuntimeException as other exceptions are not related to DB errors
158         // TODO: do we need to catch all exceptions and look at the cause chain?
159         ex = e;
160 
161         if (tx != null && tx.isActive()) {
162           tx.rollback();
163         }
164 
165         // only retry if exception has something to do with the transaction
166         if (!DBUtils.isTransactionException(e)) {
167           throw e;
168         }
169       } finally {
170         if (em != null && em.isOpen()) {
171           em.close();
172         }
173         entityManagerStore.remove();
174       }
175 
176       // exponential backoff before next iteration
177       int sleepMillis = (int) (Math.pow(2, attempt) * 100) + RAND.nextInt(100);
178       try {
179         Thread.sleep(sleepMillis);
180       } catch (InterruptedException ignore) {
181       }
182     }
183 
184     // we only get here if all retries led to an exception: throw the last one up the stack
185     throw ex;
186   }
187 
188   @Override
189   public void close() {
190     if (emf.isOpen()) {
191       emf.close();
192     }
193   }
194 
195   public EntityManagerFactory getEntityManagerFactory() {
196     return emf;
197   }
198 
199   public void setEntityManagerFactory(EntityManagerFactory emf) {
200     this.emf = emf;
201   }
202 
203   public int getMaxTransactionRetries() {
204     return maxTransactionRetries;
205   }
206 
207   public void setMaxTransactionRetries(int maxTransactionRetries) {
208     this.maxTransactionRetries = maxTransactionRetries;
209   }
210 }