MultiEncodeWorkflowOperationHandler.java
/*
* Licensed to The Apereo Foundation under one or more contributor license
* agreements. See the NOTICE file distributed with this work for additional
* information regarding copyright ownership.
*
*
* The Apereo Foundation licenses this file to you under the Educational
* Community License, Version 2.0 (the "License"); you may not use this file
* except in compliance with the License. You may obtain a copy of the License
* at:
*
* http://opensource.org/licenses/ecl2.txt
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*
*/
package org.opencastproject.workflow.handler.composer;
import org.opencastproject.composer.api.ComposerService;
import org.opencastproject.composer.api.EncoderException;
import org.opencastproject.composer.api.EncodingProfile;
import org.opencastproject.job.api.Job;
import org.opencastproject.job.api.JobContext;
import org.opencastproject.mediapackage.AdaptivePlaylist;
import org.opencastproject.mediapackage.MediaPackage;
import org.opencastproject.mediapackage.MediaPackageElementFlavor;
import org.opencastproject.mediapackage.MediaPackageElementParser;
import org.opencastproject.mediapackage.MediaPackageException;
import org.opencastproject.mediapackage.Track;
import org.opencastproject.mediapackage.selector.AbstractMediaPackageElementSelector;
import org.opencastproject.mediapackage.selector.TrackSelector;
import org.opencastproject.serviceregistry.api.ServiceRegistry;
import org.opencastproject.util.NotFoundException;
import org.opencastproject.workflow.api.AbstractWorkflowOperationHandler;
import org.opencastproject.workflow.api.WorkflowInstance;
import org.opencastproject.workflow.api.WorkflowOperationException;
import org.opencastproject.workflow.api.WorkflowOperationHandler;
import org.opencastproject.workflow.api.WorkflowOperationInstance;
import org.opencastproject.workflow.api.WorkflowOperationResult;
import org.opencastproject.workflow.api.WorkflowOperationResult.Action;
import org.opencastproject.workspace.api.Workspace;
import org.apache.commons.io.FilenameUtils;
import org.apache.commons.lang3.BooleanUtils;
import org.apache.commons.lang3.StringUtils;
import org.osgi.service.component.ComponentContext;
import org.osgi.service.component.annotations.Activate;
import org.osgi.service.component.annotations.Component;
import org.osgi.service.component.annotations.Reference;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;
import java.util.stream.Collectors;
/**
* The workflow definition for handling multiple concurrent outputs in one ffmpeg operation. This allows encoding and
* tagging to be done in one operation
*/
@Component(
immediate = true,
service = WorkflowOperationHandler.class,
property = {
"service.description=MultiEncode Workflow Operation Handler",
"workflow.operation=multiencode"
}
)
public class MultiEncodeWorkflowOperationHandler extends AbstractWorkflowOperationHandler {
/** The logging facility */
private static final Logger logger = LoggerFactory.getLogger(MultiEncodeWorkflowOperationHandler.class);
/** seperator for independent clauses */
static final String SEPARATOR = ";";
/** The composer service */
private ComposerService composerService = null;
/** The local workspace */
private Workspace workspace = null;
@Activate
public void activate(ComponentContext cc) {
super.activate(cc);
}
/**
* Callback for the OSGi declarative services configuration.
*
* @param composerService
* the local composer service
*/
@Reference
protected void setComposerService(ComposerService composerService) {
this.composerService = composerService;
}
/**
* Callback for declarative services configuration that will introduce us to the local workspace service.
* Implementation assumes that the reference is configured as being static.
*
* @param workspace
* an instance of the workspace
*/
@Reference
public void setWorkspace(Workspace workspace) {
this.workspace = workspace;
}
private Predicate<EncodingProfile> isManifestEP = p -> p.getOutputType() == EncodingProfile.MediaType.Manifest;
/**
* {@inheritDoc}
*
* @see org.opencastproject.workflow.api.WorkflowOperationHandler#start(org.opencastproject.workflow.api.WorkflowInstance,
* JobContext)
*/
@Override
public WorkflowOperationResult start(final WorkflowInstance workflowInstance, JobContext context)
throws WorkflowOperationException {
logger.debug("Running Multiencode workflow operation on workflow {}", workflowInstance.getId());
try {
return multiencode(workflowInstance.getMediaPackage(), workflowInstance.getCurrentOperation());
} catch (Exception e) {
throw new WorkflowOperationException(e);
}
}
protected class ElementProfileTagFlavor {
private AbstractMediaPackageElementSelector<Track> elementSelector = new TrackSelector();
private String targetFlavor = null;
private String targetTags = null;
private List<String> encodingProfiles = new ArrayList<>(); // redundant storage
private List<EncodingProfile> encodingProfileList = new ArrayList<>();
ElementProfileTagFlavor(String profiles) {
List<String> profilelist = asList(profiles);
for (String profile : profilelist) {
EncodingProfile encodingprofile = composerService.getProfile(profile);
if (encodingprofile != null) {
encodingProfiles.add(encodingprofile.getIdentifier());
encodingProfileList.add(encodingprofile);
} else {
throw new IllegalArgumentException("Encoding profile " + profile + " not found.");
}
}
}
public AbstractMediaPackageElementSelector<Track> getSelector() {
return this.elementSelector;
}
public List<String> getProfiles() {
return this.encodingProfiles;
}
public List<EncodingProfile> getEncodingProfiles() {
return this.encodingProfileList;
}
void addSourceFlavor(String flavor) {
this.elementSelector.addFlavor(MediaPackageElementFlavor.parseFlavor(flavor));
}
void addSourceTag(String tag) {
this.elementSelector.addTag(tag);
}
void setTargetTags(String tags) {
this.targetTags = tags;
}
void setTargetFlavor(String flavor) {
this.targetFlavor = flavor;
}
String getTargetFlavor() {
return this.targetFlavor;
}
String getTargetTags() {
return this.targetTags;
}
}
/*
* Figures out the logic of all the source tags, flavors and profiles and sorts out the source tracks and
* corresponding encoding profiles.
*
* Source Tracks are selected by (Flavor AND Tag) if they are both provided
*
* There can be multiple sources and flavors to create more than one source tracks. In the workflow, A semi-colon ";"
* is used to separate the independent operations.
*
* The independent operations can be either all share the same set of properties or all have different sets of
* properties. For example, There are two sets of source flavors: * "presenter/* ; presentation/*", one source tag,
* eg: "preview", and two sets of encoding profiles, eg: "mp4,flv ; mp4,hdtv" then there are two concurrent
* operations: the first one is all "presenter" tracks tagged "preview" will be encoded with "mp4" and "flv". The
* second one is all "presentation" tracks tagged "preview" encoded with "mp4" and "hdtv"
*
*/
private List<ElementProfileTagFlavor> getSrcSelector(String[] sourceFlavors, String[] sourceTags,
String[] targetFlavors, String[] targetTags, String[] profiles) throws WorkflowOperationException {
int n = 0;
List<ElementProfileTagFlavor> elementSelectors = new ArrayList<>();
if (sourceTags == null && sourceFlavors == null)
throw new WorkflowOperationException("No source tags or Flavor");
if (profiles == null)
throw new WorkflowOperationException("Missing profiles");
if (sourceTags != null) { // If source tags are used to select tracks
// If use source and target tags, there should be the same number of them or all map into one target
if (targetTags != null && (targetTags.length != 1 && sourceTags.length != targetTags.length))
throw new WorkflowOperationException("number of source tags " + sourceTags.length
+ " does not match number of target tags " + targetTags.length + " (must be the same or one target)");
// There should be the same number of source tags or profile groups or all use same group of profiles
if (profiles.length != 1 && sourceTags.length != profiles.length) {
throw new WorkflowOperationException(
"number of source tags segments " + sourceTags.length + " does not match number of profiles segments "
+ profiles.length + " (must be the same or one profile)");
}
// If use source tags and source flavors, there should be the same number of them or one
if (sourceFlavors != null && (sourceTags.length != 1 && sourceFlavors.length != 1)
&& sourceFlavors.length != sourceTags.length) {
throw new WorkflowOperationException("number of source tags segments " + sourceTags.length
+ " does not match number of source Flavor segments " + sourceFlavors.length
+ " (must be the same or one)");
}
n = sourceTags.length; // at least this many tracks
}
if (sourceFlavors != null) { // If flavors are used to select tracks
// If use source and target flavors, there should be the same number of them or all map into one target
if (targetFlavors != null && (targetFlavors.length != 1 && sourceFlavors.length != targetFlavors.length)) {
throw new WorkflowOperationException(
"number of source flavors " + sourceFlavors.length + " segment does not match number of target flavors"
+ targetFlavors.length + " (must be the same or one target flavor)");
}
// If use target tags, there should be the same number of source flavors and target tags or all map into one
// target tag
if (targetTags != null && targetTags.length != 1 && sourceFlavors.length != targetTags.length) {
throw new WorkflowOperationException(
"number of source flavors " + sourceFlavors.length + " segment does not match number of target Tags"
+ targetTags.length + " (must be the same or one target)");
}
// Number of profile groups should match number of source flavors
if ((profiles.length != 1 && sourceFlavors.length != profiles.length)) {
throw new WorkflowOperationException("number of source flavors segments " + sourceFlavors.length
+ " does not match number of profiles segments " + profiles.length
+ " (must be the same or one profile)");
}
if (sourceFlavors.length > n)
n = sourceFlavors.length; // at least this many tracks
}
int numProfiles = 0;
// One for each source flavor
for (int i = 0; i < n; i++) {
elementSelectors.add(new ElementProfileTagFlavor(profiles[numProfiles]));
if (profiles.length > 1)
numProfiles++; // All source use the same set of profiles or its own
}
// If uses tags to select, but sets target flavor, they must match
if (sourceTags != null && sourceFlavors != null) {
if (sourceTags.length != sourceFlavors.length && sourceFlavors.length != 1 && sourceTags.length != 1) {
throw new WorkflowOperationException(
"number of source flavors " + sourceTags.length + " does not match number of source tags "
+ sourceFlavors.length + " (must be the same or one set of tags or flavors)");
}
}
populateFlavorsAndTags(elementSelectors, sourceFlavors, targetFlavors, sourceTags, targetTags);
return elementSelectors;
}
private List<ElementProfileTagFlavor> populateFlavorsAndTags(List<ElementProfileTagFlavor> elementSelectors,
String[] sourceFlavors, String[] targetFlavors, String[] sourceTags, String[] targetTags)
throws WorkflowOperationException {
int sf = 0;
int tf = 0;
int st = 0;
int tt = 0;
for (ElementProfileTagFlavor ep : elementSelectors) {
try {
if (sourceTags != null) {
for (String tag : asList(sourceTags[st])) {
ep.addSourceTag(tag);
}
if (sourceTags.length != 1)
st++;
}
if (targetTags != null) {
ep.setTargetTags(targetTags[tt]);
if (targetTags.length != 1)
tt++;
}
if (sourceFlavors != null) {
for (String flavor : asList(sourceFlavors[sf])) {
ep.addSourceFlavor(flavor);
}
if (sourceFlavors.length != 1)
sf++;
}
if (targetFlavors != null) {
for (String flavor : asList(targetFlavors[tf])) {
ep.setTargetFlavor(flavor);
}
if (targetFlavors.length != 1)
tf++;
}
} catch (IllegalArgumentException e) {
throw new WorkflowOperationException("Set Tags or Flavor " + e.getMessage());
}
}
return elementSelectors;
}
private String[] getConfigAsArray(WorkflowOperationInstance operation, String name) {
String sourceOption = StringUtils.trimToNull(operation.getConfiguration(name));
return StringUtils.split(sourceOption, SEPARATOR);
}
private List<Track> getManifest(Collection<Track> tracks) {
return tracks.stream().filter(AdaptivePlaylist.isHLSTrackPred).collect(Collectors.toList());
}
/*
* Encode multiple tracks in a mediaPackage concurrently with different encoding profiles for each track. The encoding
* profiles are specified by names in a list and are the names used to tag each corresponding output. Each source
* track will start one operation on one worker. concurrency is achieved by running on different workers
*
* @param src The source media package
*
* @param operation the current workflow operation
*
* @return the operation result containing the updated media package
*
* @throws EncoderException if encoding fails
*
* @throws WorkflowOperationException if errors occur during processing
*
* @throws IOException if the workspace operations fail
*
* @throws NotFoundException if the workspace doesn't contain the requested file
*/
private WorkflowOperationResult multiencode(MediaPackage src, WorkflowOperationInstance operation)
throws EncoderException, IOException, NotFoundException, MediaPackageException, WorkflowOperationException {
MediaPackage mediaPackage = (MediaPackage) src.clone();
// Check which tags have been configured
String[] sourceTags = getConfigAsArray(operation, "source-tags");
String[] sourceFlavors = getConfigAsArray(operation, "source-flavors");
String[] targetTags = getConfigAsArray(operation, "target-tags");
String[] targetFlavors = getConfigAsArray(operation, "target-flavors");
String tagWithProfileConfig = StringUtils.trimToNull(operation.getConfiguration("tag-with-profile"));
boolean tagWithProfile = BooleanUtils.toBoolean(tagWithProfileConfig);
// Make sure either one of tags or flavors are provided
if (sourceFlavors == null && sourceTags == null) {
logger.info("No source tags or flavors have been specified, not matching anything");
return createResult(mediaPackage, Action.CONTINUE);
}
String[] profiles = getConfigAsArray(operation, "encoding-profiles");
if (profiles == null)
throw new WorkflowOperationException("Missing encoding profiles");
// Sort out the combinatorics of all the tags and flavors
List<ElementProfileTagFlavor> selectors = getSrcSelector(sourceFlavors, sourceTags, targetFlavors, targetTags,
profiles);
long totalTimeInQueue = 0;
Map<Job, JobInformation> encodingJobs = new HashMap<>();
// Find the encoding profiles - should only be one per flavor or tag
for (ElementProfileTagFlavor eptf : selectors) {
// Look for elements matching the tag and flavor
Collection<Track> elements = eptf.elementSelector.select(mediaPackage, true);
for (Track sourceTrack : elements) {
logger.info("Encoding track {} using encoding profile '{}'", sourceTrack, eptf.getProfiles().get(0).toString());
// Start encoding and wait for the result
encodingJobs.put(composerService.multiEncode(sourceTrack, eptf.getProfiles()),
new JobInformation(sourceTrack, eptf, tagWithProfile));
}
}
if (encodingJobs.isEmpty()) {
logger.info("No matching tracks found");
return createResult(mediaPackage, Action.CONTINUE);
}
// Wait for the jobs to return
if (!waitForStatus(encodingJobs.keySet().toArray(new Job[encodingJobs.size()])).isSuccess()) {
throw new WorkflowOperationException("One of the encoding jobs did not complete successfully");
}
// Process the result
for (Map.Entry<Job, JobInformation> entry : encodingJobs.entrySet()) {
Job job = entry.getKey();
Track sourceTrack = entry.getValue().getTrack(); // source
ElementProfileTagFlavor info = entry.getValue().getInfo(); // tags and flavors
List<EncodingProfile> eplist = entry.getValue().getProfileList();
// add this receipt's queue time to the total
totalTimeInQueue += job.getQueueTime();
// it is allowed for compose jobs to return an empty payload. See the EncodeEngine interface
if (job.getPayload().length() > 0) {
@SuppressWarnings("unchecked")
List<Track> composedTracks = (List<Track>) MediaPackageElementParser.getArrayFromXml(job.getPayload());
// HLS Manifest profile has precedence and overrides individual encoding profiles
boolean isHLS = eplist.stream().anyMatch(isManifestEP);
if (isHLS) { // check that manifests and segments counts are correct
decipherHLSPlaylistResults(sourceTrack, entry.getValue(), mediaPackage, composedTracks);
} else if (composedTracks.size() != info.getProfiles().size()) {
logger.info("Encoded {} tracks, with {} profiles", composedTracks.size(), info.getProfiles().size());
throw new WorkflowOperationException("Number of output tracks does not match number of encoding profiles");
}
for (Track composedTrack : composedTracks) {
if (info.getTargetFlavor() != null) { // Has Flavors
// set it to the matching flavor in the order listed
composedTrack.setFlavor(newFlavor(sourceTrack, info.getTargetFlavor()));
logger.debug("Composed track has flavor '{}'", composedTrack.getFlavor());
}
if (info.getTargetTags() != null) { // Has Tags
for (String tag : asList(info.getTargetTags())) {
logger.trace("Tagging composed track with '{}'", tag);
composedTrack.addTag(tag);
}
}
// Tag each output with encoding profile name if configured
if (entry.getValue().getTagWithProfile()) {
tagByProfile(composedTrack, eplist);
}
String fileName;
if (!isHLS || composedTrack.isMaster()) {
// name after source track if user facing
fileName = getFileNameFromElements(sourceTrack, composedTrack);
} else { // HLS-VOD
// Should all the files be renamed to the same as source
// which defeats the purpose of the suffix in encoding profiles
fileName = FilenameUtils.getName(composedTrack.getURI().getPath());
}
// store new tracks to mediaPackage
composedTrack.setURI(workspace.moveTo(composedTrack.getURI(), mediaPackage.getIdentifier().toString(),
composedTrack.getIdentifier(), fileName));
mediaPackage.addDerived(composedTrack, sourceTrack);
}
} else {
logger.warn("No output from MultiEncode operation");
}
}
WorkflowOperationResult result = createResult(mediaPackage, Action.CONTINUE, totalTimeInQueue);
logger.debug("MultiEncode operation completed");
return result;
}
/**
* Find the matching encoding profile for this track and tag by name
*
* @param track
* @param profiles
* - profiles used to encode a track to multiple formats
* @return
*/
private void tagByProfile(Track track, List<EncodingProfile> profiles) {
String rawfileName = track.getURI().getRawPath();
for (EncodingProfile ep : profiles) {
// #DCE
// Add any character at the beginning of the suffix so that it is properly
// converted in toSafeName (because the regex used there may treat the first
// character differently; the default one does now).
String suffixToSanitize = "X" + ep.getSuffix();
// !! workspace.putInCollection renames the file - need to do the same with suffix
String suffix = workspace.toSafeName(suffixToSanitize).substring(1);
if (suffix.length() > 0 && rawfileName.endsWith(suffix)) {
track.addTag(ep.getIdentifier());
return;
}
}
}
private void decipherHLSPlaylistResults(Track track, JobInformation jobInfo, MediaPackage mediaPackage,
List<Track> composedTracks)
throws WorkflowOperationException, IllegalArgumentException, NotFoundException, IOException {
int nprofiles = jobInfo.getInfo().getProfiles().size();
List<Track> manifests = getManifest(composedTracks);
if (manifests.size() != nprofiles) {
throw new WorkflowOperationException("Number of output playlists does not match number of encoding profiles");
}
if (composedTracks.size() != manifests.size() * 2 - 1) {
throw new WorkflowOperationException("Number of output media does not match number of encoding profiles");
}
}
private MediaPackageElementFlavor newFlavor(Track track, String flavor) throws WorkflowOperationException {
if (StringUtils.isNotBlank(flavor)) {
try {
MediaPackageElementFlavor targetFlavor = MediaPackageElementFlavor.parseFlavor(flavor);
String flavorType = targetFlavor.getType();
String flavorSubtype = targetFlavor.getSubtype();
// Adjust the target flavor. Make sure to account for partial updates
if ("*".equals(flavorType))
flavorType = track.getFlavor().getType();
if ("*".equals(flavorSubtype))
flavorSubtype = track.getFlavor().getSubtype();
return (new MediaPackageElementFlavor(flavorType, flavorSubtype));
} catch (IllegalArgumentException e) {
throw new WorkflowOperationException("Target flavor '" + flavor + "' is malformed");
}
}
return null;
}
/**
* This class is used to store context information for the jobs.
*/
private static final class JobInformation {
private Track track = null;
private ElementProfileTagFlavor info = null;
private boolean tagWithProfile;
JobInformation(Track track, ElementProfileTagFlavor info, boolean tagWithProfile) {
this.track = track;
this.info = info;
this.tagWithProfile = tagWithProfile;
}
public List<EncodingProfile> getProfileList() {
return info.encodingProfileList;
}
/**
* Returns the track.
*
* @return the track
*/
public Track getTrack() {
return track;
}
public boolean getTagWithProfile() {
return this.tagWithProfile;
}
public ElementProfileTagFlavor getInfo() {
return info;
}
}
@Reference
@Override
public void setServiceRegistry(ServiceRegistry serviceRegistry) {
super.setServiceRegistry(serviceRegistry);
}
}