// IndexRebuildService.java

/*
 * Licensed to The Apereo Foundation under one or more contributor license
 * agreements. See the NOTICE file distributed with this work for additional
 * information regarding copyright ownership.
 *
 *
 * The Apereo Foundation licenses this file to you under the Educational
 * Community License, Version 2.0 (the "License"); you may not use this file
 * except in compliance with the License. You may obtain a copy of the License
 * at:
 *
 *   http://opensource.org/licenses/ecl2.txt
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the
 * License for the specific language governing permissions and limitations under
 * the License.
 *
 */

package org.opencastproject.elasticsearch.index.rebuild;

import static java.lang.String.format;

import org.opencastproject.elasticsearch.index.ElasticsearchIndex;

import org.osgi.framework.BundleActivator;
import org.osgi.framework.BundleContext;
import org.osgi.framework.ServiceEvent;
import org.osgi.framework.ServiceListener;
import org.osgi.framework.ServiceReference;
import org.osgi.framework.ServiceRegistration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/**
 * The bundle activator is defined in the pom.xml of this bundle.
 */
public class IndexRebuildService implements BundleActivator {

  /*
   * How starting and stopping this service works:
   *
   * The Index Rebuild can only be started when all services that feed data into the ElasticSearch index, called
   * IndexProducers, are available via OSGI. To check for this, we use a service listener (see inner class at the
   * bottom) that reacts whenever an IndexProducer becomes available or is no longer available. We keep these
   * IndexProducers in an internal map.
   *
   * When our requirements - at least one IndexProducer of each type (defined by the Service enum below) available - are
   * fulfilled, we register the IndexRebuildService with OSGI so it can be used. If our requirements are no longer
   * fulfilled, we unregister it.
   *
   * We make this work by hooking into the OSGI lifecycle with the BundleActivator interface - this way we can start
   * the listener in the beginning and make sure we properly shut down in the end.
   */

  /**
   * The services whose data is indexed by ElasticSearch.
   * Attention: The order is relevant for the index rebuild and should not be changed!
   */
  public enum Service {
    Themes, Series, Scheduler, AssetManager, Comments, Workflow, Search
  }

  public enum DataType {
    ALL, ACL
  }

  public enum State {
    PENDING, RUNNING, OK, ERROR
  }

  private static final Logger logger = LoggerFactory.getLogger(IndexRebuildService.class);
  private final Map<IndexRebuildService.Service, IndexProducer> indexProducers = new ConcurrentHashMap<>();
  private ServiceRegistration<?> serviceRegistration = null;

  /**
   * Called by OSGI when this bundle is started.
   *
   * @param bundleContext
   *         The bundle context.
   *
   * @throws Exception
   */
  @Override
  public void start(BundleContext bundleContext) throws Exception {
    // check if there are already IndexProducers available
    ServiceReference<?>[] serviceReferences = bundleContext.getAllServiceReferences(IndexProducer.class.getName(),
            null);
    if (serviceReferences != null) {
      for (ServiceReference<?> serviceReference : serviceReferences) {
        addIndexProducer((IndexProducer) bundleContext.getService(serviceReference), bundleContext);
      }
    }

    // Set rebuild service repopulation states to default values
    setAllRebuildStates(IndexRebuildService.State.OK);

    // listen to changes in availability
    bundleContext.addServiceListener(new IndexProducerListener(bundleContext),
            "(objectClass=" + IndexProducer.class.getName() + ")");
  }

  /**
   * Called by OSGI when this bundle is stopped.
   *
   * @param bundleContext
   *         The bundle context.
   *
   * @throws Exception
   */
  @Override
  public void stop(BundleContext bundleContext) throws Exception {
    // unregister this service from OSGI
    unregisterIndexRebuildService();
  }

  /**
   * Get the service by its enum identifier.
   *
   * @param service
   *         The enum identifying the service.
   *
   * @return the service
   * @throws IllegalStateException
   *         If the service is not registered.
   */
  public synchronized IndexProducer getIndexProducer(Service service) throws IllegalStateException {
    if (!indexProducers.containsKey(service)) {
      throw new IllegalStateException(format("Service %s is not available", service));
    }
    return indexProducers.get(service);
  }

  /**
   * Clear and rebuild the index from all services.
   *
   * @param index
   *           The index to rebuild.
   *
   * @throws IOException
   *           Thrown if the index cannot be cleared.
   * @throws IndexRebuildException
   *           Thrown if the index rebuild failed.
   */
  public synchronized void rebuildIndex(ElasticsearchIndex index) throws IOException, IndexRebuildException,
          IllegalArgumentException {
    index.clear();
    logger.info("Index cleared, starting complete rebuild.");
    setAllRebuildStates(IndexRebuildService.State.PENDING);
    for (IndexRebuildService.Service service: IndexRebuildService.Service.values()) {
      rebuildIndexInternal(getIndexProducer(service), DataType.ALL);
    }
  }

  /**
   * Partially rebuild the index from a specific service and maybe only a specific data type.
   *
   * @param indexProducer
   *           The service to re-index from.
   * @param dataType
   *           The data type to re-index.
   * @throws IllegalArgumentException
   *           Thrown if the service doesn't support the data type.
   * @throws IndexRebuildException
   *           Thrown if the index rebuild failed.
   */
  public synchronized void rebuildIndex(IndexProducer indexProducer, DataType dataType)
          throws IllegalArgumentException, IndexRebuildException {
    logger.info("Starting partial rebuild of the {} index.", indexProducer.getService());
    setRebuildState(indexProducer.getService(), IndexRebuildService.State.PENDING);
    rebuildIndexInternal(indexProducer, dataType);
  }

  /**
   * Start Index Rebuild from the specified service and then do all that follow. Can be used to resume a complete index
   * rebuild that was interrupted.
   *
   * @param startingService
   *           The {@link Service} to start with.
   *
   * @throws IllegalArgumentException
   *           Thrown if the service doesn't exist.
   * @throws IndexRebuildException
   *           Thrown if the index rebuild failed.
   */
  public synchronized void resumeIndexRebuild(Service startingService)
          throws IllegalArgumentException, IndexRebuildException {
    logger.info("Resuming rebuild of {} index.", startingService);
    setSubsetOfRebuildStates(startingService, IndexRebuildService.State.PENDING);
    Service[] services = IndexRebuildService.Service.values();
    for (int i = startingService.ordinal(); i < services.length; i++) {
      rebuildIndexInternal(getIndexProducer(services[i]), DataType.ALL);
    }
  }

  private void rebuildIndexInternal(IndexProducer indexProducer, DataType dataType) throws IndexRebuildException,
          IllegalArgumentException {
    if (!indexProducer.dataTypeSupported(dataType)) {
      throw new IllegalArgumentException("Service " + indexProducer.getService() + "doesn't support data type "
              + dataType + " for index rebuild.");
    }
    Service service = indexProducer.getService();
    logger.info("Starting to rebuild the {} index", service);
    setRebuildState(service, IndexRebuildService.State.RUNNING);
    try {
      indexProducer.repopulate(dataType);
      setRebuildState(service, IndexRebuildService.State.OK);
    } catch (IndexRebuildException e) {
      setRebuildState(service, IndexRebuildService.State.ERROR);
      throw e;
    }
    logger.info("Finished rebuilding the {} index", service);
  }

  /**
   * Add IndexProducer service to internal map.
   *
   * @param indexProducer
   *           The IndexProducer to add.
   * @param bundleContext
   *           The bundle context.
   */
  private void addIndexProducer(IndexProducer indexProducer, BundleContext bundleContext) {
    // add only if there's not already a service of the same type in there
    if (indexProducers.putIfAbsent(indexProducer.getService(), indexProducer) == null) {
      logger.info("Service {} added.", indexProducer.getService());

      // all required IndexProducers found? Register this service at OSGI
      if (indexProducers.size() == IndexRebuildService.Service.values().length) {
        registerIndexRebuildService(bundleContext);
      }
    }
  }

  /**
   * Remove IndexProducer service from internal map.
   *
   * @param indexProducer
   *           The IndexProducer to remove.
   */
  private void removeIndexProducer(IndexProducer indexProducer) {
    // remove only if it's in there
    if (indexProducers.remove(indexProducer.getService(), indexProducer)) {
      logger.info("Service {} removed.", indexProducer.getService());

      // no longer all required IndexProducers available? Unregister this service from OSGI
      if (indexProducers.size() != IndexRebuildService.Service.values().length) {
        unregisterIndexRebuildService();
      }
    }
  }

  /**
   * Unregister this service from OSGI.
   */
  private synchronized void unregisterIndexRebuildService() {
    // if this service is registered with OSGI, unregister it
    if (serviceRegistration != null)  {
      logger.info("Unregister IndexRebuildService.");
      serviceRegistration.unregister();
      serviceRegistration = null;
    }
  }

  /**
   * Register this service at OSGI.
   *
   * @param bundleContext
   *           The bundle context.
   */
  private synchronized void registerIndexRebuildService(BundleContext bundleContext) {
    // if this service is not registered at OSGI, register it
    if (serviceRegistration == null) {
      logger.info("Register IndexRebuildService.");
      serviceRegistration = bundleContext.registerService(this.getClass().getName(), IndexRebuildService.this, null);
    }
  }

  /**
   * Listen to changes in the availability of IndexProducer services.
   */
  private final class IndexProducerListener implements ServiceListener {

    private final BundleContext bundleContext;

    /**
     * Constructor to hand over the bundle context.
     *
     * @param bundleContext
     *           The bundle context.
     */
    private IndexProducerListener(BundleContext bundleContext) {
      this.bundleContext = bundleContext;
    }

    @Override
    public void serviceChanged(ServiceEvent serviceEvent) {
      // new IndexProducer service available? Add to map
      if (serviceEvent.getType() == ServiceEvent.REGISTERED) {
        ServiceReference<?> serviceReference = serviceEvent.getServiceReference();
        addIndexProducer((IndexProducer) bundleContext.getService(serviceReference), bundleContext);

        // Index Producer no longer available? Remove from map
      } else if (serviceEvent.getType() == ServiceEvent.UNREGISTERING) {
        ServiceReference<?> serviceReference = serviceEvent.getServiceReference();
        removeIndexProducer((IndexProducer) bundleContext.getService(serviceReference));
      }
    }
  }

  private final Map<Service, State> rebuildStates = new HashMap<>();

  /**
   * @return All rebuild service repopulation states.
   */
  public Map<String, String> getRebuildStates() {
    Map <String, String> statesAsString = new HashMap<>();
    for (Map.Entry<IndexRebuildService.Service,IndexRebuildService.State> entry : rebuildStates.entrySet()) {
      statesAsString.put(entry.getKey().toString(), entry.getValue().toString());
    }
    return statesAsString;
  }

  /**
   * Set all rebuild States.
   *
   * @param state
   *           the state to be set
   */
  private void setAllRebuildStates(IndexRebuildService.State state) {
    for (IndexRebuildService.Service service: IndexRebuildService.Service.values()) {
      setRebuildState(service, state);
    }
  }

  /**
   * Set a subset of rebuild States following the rebuild order.
   *
   * @param startingService
   *           the service to start from
   * @param state
   *           the state to be set
   */
  private void setSubsetOfRebuildStates(IndexRebuildService.Service startingService, IndexRebuildService.State state) {
    Service[] services = IndexRebuildService.Service.values();
    for (int i = startingService.ordinal(); i < services.length; i++) {
      rebuildStates.put(services[i], state);
    }
  }

  /**
   * Set a single rebuild State.
   *
   * @param service
   *           the service to be set
   * @param state
   *           the state to be set
   */
  private void setRebuildState(IndexRebuildService.Service service, IndexRebuildService.State state) {
    rebuildStates.put(service, state);
  }
}