IndexRebuildService.java
/*
* Licensed to The Apereo Foundation under one or more contributor license
* agreements. See the NOTICE file distributed with this work for additional
* information regarding copyright ownership.
*
*
* The Apereo Foundation licenses this file to you under the Educational
* Community License, Version 2.0 (the "License"); you may not use this file
* except in compliance with the License. You may obtain a copy of the License
* at:
*
* http://opensource.org/licenses/ecl2.txt
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*
*/
package org.opencastproject.elasticsearch.index.rebuild;
import static java.lang.String.format;
import org.opencastproject.elasticsearch.index.ElasticsearchIndex;
import org.osgi.framework.BundleActivator;
import org.osgi.framework.BundleContext;
import org.osgi.framework.ServiceEvent;
import org.osgi.framework.ServiceListener;
import org.osgi.framework.ServiceReference;
import org.osgi.framework.ServiceRegistration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
/**
* The bundle activator is defined in the pom.xml of this bundle.
*/
public class IndexRebuildService implements BundleActivator {
/*
* How starting and stopping this service works:
*
* The Index Rebuild can only be started when all services that feed data into the ElasticSearch index, called
* IndexProducers, are available via OSGI. To check for this, we use a service listener (see inner class at the
* bottom) that reacts whenever an IndexProducer becomes available or is no longer available. We keep these
* IndexProducers in an internal map.
*
* When our requirements - at least one IndexProducer of each type (defined by the Service enum below) available - are
* fulfilled, we register the IndexRebuildService with OSGI so it can be used. If our requirements are no longer
* fulfilled, we unregister it.
*
* We make this work by hooking into the OSGI lifecycle with the BundleActivator interface - this way we can start
* the listener in the beginning and make sure we properly shut down in the end.
*/
/**
* The services whose data is indexed by ElasticSearch.
* Attention: The order is relevant for the index rebuild and should not be changed!
*/
public enum Service {
Themes, Series, Scheduler, AssetManager, Comments, Workflow, Search
}
public enum DataType {
ALL, ACL
}
public enum State {
PENDING, RUNNING, OK, ERROR
}
private static final Logger logger = LoggerFactory.getLogger(IndexRebuildService.class);
private final Map<IndexRebuildService.Service, IndexProducer> indexProducers = new ConcurrentHashMap<>();
private ServiceRegistration<?> serviceRegistration = null;
/**
* Called by OSGI when this bundle is started.
*
* @param bundleContext
* The bundle context.
*
* @throws Exception
*/
@Override
public void start(BundleContext bundleContext) throws Exception {
// check if there are already IndexProducers available
ServiceReference<?>[] serviceReferences = bundleContext.getAllServiceReferences(IndexProducer.class.getName(),
null);
if (serviceReferences != null) {
for (ServiceReference<?> serviceReference : serviceReferences) {
addIndexProducer((IndexProducer) bundleContext.getService(serviceReference), bundleContext);
}
}
// Set rebuild service repopulation states to default values
setAllRebuildStates(IndexRebuildService.State.OK);
// listen to changes in availability
bundleContext.addServiceListener(new IndexProducerListener(bundleContext),
"(objectClass=" + IndexProducer.class.getName() + ")");
}
/**
* Called by OSGI when this bundle is stopped.
*
* @param bundleContext
* The bundle context.
*
* @throws Exception
*/
@Override
public void stop(BundleContext bundleContext) throws Exception {
// unregister this service from OSGI
unregisterIndexRebuildService();
}
/**
* Get the service by its enum identifier.
*
* @param service
* The enum identifying the service.
*
* @return the service
* @throws IllegalStateException
* If the service is not registered.
*/
public synchronized IndexProducer getIndexProducer(Service service) throws IllegalStateException {
if (!indexProducers.containsKey(service)) {
throw new IllegalStateException(format("Service %s is not available", service));
}
return indexProducers.get(service);
}
/**
* Clear and rebuild the index from all services.
*
* @param index
* The index to rebuild.
*
* @throws IOException
* Thrown if the index cannot be cleared.
* @throws IndexRebuildException
* Thrown if the index rebuild failed.
*/
public synchronized void rebuildIndex(ElasticsearchIndex index) throws IOException, IndexRebuildException,
IllegalArgumentException {
index.clear();
logger.info("Index cleared, starting complete rebuild.");
setAllRebuildStates(IndexRebuildService.State.PENDING);
for (IndexRebuildService.Service service: IndexRebuildService.Service.values()) {
rebuildIndexInternal(getIndexProducer(service), DataType.ALL);
}
}
/**
* Partially rebuild the index from a specific service and maybe only a specific data type.
*
* @param indexProducer
* The service to re-index from.
* @param dataType
* The data type to re-index.
* @throws IllegalArgumentException
* Thrown if the service doesn't support the data type.
* @throws IndexRebuildException
* Thrown if the index rebuild failed.
*/
public synchronized void rebuildIndex(IndexProducer indexProducer, DataType dataType)
throws IllegalArgumentException, IndexRebuildException {
logger.info("Starting partial rebuild of the {} index.", indexProducer.getService());
setRebuildState(indexProducer.getService(), IndexRebuildService.State.PENDING);
rebuildIndexInternal(indexProducer, dataType);
}
/**
* Start Index Rebuild from the specified service and then do all that follow. Can be used to resume a complete index
* rebuild that was interrupted.
*
* @param startingService
* The {@link Service} to start with.
*
* @throws IllegalArgumentException
* Thrown if the service doesn't exist.
* @throws IndexRebuildException
* Thrown if the index rebuild failed.
*/
public synchronized void resumeIndexRebuild(Service startingService)
throws IllegalArgumentException, IndexRebuildException {
logger.info("Resuming rebuild of {} index.", startingService);
setSubsetOfRebuildStates(startingService, IndexRebuildService.State.PENDING);
Service[] services = IndexRebuildService.Service.values();
for (int i = startingService.ordinal(); i < services.length; i++) {
rebuildIndexInternal(getIndexProducer(services[i]), DataType.ALL);
}
}
private void rebuildIndexInternal(IndexProducer indexProducer, DataType dataType) throws IndexRebuildException,
IllegalArgumentException {
if (!indexProducer.dataTypeSupported(dataType)) {
throw new IllegalArgumentException("Service " + indexProducer.getService() + "doesn't support data type "
+ dataType + " for index rebuild.");
}
Service service = indexProducer.getService();
logger.info("Starting to rebuild the {} index", service);
setRebuildState(service, IndexRebuildService.State.RUNNING);
try {
indexProducer.repopulate(dataType);
setRebuildState(service, IndexRebuildService.State.OK);
} catch (IndexRebuildException e) {
setRebuildState(service, IndexRebuildService.State.ERROR);
throw e;
}
logger.info("Finished rebuilding the {} index", service);
}
/**
* Add IndexProducer service to internal map.
*
* @param indexProducer
* The IndexProducer to add.
* @param bundleContext
* The bundle context.
*/
private void addIndexProducer(IndexProducer indexProducer, BundleContext bundleContext) {
// add only if there's not already a service of the same type in there
if (indexProducers.putIfAbsent(indexProducer.getService(), indexProducer) == null) {
logger.info("Service {} added.", indexProducer.getService());
// all required IndexProducers found? Register this service at OSGI
if (indexProducers.size() == IndexRebuildService.Service.values().length) {
registerIndexRebuildService(bundleContext);
}
}
}
/**
* Remove IndexProducer service from internal map.
*
* @param indexProducer
* The IndexProducer to remove.
*/
private void removeIndexProducer(IndexProducer indexProducer) {
// remove only if it's in there
if (indexProducers.remove(indexProducer.getService(), indexProducer)) {
logger.info("Service {} removed.", indexProducer.getService());
// no longer all required IndexProducers available? Unregister this service from OSGI
if (indexProducers.size() != IndexRebuildService.Service.values().length) {
unregisterIndexRebuildService();
}
}
}
/**
* Unregister this service from OSGI.
*/
private synchronized void unregisterIndexRebuildService() {
// if this service is registered with OSGI, unregister it
if (serviceRegistration != null) {
logger.info("Unregister IndexRebuildService.");
serviceRegistration.unregister();
serviceRegistration = null;
}
}
/**
* Register this service at OSGI.
*
* @param bundleContext
* The bundle context.
*/
private synchronized void registerIndexRebuildService(BundleContext bundleContext) {
// if this service is not registered at OSGI, register it
if (serviceRegistration == null) {
logger.info("Register IndexRebuildService.");
serviceRegistration = bundleContext.registerService(this.getClass().getName(), IndexRebuildService.this, null);
}
}
/**
* Listen to changes in the availability of IndexProducer services.
*/
private final class IndexProducerListener implements ServiceListener {
private final BundleContext bundleContext;
/**
* Constructor to hand over the bundle context.
*
* @param bundleContext
* The bundle context.
*/
private IndexProducerListener(BundleContext bundleContext) {
this.bundleContext = bundleContext;
}
@Override
public void serviceChanged(ServiceEvent serviceEvent) {
// new IndexProducer service available? Add to map
if (serviceEvent.getType() == ServiceEvent.REGISTERED) {
ServiceReference<?> serviceReference = serviceEvent.getServiceReference();
addIndexProducer((IndexProducer) bundleContext.getService(serviceReference), bundleContext);
// Index Producer no longer available? Remove from map
} else if (serviceEvent.getType() == ServiceEvent.UNREGISTERING) {
ServiceReference<?> serviceReference = serviceEvent.getServiceReference();
removeIndexProducer((IndexProducer) bundleContext.getService(serviceReference));
}
}
}
private final Map<Service, State> rebuildStates = new HashMap<>();
/**
* @return All rebuild service repopulation states.
*/
public Map<String, String> getRebuildStates() {
Map <String, String> statesAsString = new HashMap<>();
for (Map.Entry<IndexRebuildService.Service,IndexRebuildService.State> entry : rebuildStates.entrySet()) {
statesAsString.put(entry.getKey().toString(), entry.getValue().toString());
}
return statesAsString;
}
/**
* Set all rebuild States.
*
* @param state
* the state to be set
*/
private void setAllRebuildStates(IndexRebuildService.State state) {
for (IndexRebuildService.Service service: IndexRebuildService.Service.values()) {
setRebuildState(service, state);
}
}
/**
* Set a subset of rebuild States following the rebuild order.
*
* @param startingService
* the service to start from
* @param state
* the state to be set
*/
private void setSubsetOfRebuildStates(IndexRebuildService.Service startingService, IndexRebuildService.State state) {
Service[] services = IndexRebuildService.Service.values();
for (int i = startingService.ordinal(); i < services.length; i++) {
rebuildStates.put(services[i], state);
}
}
/**
* Set a single rebuild State.
*
* @param service
* the service to be set
* @param state
* the state to be set
*/
private void setRebuildState(IndexRebuildService.Service service, IndexRebuildService.State state) {
rebuildStates.put(service, state);
}
}