AbstractWorkflowOperationHandler.java
/*
* Licensed to The Apereo Foundation under one or more contributor license
* agreements. See the NOTICE file distributed with this work for additional
* information regarding copyright ownership.
*
*
* The Apereo Foundation licenses this file to you under the Educational
* Community License, Version 2.0 (the "License"); you may not use this file
* except in compliance with the License. You may obtain a copy of the License
* at:
*
* http://opensource.org/licenses/ecl2.txt
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*
*/
package org.opencastproject.workflow.api;
import static java.lang.String.format;
import org.opencastproject.job.api.Job;
import org.opencastproject.job.api.JobBarrier;
import org.opencastproject.job.api.JobContext;
import org.opencastproject.mediapackage.MediaPackage;
import org.opencastproject.mediapackage.MediaPackageElement;
import org.opencastproject.mediapackage.MediaPackageElementFlavor;
import org.opencastproject.serviceregistry.api.ServiceRegistry;
import org.opencastproject.workflow.api.WorkflowOperationResult.Action;
import org.apache.commons.io.FilenameUtils;
import org.apache.commons.lang3.StringUtils;
import org.osgi.framework.Constants;
import org.osgi.service.component.ComponentContext;
import org.osgi.service.component.annotations.Activate;
import org.osgi.service.component.annotations.Reference;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
/**
* Abstract base implementation for an operation handler, which implements a simple start operation that returns a
* {@link WorkflowOperationResult} with the current mediapackage and {@link Action#CONTINUE}.
*/
public abstract class AbstractWorkflowOperationHandler implements WorkflowOperationHandler {
/** The logging facility */
private static final Logger logger = LoggerFactory.getLogger(AbstractWorkflowOperationHandler.class);
/** The ID of this operation handler */
protected String id = null;
/** The description of what this handler actually does */
protected String description = null;
/** Optional service registry */
protected ServiceRegistry serviceRegistry = null;
/** The JobBarrier polling interval */
private long jobBarrierPollingInterval = JobBarrier.DEFAULT_POLLING_INTERVAL;
/** Config for Tag Parsing operation */
protected enum Configuration {
none, one, many
};
public static final String TARGET_FLAVORS = "target-flavors";
public static final String TARGET_FLAVOR = "target-flavor";
public static final String TARGET_TAGS = "target-tags";
public static final String TARGET_TAG = "target-tag";
public static final String SOURCE_FLAVORS = "source-flavors";
public static final String SOURCE_FLAVOR = "source-flavor";
public static final String SOURCE_TAG = "source-tag";
public static final String SOURCE_TAGS = "source-tags";
/**
* Activates this component with its properties once all of the collaborating services have been set
*
* @param cc
* The component's context, containing the properties used for configuration
*/
@Activate
protected void activate(ComponentContext cc) {
this.id = (String) cc.getProperties().get(WorkflowService.WORKFLOW_OPERATION_PROPERTY);
this.description = (String) cc.getProperties().get(Constants.SERVICE_DESCRIPTION);
}
/**
* {@inheritDoc}
*
* @see org.opencastproject.workflow.api.WorkflowOperationHandler#start(
* org.opencastproject.workflow.api.WorkflowInstance, JobContext)
*/
@Override
public abstract WorkflowOperationResult start(WorkflowInstance workflowInstance, JobContext context)
throws WorkflowOperationException;
/**
* {@inheritDoc}
*
* @see org.opencastproject.workflow.api.WorkflowOperationHandler#skip(
* org.opencastproject.workflow.api.WorkflowInstance, JobContext)
*/
@Override
public WorkflowOperationResult skip(WorkflowInstance workflowInstance, JobContext context)
throws WorkflowOperationException {
return createResult(Action.SKIP);
}
/**
* {@inheritDoc}
*
* @see org.opencastproject.workflow.api.WorkflowOperationHandler#destroy(
* org.opencastproject.workflow.api.WorkflowInstance, JobContext)
*/
@Override
public void destroy(WorkflowInstance workflowInstance, JobContext context) throws WorkflowOperationException {
}
/**
* Converts a comma separated string into a set of values. Useful for converting operation configuration strings into
* multi-valued sets.
*
* @param elements
* The comma space separated string
* @return the set of values
*/
protected List<String> asList(String elements) {
elements = StringUtils.trimToEmpty(elements);
List<String> list = new ArrayList<>();
for (String s : StringUtils.split(elements, ",")) {
if (StringUtils.trimToNull(s) != null) {
list.add(s.trim());
}
}
return list;
}
/**
* Generates a filename using the base name of a source element and the extension of a derived element.
*
* @param source
* the source media package element
* @param derived
* the derived media package element
* @return the filename
*/
protected String getFileNameFromElements(MediaPackageElement source, MediaPackageElement derived) {
String fileName = FilenameUtils.getBaseName(source.getURI().getPath());
String fileExtension = FilenameUtils.getExtension(derived.getURI().getPath());
return fileName + "." + fileExtension;
}
/**
* {@inheritDoc}
*
* @see org.opencastproject.workflow.api.WorkflowOperationHandler#getId()
*/
@Override
public String getId() {
return id;
}
/**
* {@inheritDoc}
*
* @see org.opencastproject.workflow.api.WorkflowOperationHandler#getDescription()
*/
@Override
public String getDescription() {
return description;
}
/**
* Creates a result for the execution of this workflow operation handler.
*
* @param action
* the action to take
* @return the result
*/
protected WorkflowOperationResult createResult(Action action) {
return createResult(null, null, action, 0);
}
/**
* Creates a result for the execution of this workflow operation handler.
*
* @param mediaPackage
* the modified mediapackage
* @param action
* the action to take
* @return the result
*/
protected WorkflowOperationResult createResult(MediaPackage mediaPackage, Action action) {
return createResult(mediaPackage, null, action, 0);
}
/**
* Creates a result for the execution of this workflow operation handler.
* <p>
* Since there is no way for the workflow service to determine the queuing time (e. g. waiting on services), it needs
* to be provided by the handler.
*
* @param mediaPackage
* the modified mediapackage
* @param action
* the action to take
* @param timeInQueue
* the amount of time this handle spent waiting for services
* @return the result
*/
protected WorkflowOperationResult createResult(MediaPackage mediaPackage, Action action, long timeInQueue) {
return createResult(mediaPackage, null, action, timeInQueue);
}
/**
* Creates a result for the execution of this workflow operation handler.
* <p>
* Since there is no way for the workflow service to determine the queuing time (e. g. waiting on services), it needs
* to be provided by the handler.
*
* @param mediaPackage
* the modified mediapackage
* @param properties
* the properties to add to the workflow instance
* @param action
* the action to take
* @param timeInQueue
* the amount of time this handle spent waiting for services
* @return the result
*/
protected WorkflowOperationResult createResult(MediaPackage mediaPackage, Map<String, String> properties,
Action action, long timeInQueue) {
return new WorkflowOperationResultImpl(mediaPackage, properties, action, timeInQueue);
}
/**
* Sets the service registry. This method is here as a convenience for developers that need the registry to do job
* waiting.
*
* @param serviceRegistry
* the service registry
*/
@Reference
public void setServiceRegistry(ServiceRegistry serviceRegistry) {
this.serviceRegistry = serviceRegistry;
}
/**
* Waits until all of the jobs have reached either one of these statuses:
* <ul>
* <li>{@link Job.Status#FINISHED}</li>
* <li>{@link Job.Status#FAILED}</li>
* <li>{@link Job.Status#DELETED}</li>
* </ul>
* After that, the method returns with the actual outcomes of the jobs.
*
* @param jobs
* the jobs
* @return the jobs and their outcomes
* @throws IllegalStateException
* if the service registry has not been set
* @throws IllegalArgumentException
* if the jobs collecion is either <code>null</code> or empty
*/
protected JobBarrier.Result waitForStatus(Job... jobs) throws IllegalStateException, IllegalArgumentException {
return waitForStatus(0, jobs);
}
/**
* Waits until all of the jobs have reached either one of these statuses:
* <ul>
* <li>{@link Job.Status#FINISHED}</li>
* <li>{@link Job.Status#FAILED}</li>
* <li>{@link Job.Status#DELETED}</li>
* </ul>
* After that, the method returns with the actual outcomes of the jobs.
*
* @param timeout
* the maximum amount of time in milliseconds to wait
* @param jobs
* the jobs
* @return the jobs and their outcomes
* @throws IllegalStateException
* if the service registry has not been set
* @throws IllegalArgumentException
* if the jobs collection is either <code>null</code> or empty
*/
protected JobBarrier.Result waitForStatus(long timeout, Job... jobs) throws IllegalStateException,
IllegalArgumentException {
if (serviceRegistry == null) {
throw new IllegalStateException("Can't wait for job status without providing a service registry first");
}
JobBarrier barrier = new JobBarrier(null, serviceRegistry, jobBarrierPollingInterval, jobs);
return barrier.waitForJobs(timeout);
}
/**
* Get a configuration option.
*
* @deprecated use {@link #getConfig(WorkflowInstance, String)} or
* {@link #getOptConfig(org.opencastproject.workflow.api.WorkflowInstance, String)}
*/
protected Optional<String> getCfg(WorkflowInstance wi, String key) {
return Optional.ofNullable(wi.getCurrentOperation().getConfiguration(key));
}
/**
* Get a mandatory configuration key. Values are returned trimmed.
*
* @throws WorkflowOperationException
* if the configuration key is either missing or empty
*/
protected String getConfig(WorkflowInstance wi, String key) throws WorkflowOperationException {
return getConfig(wi.getCurrentOperation(), key);
}
/**
* Get a configuration key. Values are returned trimmed.
*
* @param w
* WorkflowInstance with current operation
* @param key
* Configuration key to check for
* @param defaultValue
* Value to return if key does not exists
*/
protected String getConfig(WorkflowInstance w, String key, String defaultValue) {
Optional<String> cfgOpt = getOptConfig(w.getCurrentOperation(), key);
if (cfgOpt.isPresent()) {
return cfgOpt.get();
}
return defaultValue;
}
/**
* Get a mandatory configuration key. Values are returned trimmed.
*
* @throws WorkflowOperationException
* if the configuration key is either missing or empty
*/
protected String getConfig(WorkflowOperationInstance woi, String key) throws WorkflowOperationException {
Optional<String> cfgOpt = getOptConfig(woi, key);
if (cfgOpt.isPresent()) {
return cfgOpt.get();
}
throw new WorkflowOperationException(format("Configuration key '%s' is either missing or empty", key));
}
/**
* Get an optional configuration key. Values are returned trimmed.
*/
protected Optional<String> getOptConfig(WorkflowInstance wi, String key) {
return getOptConfig(wi.getCurrentOperation(), key);
}
/**
* Get an optional configuration key. Values are returned trimmed.
*/
protected Optional<String> getOptConfig(WorkflowOperationInstance woi, String key) {
return Optional.ofNullable(woi.getConfiguration(key))
.map(String::trim)
.filter(s -> !s.isEmpty());
}
/**
* Returns a ConfiguredTagsAndFlavors instance, which includes all specified source/target tags and flavors if they
* are valid. Lists can be empty, if no values were specified! This is to enable WOHs to individually check if a
* given tag/flavor was set. This also means that you should use Configuration.many as parameter, if a tag/flavor is
* optional.
* @param srcTags none, one or many
* @param srcFlavors none, one or many
* @param targetFlavors none, one or many
* @param targetTags none, one or many
* @return ConfiguredTagsAndFlavors object including lists for the configured tags/flavors
*/
protected ConfiguredTagsAndFlavors getTagsAndFlavors(WorkflowInstance workflow, Configuration srcTags,
Configuration srcFlavors, Configuration targetTags, Configuration targetFlavors)
throws WorkflowOperationException {
WorkflowOperationInstance operation = workflow.getCurrentOperation();
ConfiguredTagsAndFlavors tagsAndFlavors = new ConfiguredTagsAndFlavors();
MediaPackageElementFlavor flavor;
List<String> srcTagList = new ArrayList<>();
String srcTag;
switch(srcTags) {
case none:
break;
case one:
srcTag = StringUtils.trimToNull(operation.getConfiguration(SOURCE_TAG));
if (srcTag == null) {
throw new WorkflowOperationException("Configuration key '" + SOURCE_TAG + "' must be set");
}
srcTagList.add(srcTag);
break;
case many:
srcTagList = asList(StringUtils.trimToNull(operation.getConfiguration(SOURCE_TAGS)));
srcTag = StringUtils.trimToNull(operation.getConfiguration(SOURCE_TAG));
if (srcTagList.isEmpty() && srcTag != null) {
srcTagList.add(srcTag);
}
break;
default:
throw new WorkflowOperationException("Couldn't process srcTags configuration option!");
}
tagsAndFlavors.setSrcTags(srcTagList);
List<MediaPackageElementFlavor> srcFlavorList = new ArrayList<>();
String singleSourceFlavor;
switch(srcFlavors) {
case none:
break;
case one:
singleSourceFlavor = StringUtils.trimToNull(operation.getConfiguration(SOURCE_FLAVOR));
if (singleSourceFlavor == null) {
throw new WorkflowOperationException("Configuration key '" + SOURCE_FLAVOR + "' must be set");
}
try {
flavor = MediaPackageElementFlavor.parseFlavor(singleSourceFlavor);
} catch (IllegalArgumentException e) {
throw new WorkflowOperationException(singleSourceFlavor + " is not a valid flavor!");
}
srcFlavorList.add(flavor);
break;
case many:
List<String> srcFlavorString = asList(StringUtils.trimToNull(operation.getConfiguration(SOURCE_FLAVORS)));
singleSourceFlavor = StringUtils.trimToNull(operation.getConfiguration(SOURCE_FLAVOR));
if (srcFlavorString.isEmpty() && singleSourceFlavor != null) {
srcFlavorString.add(singleSourceFlavor);
}
for (String elem : srcFlavorString) {
try {
flavor = MediaPackageElementFlavor.parseFlavor(elem);
srcFlavorList.add(flavor);
} catch (IllegalArgumentException e) {
throw new WorkflowOperationException(elem + " is not a valid flavor!");
}
}
break;
default:
throw new WorkflowOperationException("Couldn't process srcFlavors configuration option!");
}
tagsAndFlavors.setSrcFlavors(srcFlavorList);
ConfiguredTagsAndFlavors.TargetTags targetTagMap = new ConfiguredTagsAndFlavors.TargetTags();
String targetTag;
switch(targetTags) {
case none:
break;
case one:
targetTag = StringUtils.trimToNull(operation.getConfiguration(TARGET_TAG));
if (targetTag == null) {
throw new WorkflowOperationException("Configuration key '" + TARGET_TAG + "' must be set");
}
targetTagMap = parseTargetTagsByType(List.of(targetTag));
break;
case many:
List<String> targetTagList = asList(StringUtils.trimToNull(operation.getConfiguration(TARGET_TAGS)));
targetTagMap = parseTargetTagsByType(targetTagList);
targetTag = StringUtils.trimToNull(operation.getConfiguration(TARGET_TAG));
if (targetTagList.isEmpty() && targetTag != null) {
targetTagMap = parseTargetTagsByType(List.of(targetTag));
}
break;
default:
throw new WorkflowOperationException("Couldn't process target-tag configuration option!");
}
tagsAndFlavors.setTargetTags(targetTagMap);
List<MediaPackageElementFlavor> targetFlavorList = new ArrayList<>();
String singleTargetFlavor;
switch(targetFlavors) {
case none:
break;
case one:
singleTargetFlavor = StringUtils.trimToNull(operation.getConfiguration(TARGET_FLAVOR));
if (singleTargetFlavor == null) {
throw new WorkflowOperationException("Configuration key '" + TARGET_FLAVOR + "' must be set");
}
try {
flavor = MediaPackageElementFlavor.parseFlavor(singleTargetFlavor);
} catch (IllegalArgumentException e) {
throw new WorkflowOperationException(singleTargetFlavor + " is not a valid flavor!");
}
targetFlavorList.add(flavor);
break;
case many:
List<String> targetFlavorString = asList(StringUtils.trimToNull(operation.getConfiguration(TARGET_FLAVORS)));
singleTargetFlavor = StringUtils.trimToNull(operation.getConfiguration(TARGET_FLAVOR));
if (targetFlavorString.isEmpty() && singleTargetFlavor != null) {
targetFlavorString.add(singleTargetFlavor);
}
for (String elem : targetFlavorString) {
try {
flavor = MediaPackageElementFlavor.parseFlavor(elem);
} catch (IllegalArgumentException e) {
throw new WorkflowOperationException(elem + " is not a valid flavor!");
}
targetFlavorList.add(flavor);
}
break;
default:
throw new WorkflowOperationException("Couldn't process targetFlavors configuration option!");
}
tagsAndFlavors.setTargetFlavors(targetFlavorList);
return tagsAndFlavors;
}
private ConfiguredTagsAndFlavors.TargetTags parseTargetTagsByType(List<String> tags) {
final String plus = "+";
final String minus = "-";
List<String> overrideTags = new ArrayList();
List<String> addTags = new ArrayList();
List<String> removeTags = new ArrayList();
for (String targetTag : tags) {
if (!StringUtils.startsWithAny(targetTag, plus, minus)) {
if (addTags.size() > 0
|| removeTags.size() > 0) {
logger.warn("You may not mix override tags and tag changes. "
+ "The list of override tags so far is {}. "
+ "The tag {} is not prefixed with '{}' or '{}'.", overrideTags, targetTag, plus, minus);
}
overrideTags.add(targetTag);
} else if (StringUtils.startsWith(targetTag, plus)) {
addTags.add(StringUtils.substring(targetTag, 1));
} else if (StringUtils.startsWith(targetTag, minus)) {
removeTags.add(StringUtils.substring(targetTag, 1));
}
}
return new ConfiguredTagsAndFlavors.TargetTags(overrideTags, addTags, removeTags);
}
/**
* Helper function that applies target tags to the given element, based on the type(s) of the tag(s)
* @param targetTags The target tags to apply to the element
* @param element The element the target tags are applied to
* @return The element with the applied target tags
*/
protected <T extends MediaPackageElement> T applyTargetTagsToElement(
ConfiguredTagsAndFlavors.TargetTags targetTags,
T element
) {
// set tags on target element
List<String> overrideTags = targetTags.getOverrideTags();
List<String> addTags = targetTags.getAddTags();
List<String> removeTags = targetTags.getRemoveTags();
if (overrideTags.size() > 0) {
element.clearTags();
for (String tag : overrideTags) {
element.addTag(tag);
}
} else {
for (String tag : removeTags) {
element.removeTag(tag);
}
for (String tag : addTags) {
element.addTag(tag);
}
}
return element;
}
/**
* Set the @link org.opencastproject.job.api.JobBarrier polling interval.
* <p>
* While waiting for other jobs to finish, the barrier will poll the status of these jobs until they are finished. To
* reduce load on the system, the polling is done only every x milliseconds. This interval defines the sleep time
* between these polls.
* <p>
* If most cases you want to leave this at its default value. It will make sense, though, to adjust this time if you
* know that your job will be exceptionally short. An example of this might be the unit tests where other jobs are
* usually mocked. But this setting is not limited to tests and may be a sensible options for other jobs as well.
*
* @param interval the time in miliseconds between two polling operations
*
* @see org.opencastproject.job.api.JobBarrier#DEFAULT_POLLING_INTERVAL
*/
public void setJobBarrierPollingInterval(long interval) {
this.jobBarrierPollingInterval = interval;
}
/**
* {@inheritDoc}
*
* @see java.lang.Object#hashCode()
*/
@Override
public int hashCode() {
return id != null ? id.hashCode() : super.hashCode();
}
/**
* {@inheritDoc}
*
* @see java.lang.Object#equals(java.lang.Object)
*/
@Override
public boolean equals(Object obj) {
if (obj instanceof WorkflowOperationHandler) {
if (id != null) {
return id.equals(((WorkflowOperationHandler) obj).getId());
} else {
return this == obj;
}
}
return false;
}
/**
* {@inheritDoc}
*
* @see java.lang.Object#toString()
*/
@Override
public String toString() {
return getId();
}
}