ConfigurablePublishWorkflowOperationHandler.java
/*
* Licensed to The Apereo Foundation under one or more contributor license
* agreements. See the NOTICE file distributed with this work for additional
* information regarding copyright ownership.
*
*
* The Apereo Foundation licenses this file to you under the Educational
* Community License, Version 2.0 (the "License"); you may not use this file
* except in compliance with the License. You may obtain a copy of the License
* at:
*
* http://opensource.org/licenses/ecl2.txt
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*
*/
package org.opencastproject.workflow.handler.distribution;
import org.opencastproject.distribution.api.DistributionException;
import org.opencastproject.distribution.api.DownloadDistributionService;
import org.opencastproject.distribution.api.StreamingDistributionService;
import org.opencastproject.job.api.Job;
import org.opencastproject.job.api.JobContext;
import org.opencastproject.mediapackage.MediaPackage;
import org.opencastproject.mediapackage.MediaPackageElement;
import org.opencastproject.mediapackage.MediaPackageElementFlavor;
import org.opencastproject.mediapackage.MediaPackageElementParser;
import org.opencastproject.mediapackage.MediaPackageException;
import org.opencastproject.mediapackage.Publication;
import org.opencastproject.mediapackage.PublicationImpl;
import org.opencastproject.mediapackage.selector.SimpleElementSelector;
import org.opencastproject.security.api.SecurityService;
import org.opencastproject.serviceregistry.api.ServiceRegistry;
import org.opencastproject.util.MimeType;
import org.opencastproject.util.MimeTypes;
import org.opencastproject.util.RequireUtil;
import org.opencastproject.util.doc.DocUtil;
import org.opencastproject.workflow.api.WorkflowInstance;
import org.opencastproject.workflow.api.WorkflowOperationException;
import org.opencastproject.workflow.api.WorkflowOperationHandler;
import org.opencastproject.workflow.api.WorkflowOperationInstance;
import org.opencastproject.workflow.api.WorkflowOperationResult;
import org.opencastproject.workflow.api.WorkflowOperationResult.Action;
import org.apache.commons.lang3.ArrayUtils;
import org.apache.commons.lang3.BooleanUtils;
import org.apache.commons.lang3.StringUtils;
import org.osgi.service.component.annotations.Component;
import org.osgi.service.component.annotations.Reference;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
/**
* WOH that distributes selected elements to an internal distribution channel and adds reflective publication elements
* to the media package.
*/
@Component(
immediate = true,
service = WorkflowOperationHandler.class,
property = {
"service.description=Configurable Publication Workflow Handler",
"workflow.operation=publish-configure"
}
)
public class ConfigurablePublishWorkflowOperationHandler extends ConfigurableWorkflowOperationHandlerBase {
/** The logging facility */
private static final Logger logger = LoggerFactory.getLogger(ConfigurablePublishWorkflowOperationHandler.class);
/** The template key for adding the mediapackage / event id to the publication path. */
protected static final String EVENT_ID_TEMPLATE_KEY = "event_id";
/** The template key for adding the player location path to the publication path. */
protected static final String PLAYER_PATH_TEMPLATE_KEY = "player_path";
/** The template key for adding the publication id to the publication path. */
protected static final String PUBLICATION_ID_TEMPLATE_KEY = "publication_id";
/** The template key for adding the series id to the publication path. */
protected static final String SERIES_ID_TEMPLATE_KEY = "series_id";
/** The configuration property value for the player location. */
protected static final String PLAYER_PROPERTY = "player";
/** The template key name prefix for organization keys */
protected static final String ORG_TEMPLATE_KEY_PREFIX = "org_";
// service references
private DownloadDistributionService downloadDistributionService;
private StreamingDistributionService streamingDistributionService;
private SecurityService securityService;
/** Workflow configuration options */
static final String DOWNLOAD_SOURCE_FLAVORS = "download-source-flavors";
static final String DOWNLOAD_SOURCE_TAGS = "download-source-tags";
static final String STREAMING_SOURCE_TAGS = "streaming-source-tags";
static final String STREAMING_SOURCE_FLAVORS = "streaming-source-flavors";
static final String CHANNEL_ID_KEY = "channel-id";
static final String MIME_TYPE = "mimetype";
static final String WITH_PUBLISHED_ELEMENTS = "with-published-elements";
static final String CHECK_AVAILABILITY = "check-availability";
static final String STRATEGY = "strategy";
static final String MODE = "mode";
/** Known values for mode **/
static final String MODE_SINGLE = "single";
static final String MODE_MIXED = "mixed";
static final String MODE_BULK = "bulk";
static final String[] KNOWN_MODES = { MODE_SINGLE, MODE_MIXED, MODE_BULK };
static final String DEFAULT_MODE = MODE_BULK;
/** The workflow configuration key for defining the url pattern. */
static final String URL_PATTERN = "url-pattern";
static final String RETRACT_STREAMING = "retract-streaming";
static final boolean RETRACT_STREAMING_DEFAULT = false;
/** OSGi DI */
@Reference(target = "(distribution.channel=download)")
void setDownloadDistributionService(DownloadDistributionService distributionService) {
this.downloadDistributionService = distributionService;
}
@Reference(target = "(distribution.channel=streaming)")
void setStreamingDistributionService(StreamingDistributionService streamingDistributionService) {
this.streamingDistributionService = streamingDistributionService;
}
/** OSGi DI */
@Reference
protected void setSecurityService(SecurityService securityService) {
this.securityService = securityService;
}
@Override
protected DownloadDistributionService getDownloadDistributionService() {
assert (downloadDistributionService != null);
return downloadDistributionService;
}
@Override
protected StreamingDistributionService getStreamingDistributionService() {
assert (streamingDistributionService != null);
return streamingDistributionService;
}
/**
* Replace possible variables in the url-pattern configuration for this workflow operation handler.
*
* @param urlPattern
* The operation's template for replacing the variables.
* @param mp
* The {@link MediaPackage} used to get the event / mediapackage id.
* @param pubUUID
* The UUID for the published element.
* @return The URI of the published element with the variables replaced.
* @throws WorkflowOperationException
* Thrown if the URI is malformed after replacing the variables.
*/
public URI populateUrlWithVariables(String urlPattern, MediaPackage mp, String pubUUID)
throws WorkflowOperationException {
Map<String, Object> values = new HashMap<>();
values.put(EVENT_ID_TEMPLATE_KEY, mp.getIdentifier().toString());
values.put(PUBLICATION_ID_TEMPLATE_KEY, pubUUID);
String playerPath = securityService.getOrganization().getProperties().get(PLAYER_PROPERTY);
values.put(PLAYER_PATH_TEMPLATE_KEY, playerPath);
values.put(SERIES_ID_TEMPLATE_KEY, StringUtils.trimToEmpty(mp.getSeries()));
Map<String, String> orgProperties = securityService.getOrganization().getProperties();
orgProperties.put("id", securityService.getOrganization().getId());
orgProperties.put("name", securityService.getOrganization().getName());
orgProperties.put("admin_role", securityService.getOrganization().getAdminRole());
orgProperties.put("anonymous_role", securityService.getOrganization().getAnonymousRole());
for (Map.Entry<String, String> orgProperty : orgProperties.entrySet()) {
values.put(ORG_TEMPLATE_KEY_PREFIX + orgProperty.getKey().replace('.', '_').toLowerCase(),
orgProperty.getValue());
}
String uriWithVariables = DocUtil.processTextTemplate("Replacing Variables in Publish URL", urlPattern, values);
URI publicationURI;
try {
publicationURI = new URI(uriWithVariables);
} catch (URISyntaxException e) {
throw new WorkflowOperationException(String.format(
"Unable to create URI from template '%s', replacement was: '%s'", urlPattern, uriWithVariables), e);
}
return publicationURI;
}
@Override
public WorkflowOperationResult start(WorkflowInstance workflowInstance, JobContext context)
throws WorkflowOperationException {
RequireUtil.notNull(workflowInstance, "workflowInstance");
final MediaPackage mp = workflowInstance.getMediaPackage();
final WorkflowOperationInstance op = workflowInstance.getCurrentOperation();
final String channelId = StringUtils.trimToEmpty(op.getConfiguration(CHANNEL_ID_KEY));
if ("".equals(channelId)) {
throw new WorkflowOperationException("Unable to publish this mediapackage as the configuration key "
+ CHANNEL_ID_KEY + " is missing. Unable to determine where to publish these elements.");
}
final String urlPattern = StringUtils.trimToEmpty(op.getConfiguration(URL_PATTERN));
MimeType mimetype = null;
String mimetypeString = StringUtils.trimToEmpty(op.getConfiguration(MIME_TYPE));
if (!"".equals(mimetypeString)) {
try {
mimetype = MimeTypes.parseMimeType(mimetypeString);
} catch (IllegalArgumentException e) {
throw new WorkflowOperationException("Unable to parse the provided configuration for " + MIME_TYPE, e);
}
}
final boolean withPublishedElements = BooleanUtils
.toBoolean(StringUtils.trimToEmpty(op.getConfiguration(WITH_PUBLISHED_ELEMENTS)));
boolean checkAvailability = BooleanUtils
.toBoolean(StringUtils.trimToEmpty(op.getConfiguration(CHECK_AVAILABILITY)));
boolean retractStreaming = RETRACT_STREAMING_DEFAULT;
String retractStreamingString = workflowInstance.getConfiguration(RETRACT_STREAMING);
if (retractStreamingString != null) {
retractStreaming = BooleanUtils.toBoolean(StringUtils.trimToEmpty(retractStreamingString));
}
if (getPublications(mp, channelId).size() > 0) {
final String rePublishStrategy = StringUtils.trimToEmpty(op.getConfiguration(STRATEGY));
switch (rePublishStrategy) {
case ("fail"):
// fail is a dummy function for further distribution strategies
fail(mp);
break;
case ("merge"):
// nothing to do here. other publication strategies can be added to this list later on
break;
default:
retract(mp, channelId, retractStreaming);
}
}
String mode = StringUtils.trimToEmpty(op.getConfiguration(MODE));
if ("".equals(mode)) {
mode = DEFAULT_MODE;
} else if (!ArrayUtils.contains(KNOWN_MODES, mode)) {
logger.error("Unknown value for configuration key mode: '{}'", mode);
throw new IllegalArgumentException("Unknown value for configuration key mode");
}
final String[] downloadSourceFlavors
= StringUtils.split(StringUtils.trimToEmpty(op.getConfiguration(DOWNLOAD_SOURCE_FLAVORS)), ",");
final String[] downloadSourceTags
= StringUtils.split(StringUtils.trimToEmpty(op.getConfiguration(DOWNLOAD_SOURCE_TAGS)), ",");
final String[] streamingSourceFlavors
= StringUtils.split(StringUtils.trimToEmpty(op.getConfiguration(STREAMING_SOURCE_FLAVORS)), ",");
final String[] streamingSourceTags
= StringUtils.split(StringUtils.trimToEmpty(op.getConfiguration(STREAMING_SOURCE_TAGS)), ",");
String publicationUUID = UUID.randomUUID().toString();
Publication publication = PublicationImpl.publication(publicationUUID, channelId, null, null);
// Configure the element selectors
final SimpleElementSelector downloadSelector = new SimpleElementSelector();
for (String flavor : downloadSourceFlavors) {
downloadSelector.addFlavor(MediaPackageElementFlavor.parseFlavor(flavor));
}
for (String tag : downloadSourceTags) {
downloadSelector.addTag(tag);
}
final SimpleElementSelector streamingSelector = new SimpleElementSelector();
for (String flavor : streamingSourceFlavors) {
streamingSelector.addFlavor(MediaPackageElementFlavor.parseFlavor(flavor));
}
for (String tag : streamingSourceTags) {
streamingSelector.addTag(tag);
}
boolean streamingElementsDistributed = false;
boolean downloadElementsDistributed = false;
if (streamingDistributionService != null && streamingDistributionService.publishToStreaming()
&& (streamingSourceFlavors.length > 0 || streamingSourceTags.length > 0)) {
streamingElementsDistributed = distributeElements(streamingSelector, mp, publication, channelId, mode,
withPublishedElements, checkAvailability, true);
}
if (downloadSourceFlavors.length > 0 || downloadSourceTags.length > 0) {
downloadElementsDistributed = distributeElements(downloadSelector, mp, publication, channelId, mode,
withPublishedElements, checkAvailability, false);
}
if (!downloadElementsDistributed && !streamingElementsDistributed
&& (downloadSourceFlavors.length > 0 || downloadSourceTags.length > 0
|| streamingSourceFlavors.length > 0 || streamingSourceTags.length > 0)) {
// skip publication if no elements was distributed but should be
return createResult(mp, Action.SKIP);
}
if (!"".equals(urlPattern)) {
publication.setURI(populateUrlWithVariables(urlPattern, mp, publicationUUID));
}
if (mimetype != null) {
publication.setMimeType(mimetype);
}
mp.add(publication);
return createResult(mp, Action.CONTINUE);
}
private boolean distributeElements(SimpleElementSelector selector, MediaPackage mp, Publication publication,
String channelId, String mode, boolean withPublishedElements, boolean checkAvailability, boolean streaming)
throws WorkflowOperationException {
String target = (streaming ? "streaming" : "download");
if (!withPublishedElements) {
Set<MediaPackageElement> elements = distribute(selector.select(mp, false), mp, channelId, mode,
checkAvailability, streaming);
if (elements.size() > 0) {
for (MediaPackageElement element : elements) {
// Make sure the mediapackage is prompted to create a new identifier for this element
element.setIdentifier(null);
PublicationImpl.addElementToPublication(publication, element);
}
} else {
logger.info("No element found for distribution to " + target + " in media package '{}'", mp);
return false;
}
} else {
List<MediaPackageElement> publishedElements = new ArrayList<>();
for (Publication alreadyPublished : mp.getPublications()) {
publishedElements.addAll(Arrays.asList(alreadyPublished.getAttachments()));
publishedElements.addAll(Arrays.asList(alreadyPublished.getCatalogs()));
publishedElements.addAll(Arrays.asList(alreadyPublished.getTracks()));
}
Collection<MediaPackageElement> elements = selector.select(publishedElements, false);
if (elements.size() > 0) {
for (MediaPackageElement element : elements) {
PublicationImpl.addElementToPublication(publication, element);
}
} else {
logger.info("No elements found for publication to " + target + " in media package '{}'", mp);
return false;
}
}
return true;
}
private Set<MediaPackageElement> distribute(
Collection<MediaPackageElement> elements,
MediaPackage mediapackage,
String channelId,
String mode,
boolean checkAvailability,
boolean streaming
) throws WorkflowOperationException {
Set<MediaPackageElement> result = new HashSet<>();
Set<String> bulkElementIds = new HashSet<>();
Set<String> singleElementIds = new HashSet<>();
for (MediaPackageElement element : elements) {
if (MODE_BULK.equals(mode)
|| (MODE_MIXED.equals(mode) && (element.getElementType() != MediaPackageElement.Type.Track))) {
bulkElementIds.add(element.getIdentifier());
} else {
singleElementIds.add(element.getIdentifier());
}
}
Set<Job> jobs = new HashSet<>();
if (bulkElementIds.size() > 0) {
logger.info("Start bulk publishing of {} elements of media package '{}' to publication channel '{}'",
bulkElementIds.size(), mediapackage, channelId);
try {
Job job;
if (streaming) {
job = streamingDistributionService.distribute(channelId, mediapackage, bulkElementIds);
} else {
job = downloadDistributionService.distribute(channelId, mediapackage, bulkElementIds, checkAvailability);
}
jobs.add(job);
} catch (DistributionException | MediaPackageException e) {
logger.error("Creating the distribution job for {} elements of media package '{}' failed",
bulkElementIds.size(), mediapackage, e);
throw new WorkflowOperationException(e);
}
}
if (singleElementIds.size() > 0) {
logger.info("Start single publishing of {} elements of media package '{}' to publication channel '{}'",
singleElementIds.size(), mediapackage, channelId);
for (String elementId : singleElementIds) {
try {
Job job;
if (streaming) {
job = streamingDistributionService.distribute(channelId, mediapackage, elementId);
} else {
job = downloadDistributionService.distribute(channelId, mediapackage, elementId, checkAvailability);
}
jobs.add(job);
} catch (DistributionException | MediaPackageException e) {
logger.error("Creating the distribution job for element '{}' of media package '{}' failed", elementId,
mediapackage, e);
throw new WorkflowOperationException(e);
}
}
}
if (jobs.size() > 0) {
if (!waitForStatus(jobs.toArray(new Job[jobs.size()])).isSuccess()) {
throw new WorkflowOperationException("At least one of the distribution jobs did not complete successfully");
}
for (Job job : jobs) {
try {
List<? extends MediaPackageElement> elems = MediaPackageElementParser.getArrayFromXml(job.getPayload());
result.addAll(elems);
} catch (MediaPackageException e) {
logger.error("Job '{}' returned payload ({}) that could not be parsed to media package elements", job,
job.getPayload(), e);
throw new WorkflowOperationException(e);
}
}
logger.info("Published {} elements of media package {} to publication channel {}",
bulkElementIds.size() + singleElementIds.size(), mediapackage, channelId);
}
return result;
}
/**
* Dummy function for further publication strategies
*
* @param mp
* @throws WorkflowOperationException
*/
private void fail(MediaPackage mp) throws WorkflowOperationException {
logger.error("There is already a Published Media, fail Stragy for Mediapackage {}", mp.getIdentifier());
throw new WorkflowOperationException("There is already a Published Media, fail Stragy for Mediapackage ");
}
@Reference
@Override
public void setServiceRegistry(ServiceRegistry serviceRegistry) {
super.setServiceRegistry(serviceRegistry);
}
}