View Javadoc
1   /*
2    * Licensed to The Apereo Foundation under one or more contributor license
3    * agreements. See the NOTICE file distributed with this work for additional
4    * information regarding copyright ownership.
5    *
6    *
7    * The Apereo Foundation licenses this file to you under the Educational
8    * Community License, Version 2.0 (the "License"); you may not use this file
9    * except in compliance with the License. You may obtain a copy of the License
10   * at:
11   *
12   *   http://opensource.org/licenses/ecl2.txt
13   *
14   * Unless required by applicable law or agreed to in writing, software
15   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
16   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the
17   * License for the specific language governing permissions and limitations under
18   * the License.
19   *
20   */
21  
22  package org.opencastproject.elasticsearch.index.rebuild;
23  
24  import static java.lang.String.format;
25  
26  import org.opencastproject.elasticsearch.index.ElasticsearchIndex;
27  
28  import org.osgi.framework.BundleActivator;
29  import org.osgi.framework.BundleContext;
30  import org.osgi.framework.ServiceEvent;
31  import org.osgi.framework.ServiceListener;
32  import org.osgi.framework.ServiceReference;
33  import org.osgi.framework.ServiceRegistration;
34  import org.slf4j.Logger;
35  import org.slf4j.LoggerFactory;
36  
37  import java.io.IOException;
38  import java.util.HashMap;
39  import java.util.Map;
40  import java.util.concurrent.ConcurrentHashMap;
41  
42  /**
43   * The bundle activator is defined in the pom.xml of this bundle.
44   */
45  public class IndexRebuildService implements BundleActivator {
46  
47    /*
48     * How starting and stopping this service works:
49     *
50     * The Index Rebuild can only be started when all services that feed data into the ElasticSearch index, called
51     * IndexProducers, are available via OSGI. To check for this, we use a service listener (see inner class at the
52     * bottom) that reacts whenever an IndexProducer becomes available or is no longer available. We keep these
53     * IndexProducers in an internal map.
54     *
55     * When our requirements - at least one IndexProducer of each type (defined by the Service enum below) available - are
56     * fulfilled, we register the IndexRebuildService with OSGI so it can be used. If our requirements are no longer
57     * fulfilled, we unregister it.
58     *
59     * We make this work by hooking into the OSGI lifecycle with the BundleActivator interface - this way we can start
60     * the listener in the beginning and make sure we properly shut down in the end.
61     */
62  
63    /**
64     * The services whose data is indexed by ElasticSearch.
65     * Attention: The order is relevant for the index rebuild and should not be changed!
66     */
67    public enum Service {
68      Themes, Series, Scheduler, AssetManager, Comments, Workflow, Search
69    }
70  
71    public enum DataType {
72      ALL, ACL
73    }
74  
75    public enum State {
76      PENDING, RUNNING, OK, ERROR
77    }
78  
79    private static final Logger logger = LoggerFactory.getLogger(IndexRebuildService.class);
80    private final Map<IndexRebuildService.Service, IndexProducer> indexProducers = new ConcurrentHashMap<>();
81    private ServiceRegistration<?> serviceRegistration = null;
82  
83    /**
84     * Called by OSGI when this bundle is started.
85     *
86     * @param bundleContext
87     *         The bundle context.
88     *
89     * @throws Exception
90     */
91    @Override
92    public void start(BundleContext bundleContext) throws Exception {
93      // check if there are already IndexProducers available
94      ServiceReference<?>[] serviceReferences = bundleContext.getAllServiceReferences(IndexProducer.class.getName(),
95              null);
96      if (serviceReferences != null) {
97        for (ServiceReference<?> serviceReference : serviceReferences) {
98          addIndexProducer((IndexProducer) bundleContext.getService(serviceReference), bundleContext);
99        }
100     }
101 
102     // Set rebuild service repopulation states to default values
103     setAllRebuildStates(IndexRebuildService.State.OK);
104 
105     // listen to changes in availability
106     bundleContext.addServiceListener(new IndexProducerListener(bundleContext),
107             "(objectClass=" + IndexProducer.class.getName() + ")");
108   }
109 
110   /**
111    * Called by OSGI when this bundle is stopped.
112    *
113    * @param bundleContext
114    *         The bundle context.
115    *
116    * @throws Exception
117    */
118   @Override
119   public void stop(BundleContext bundleContext) throws Exception {
120     // unregister this service from OSGI
121     unregisterIndexRebuildService();
122   }
123 
124   /**
125    * Get the service by its enum identifier.
126    *
127    * @param service
128    *         The enum identifying the service.
129    *
130    * @return the service
131    * @throws IllegalStateException
132    *         If the service is not registered.
133    */
134   public synchronized IndexProducer getIndexProducer(Service service) throws IllegalStateException {
135     if (!indexProducers.containsKey(service)) {
136       throw new IllegalStateException(format("Service %s is not available", service));
137     }
138     return indexProducers.get(service);
139   }
140 
141   /**
142    * Clear and rebuild the index from all services.
143    *
144    * @param index
145    *           The index to rebuild.
146    *
147    * @throws IOException
148    *           Thrown if the index cannot be cleared.
149    * @throws IndexRebuildException
150    *           Thrown if the index rebuild failed.
151    */
152   public synchronized void rebuildIndex(ElasticsearchIndex index) throws IOException, IndexRebuildException,
153           IllegalArgumentException {
154     index.clear();
155     logger.info("Index cleared, starting complete rebuild.");
156     setAllRebuildStates(IndexRebuildService.State.PENDING);
157     for (IndexRebuildService.Service service: IndexRebuildService.Service.values()) {
158       rebuildIndexInternal(getIndexProducer(service), DataType.ALL);
159     }
160   }
161 
162   /**
163    * Partially rebuild the index from a specific service and maybe only a specific data type.
164    *
165    * @param indexProducer
166    *           The service to re-index from.
167    * @param dataType
168    *           The data type to re-index.
169    * @throws IllegalArgumentException
170    *           Thrown if the service doesn't support the data type.
171    * @throws IndexRebuildException
172    *           Thrown if the index rebuild failed.
173    */
174   public synchronized void rebuildIndex(IndexProducer indexProducer, DataType dataType)
175           throws IllegalArgumentException, IndexRebuildException {
176     logger.info("Starting partial rebuild of the {} index.", indexProducer.getService());
177     setRebuildState(indexProducer.getService(), IndexRebuildService.State.PENDING);
178     rebuildIndexInternal(indexProducer, dataType);
179   }
180 
181   /**
182    * Start Index Rebuild from the specified service and then do all that follow. Can be used to resume a complete index
183    * rebuild that was interrupted.
184    *
185    * @param startingService
186    *           The {@link Service} to start with.
187    *
188    * @throws IllegalArgumentException
189    *           Thrown if the service doesn't exist.
190    * @throws IndexRebuildException
191    *           Thrown if the index rebuild failed.
192    */
193   public synchronized void resumeIndexRebuild(Service startingService)
194           throws IllegalArgumentException, IndexRebuildException {
195     logger.info("Resuming rebuild of {} index.", startingService);
196     setSubsetOfRebuildStates(startingService, IndexRebuildService.State.PENDING);
197     Service[] services = IndexRebuildService.Service.values();
198     for (int i = startingService.ordinal(); i < services.length; i++) {
199       rebuildIndexInternal(getIndexProducer(services[i]), DataType.ALL);
200     }
201   }
202 
203   private void rebuildIndexInternal(IndexProducer indexProducer, DataType dataType) throws IndexRebuildException,
204           IllegalArgumentException {
205     if (!indexProducer.dataTypeSupported(dataType)) {
206       throw new IllegalArgumentException("Service " + indexProducer.getService() + "doesn't support data type "
207               + dataType + " for index rebuild.");
208     }
209     Service service = indexProducer.getService();
210     logger.info("Starting to rebuild the {} index", service);
211     setRebuildState(service, IndexRebuildService.State.RUNNING);
212     try {
213       indexProducer.repopulate(dataType);
214       setRebuildState(service, IndexRebuildService.State.OK);
215     } catch (IndexRebuildException e) {
216       setRebuildState(service, IndexRebuildService.State.ERROR);
217       throw e;
218     }
219     logger.info("Finished rebuilding the {} index", service);
220   }
221 
222   /**
223    * Add IndexProducer service to internal map.
224    *
225    * @param indexProducer
226    *           The IndexProducer to add.
227    * @param bundleContext
228    *           The bundle context.
229    */
230   private void addIndexProducer(IndexProducer indexProducer, BundleContext bundleContext) {
231     // add only if there's not already a service of the same type in there
232     if (indexProducers.putIfAbsent(indexProducer.getService(), indexProducer) == null) {
233       logger.info("Service {} added.", indexProducer.getService());
234 
235       // all required IndexProducers found? Register this service at OSGI
236       if (indexProducers.size() == IndexRebuildService.Service.values().length) {
237         registerIndexRebuildService(bundleContext);
238       }
239     }
240   }
241 
242   /**
243    * Remove IndexProducer service from internal map.
244    *
245    * @param indexProducer
246    *           The IndexProducer to remove.
247    */
248   private void removeIndexProducer(IndexProducer indexProducer) {
249     // remove only if it's in there
250     if (indexProducers.remove(indexProducer.getService(), indexProducer)) {
251       logger.info("Service {} removed.", indexProducer.getService());
252 
253       // no longer all required IndexProducers available? Unregister this service from OSGI
254       if (indexProducers.size() != IndexRebuildService.Service.values().length) {
255         unregisterIndexRebuildService();
256       }
257     }
258   }
259 
260   /**
261    * Unregister this service from OSGI.
262    */
263   private synchronized void unregisterIndexRebuildService() {
264     // if this service is registered with OSGI, unregister it
265     if (serviceRegistration != null)  {
266       logger.info("Unregister IndexRebuildService.");
267       serviceRegistration.unregister();
268       serviceRegistration = null;
269     }
270   }
271 
272   /**
273    * Register this service at OSGI.
274    *
275    * @param bundleContext
276    *           The bundle context.
277    */
278   private synchronized void registerIndexRebuildService(BundleContext bundleContext) {
279     // if this service is not registered at OSGI, register it
280     if (serviceRegistration == null) {
281       logger.info("Register IndexRebuildService.");
282       serviceRegistration = bundleContext.registerService(this.getClass().getName(), IndexRebuildService.this, null);
283     }
284   }
285 
286   /**
287    * Listen to changes in the availability of IndexProducer services.
288    */
289   private final class IndexProducerListener implements ServiceListener {
290 
291     private final BundleContext bundleContext;
292 
293     /**
294      * Constructor to hand over the bundle context.
295      *
296      * @param bundleContext
297      *           The bundle context.
298      */
299     private IndexProducerListener(BundleContext bundleContext) {
300       this.bundleContext = bundleContext;
301     }
302 
303     @Override
304     public void serviceChanged(ServiceEvent serviceEvent) {
305       // new IndexProducer service available? Add to map
306       if (serviceEvent.getType() == ServiceEvent.REGISTERED) {
307         ServiceReference<?> serviceReference = serviceEvent.getServiceReference();
308         addIndexProducer((IndexProducer) bundleContext.getService(serviceReference), bundleContext);
309 
310         // Index Producer no longer available? Remove from map
311       } else if (serviceEvent.getType() == ServiceEvent.UNREGISTERING) {
312         ServiceReference<?> serviceReference = serviceEvent.getServiceReference();
313         removeIndexProducer((IndexProducer) bundleContext.getService(serviceReference));
314       }
315     }
316   }
317 
318   private final Map<Service, State> rebuildStates = new HashMap<>();
319 
320   /**
321    * @return All rebuild service repopulation states.
322    */
323   public Map<String, String> getRebuildStates() {
324     Map <String, String> statesAsString = new HashMap<>();
325     for (Map.Entry<IndexRebuildService.Service,IndexRebuildService.State> entry : rebuildStates.entrySet()) {
326       statesAsString.put(entry.getKey().toString(), entry.getValue().toString());
327     }
328     return statesAsString;
329   }
330 
331   /**
332    * Set all rebuild States.
333    *
334    * @param state
335    *           the state to be set
336    */
337   private void setAllRebuildStates(IndexRebuildService.State state) {
338     for (IndexRebuildService.Service service: IndexRebuildService.Service.values()) {
339       setRebuildState(service, state);
340     }
341   }
342 
343   /**
344    * Set a subset of rebuild States following the rebuild order.
345    *
346    * @param startingService
347    *           the service to start from
348    * @param state
349    *           the state to be set
350    */
351   private void setSubsetOfRebuildStates(IndexRebuildService.Service startingService, IndexRebuildService.State state) {
352     Service[] services = IndexRebuildService.Service.values();
353     for (int i = startingService.ordinal(); i < services.length; i++) {
354       rebuildStates.put(services[i], state);
355     }
356   }
357 
358   /**
359    * Set a single rebuild State.
360    *
361    * @param service
362    *           the service to be set
363    * @param state
364    *           the state to be set
365    */
366   private void setRebuildState(IndexRebuildService.Service service, IndexRebuildService.State state) {
367     rebuildStates.put(service, state);
368   }
369 }