View Javadoc
1   /*
2    * Licensed to The Apereo Foundation under one or more contributor license
3    * agreements. See the NOTICE file distributed with this work for additional
4    * information regarding copyright ownership.
5    *
6    *
7    * The Apereo Foundation licenses this file to you under the Educational
8    * Community License, Version 2.0 (the "License"); you may not use this file
9    * except in compliance with the License. You may obtain a copy of the License
10   * at:
11   *
12   *   http://opensource.org/licenses/ecl2.txt
13   *
14   * Unless required by applicable law or agreed to in writing, software
15   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
16   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the
17   * License for the specific language governing permissions and limitations under
18   * the License.
19   *
20   */
21  
22  package org.opencastproject.db;
23  
24  import org.opencastproject.util.function.ThrowingConsumer;
25  import org.opencastproject.util.function.ThrowingFunction;
26  
27  import java.util.Random;
28  import java.util.function.Consumer;
29  import java.util.function.Function;
30  
31  import javax.persistence.EntityManager;
32  import javax.persistence.EntityManagerFactory;
33  import javax.persistence.EntityTransaction;
34  
35  public class DBSessionImpl implements DBSession {
36    private static final Random RAND = new Random();
37  
38    private EntityManagerFactory emf;
39    private int maxTransactionRetries = DBSessionFactoryImpl.DEFAULT_MAX_TRANSACTION_RETRIES;
40  
41    private final ThreadLocal<EntityManager> entityManagerStore = new ThreadLocal<>();
42  
43    public DBSessionImpl(EntityManagerFactory emf) {
44      this.emf = emf;
45    }
46  
47    @Override
48    public void exec(Consumer<EntityManager> fn) {
49      exec(em -> {
50        fn.accept(em);
51        return null;
52      });
53    }
54  
55    @Override
56    public <E extends Throwable> void execChecked(ThrowingConsumer<EntityManager, E> fn) throws E {
57      execChecked(em -> {
58        fn.accept(em);
59        return null;
60      });
61    }
62  
63    @Override
64    public <T> T exec(Function<EntityManager, T> fn) {
65      try {
66        return execChecked(fn::apply);
67      } catch (RuntimeException e) {
68        throw e;
69      } catch (Exception e) {
70        throw new RuntimeException(e);
71      }
72    }
73  
74    @Override
75    public <T, E extends Throwable> T execChecked(ThrowingFunction<EntityManager, T, E> fn) throws E {
76      EntityManager em = null;
77      try {
78        em = emf.createEntityManager();
79        return fn.apply(em);
80      } finally {
81        if (em != null && em.isOpen()) {
82          em.close();
83        }
84      }
85    }
86  
87    @Override
88    public void execTx(Consumer<EntityManager> fn) {
89      execTx(maxTransactionRetries, fn);
90    }
91  
92    @Override
93    public <E extends Throwable> void execTxChecked(ThrowingConsumer<EntityManager, E> fn) throws E {
94      execTxChecked(maxTransactionRetries, fn);
95    }
96  
97    @Override
98    public void execTx(int maxTransactionRetries, Consumer<EntityManager> fn) {
99      execTx(maxTransactionRetries, em -> {
100       fn.accept(em);
101       return null;
102     });
103   }
104 
105   @Override
106   public <E extends Throwable> void execTxChecked(int maxTransactionRetries, ThrowingConsumer<EntityManager, E> fn) throws E {
107     execTxChecked(maxTransactionRetries, em -> {
108       fn.accept(em);
109       return null;
110     });
111   }
112 
113   @Override
114   public <T> T execTx(Function<EntityManager, T> fn) {
115     return execTx(maxTransactionRetries, fn);
116   }
117 
118   @Override
119   public <T, E extends Throwable> T execTxChecked(ThrowingFunction<EntityManager, T, E> fn) throws E {
120     return execTxChecked(maxTransactionRetries, fn);
121   }
122 
123   @Override
124   public <T> T execTx(int maxTransactionRetries, Function<EntityManager, T> fn) {
125     try {
126       return execTxChecked(maxTransactionRetries, fn::apply);
127     } catch (RuntimeException e) {
128       throw e;
129     } catch (Exception e) {
130       throw new RuntimeException(e);
131     }
132   }
133 
134   @Override
135   public <T, E extends Throwable> T execTxChecked(int maxTransactionRetries, ThrowingFunction<EntityManager, T, E> fn) throws E {
136     EntityManager em = entityManagerStore.get();
137 
138     if (em != null) {
139       // We are already in a transaction. Opening another one can lead to deadlocks.
140       return fn.apply(em);
141     }
142 
143     EntityTransaction tx = null;
144     RuntimeException ex = null;
145 
146     for (int attempt = 0; attempt < maxTransactionRetries; attempt++) {
147       try {
148         em = emf.createEntityManager();
149         entityManagerStore.set(em);
150         tx = em.getTransaction();
151         tx.begin();
152         T res = fn.apply(em);
153         tx.commit();
154         return res;
155       } catch (RuntimeException e) { // we only catch RuntimeException as other exceptions are not related to DB errors
156         // TODO: do we need to catch all exceptions and look at the cause chain?
157         ex = e;
158 
159         if (tx != null && tx.isActive()) {
160           tx.rollback();
161         }
162 
163         // only retry if exception has something to do with the transaction
164         if (!DBUtils.isTransactionException(e)) {
165           throw e;
166         }
167       } finally {
168         if (em != null && em.isOpen()) {
169           em.close();
170         }
171         entityManagerStore.remove();
172       }
173 
174       // exponential backoff before next iteration
175       int sleepMillis = (int) (Math.pow(2, attempt) * 100) + RAND.nextInt(100);
176       try {
177         Thread.sleep(sleepMillis);
178       } catch (InterruptedException ignore) {
179       }
180     }
181 
182     // we only get here if all retries led to an exception: throw the last one up the stack
183     throw ex;
184   }
185 
186   @Override
187   public void close() {
188     if (emf.isOpen()) {
189       emf.close();
190     }
191   }
192 
193   public EntityManagerFactory getEntityManagerFactory() {
194     return emf;
195   }
196 
197   public void setEntityManagerFactory(EntityManagerFactory emf) {
198     this.emf = emf;
199   }
200 
201   public int getMaxTransactionRetries() {
202     return maxTransactionRetries;
203   }
204 
205   public void setMaxTransactionRetries(int maxTransactionRetries) {
206     this.maxTransactionRetries = maxTransactionRetries;
207   }
208 }