ImageWorkflowOperationHandler.java
/*
* Licensed to The Apereo Foundation under one or more contributor license
* agreements. See the NOTICE file distributed with this work for additional
* information regarding copyright ownership.
*
*
* The Apereo Foundation licenses this file to you under the Educational
* Community License, Version 2.0 (the "License"); you may not use this file
* except in compliance with the License. You may obtain a copy of the License
* at:
*
* http://opensource.org/licenses/ecl2.txt
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*
*/
package org.opencastproject.workflow.handler.composer;
import static com.entwinemedia.fn.Equality.hash;
import static com.entwinemedia.fn.Prelude.chuck;
import static com.entwinemedia.fn.Prelude.unexhaustiveMatchError;
import static com.entwinemedia.fn.Stream.$;
import static com.entwinemedia.fn.parser.Parsers.character;
import static com.entwinemedia.fn.parser.Parsers.many;
import static com.entwinemedia.fn.parser.Parsers.opt;
import static com.entwinemedia.fn.parser.Parsers.space;
import static com.entwinemedia.fn.parser.Parsers.symbol;
import static com.entwinemedia.fn.parser.Parsers.token;
import static java.lang.String.format;
import static org.opencastproject.util.EqualsUtil.eq;
import org.opencastproject.composer.api.ComposerService;
import org.opencastproject.composer.api.EncodingProfile;
import org.opencastproject.job.api.Job;
import org.opencastproject.job.api.JobBarrier;
import org.opencastproject.job.api.JobContext;
import org.opencastproject.mediapackage.Attachment;
import org.opencastproject.mediapackage.MediaPackage;
import org.opencastproject.mediapackage.MediaPackageElement;
import org.opencastproject.mediapackage.MediaPackageElementFlavor;
import org.opencastproject.mediapackage.MediaPackageElementParser;
import org.opencastproject.mediapackage.MediaPackageException;
import org.opencastproject.mediapackage.MediaPackageSupport;
import org.opencastproject.mediapackage.Track;
import org.opencastproject.mediapackage.VideoStream;
import org.opencastproject.mediapackage.selector.AbstractMediaPackageElementSelector;
import org.opencastproject.mediapackage.selector.TrackSelector;
import org.opencastproject.serviceregistry.api.ServiceRegistry;
import org.opencastproject.util.JobUtil;
import org.opencastproject.util.MimeTypes;
import org.opencastproject.util.UnknownFileTypeException;
import org.opencastproject.util.data.Collections;
import org.opencastproject.workflow.api.AbstractWorkflowOperationHandler;
import org.opencastproject.workflow.api.ConfiguredTagsAndFlavors;
import org.opencastproject.workflow.api.WorkflowInstance;
import org.opencastproject.workflow.api.WorkflowOperationException;
import org.opencastproject.workflow.api.WorkflowOperationHandler;
import org.opencastproject.workflow.api.WorkflowOperationInstance;
import org.opencastproject.workflow.api.WorkflowOperationResult;
import org.opencastproject.workflow.api.WorkflowOperationResult.Action;
import org.opencastproject.workspace.api.Workspace;
import com.entwinemedia.fn.Fn;
import com.entwinemedia.fn.Fn2;
import com.entwinemedia.fn.Fx;
import com.entwinemedia.fn.P2;
import com.entwinemedia.fn.Prelude;
import com.entwinemedia.fn.StreamFold;
import com.entwinemedia.fn.data.Opt;
import com.entwinemedia.fn.fns.Strings;
import com.entwinemedia.fn.parser.Parser;
import com.entwinemedia.fn.parser.Parsers;
import com.entwinemedia.fn.parser.Result;
import org.apache.commons.io.FilenameUtils;
import org.osgi.service.component.annotations.Component;
import org.osgi.service.component.annotations.Reference;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.net.URI;
import java.util.Arrays;
import java.util.IllegalFormatException;
import java.util.List;
import java.util.Locale;
import java.util.UUID;
import java.util.stream.Collectors;
/**
* The workflow definition for handling "image" operations
*/
@Component(
immediate = true,
service = WorkflowOperationHandler.class,
property = {
"service.description=Image Workflow Operation Handler",
"workflow.operation=image"
}
)
public class ImageWorkflowOperationHandler extends AbstractWorkflowOperationHandler {
/** The logging facility */
private static final Logger logger = LoggerFactory.getLogger(ImageWorkflowOperationHandler.class);
// legacy option
public static final String OPT_PROFILES = "encoding-profile";
public static final String OPT_POSITIONS = "time";
public static final String OPT_TARGET_BASE_NAME_FORMAT_SECOND = "target-base-name-format-second";
public static final String OPT_TARGET_BASE_NAME_FORMAT_PERCENT = "target-base-name-format-percent";
public static final String OPT_END_MARGIN = "end-margin";
private static final long END_MARGIN_DEFAULT = 100;
public static final double SINGLE_FRAME_POS = 0.0;
/** The composer service */
private ComposerService composerService = null;
/** The local workspace */
private Workspace workspace = null;
/**
* Callback for the OSGi declarative services configuration.
*
* @param composerService
* the composer service
*/
@Reference
protected void setComposerService(ComposerService composerService) {
this.composerService = composerService;
}
/**
* Callback for declarative services configuration that will introduce us to the local workspace service.
* Implementation assumes that the reference is configured as being static.
*
* @param workspace
* an instance of the workspace
*/
@Reference
public void setWorkspace(Workspace workspace) {
this.workspace = workspace;
}
@Override
public WorkflowOperationResult start(final WorkflowInstance wi, JobContext ctx)
throws WorkflowOperationException {
logger.debug("Running image workflow operation on {}", wi);
try {
MediaPackage mp = wi.getMediaPackage();
final Extractor e = new Extractor(this, configure(mp, wi));
return e.main(MediaPackageSupport.copy(mp));
} catch (Exception e) {
throw new WorkflowOperationException(e);
}
}
/**
* Computation within the context of a {@link Cfg}.
*/
static final class Extractor {
private final ImageWorkflowOperationHandler handler;
private final Cfg cfg;
Extractor(ImageWorkflowOperationHandler handler, Cfg cfg) {
this.handler = handler;
this.cfg = cfg;
}
/** Run the extraction. */
WorkflowOperationResult main(final MediaPackage mp) throws WorkflowOperationException {
if (cfg.sourceTracks.size() == 0) {
logger.info("No source tracks found in media package {}, skipping operation", mp.getIdentifier());
return handler.createResult(mp, Action.SKIP);
}
// start image extraction jobs
final List<Extraction> extractions = cfg.sourceTracks.stream().flatMap(track -> {
final List<MediaPosition> positions = limit(track, cfg.positions);
if (positions.size() != cfg.positions.size()) {
logger.warn("Could not apply all configured positions to track {}", track);
}
logger.info("Extracting images from {} at position {}", track, positions);
// create one extraction per encoding profile
return cfg.profiles.stream()
.map(profile -> new Extraction(extractImages(track, profile, positions), track, profile, positions));
}).collect(Collectors.toList());
final List<Job> extractionJobs = concatJobs(extractions);
final JobBarrier.Result extractionResult = JobUtil.waitForJobs(handler.serviceRegistry, extractionJobs);
if (extractionResult.isSuccess()) {
// all extractions were successful; iterate them
for (final Extraction extraction : extractions) {
final List<Attachment> images = getImages(extraction.job);
final int expectedNrOfImages = extraction.positions.size();
if (images.size() == expectedNrOfImages) {
// post process images
for (final P2<Attachment, MediaPosition> image : $(images).zip(extraction.positions)) {
adjustMetadata(extraction, image.get1());
if (image.get1().getIdentifier() == null) image.get1().setIdentifier(UUID.randomUUID().toString());
mp.addDerived(image.get1(), extraction.track);
final String fileName = createFileName(
extraction.profile.getSuffix(), extraction.track.getURI(), image.get2());
moveToWorkspace(mp, image.get1(), fileName);
}
} else {
// less images than expected have been extracted
throw new WorkflowOperationException(
format("Only %s of %s images have been extracted from track %s",
images.size(), expectedNrOfImages, extraction.track));
}
}
return handler.createResult(mp, Action.CONTINUE, JobUtil.sumQueueTime(extractionJobs));
} else {
throw new WorkflowOperationException("Image extraction failed");
}
}
/**
* Adjust flavor, tags, mime type of <code>image</code> according to the
* configuration and the extraction.
*/
void adjustMetadata(Extraction extraction, Attachment image) {
// Adjust the target flavor. Make sure to account for partial updates
for (final MediaPackageElementFlavor flavor : cfg.targetImageFlavor) {
final String flavorType = eq("*", flavor.getType())
? extraction.track.getFlavor().getType()
: flavor.getType();
final String flavorSubtype = eq("*", flavor.getSubtype())
? extraction.track.getFlavor().getSubtype()
: flavor.getSubtype();
image.setFlavor(new MediaPackageElementFlavor(flavorType, flavorSubtype));
logger.debug("Resulting image has flavor '{}'", image.getFlavor());
}
// Set the mime type
try {
image.setMimeType(MimeTypes.fromURI(image.getURI()));
} catch (UnknownFileTypeException e) {
logger.warn("Mime type unknown for file {}. Setting none.", image.getURI(), e);
}
// Add tags
for (final String tag : cfg.targetImageTags) {
logger.trace("Tagging image with '{}'", tag);
image.addTag(tag);
}
}
/** Create a file name for the extracted image. */
String createFileName(final String suffix, final URI trackUri, final MediaPosition pos) {
final String trackBaseName = FilenameUtils.getBaseName(trackUri.getPath());
final String format;
switch (pos.type) {
case Seconds:
format = cfg.targetBaseNameFormatSecond.getOr(trackBaseName + "_%.3fs%s");
break;
case Percentage:
format = cfg.targetBaseNameFormatPercent.getOr(trackBaseName + "_%.1fp%s");
break;
default:
throw unexhaustiveMatchError();
}
return formatFileName(format, pos.position, suffix);
}
/** Move the extracted <code>image</code> to its final location in the workspace and rename it to <code>fileName</code>. */
private void moveToWorkspace(final MediaPackage mp, final Attachment image, final String fileName) {
try {
image.setURI(handler.workspace.moveTo(
image.getURI(),
mp.getIdentifier().toString(),
image.getIdentifier(),
fileName));
} catch (Exception e) {
chuck(new WorkflowOperationException(e));
}
}
/** Start a composer job to extract images from a track at the given positions. */
private Job extractImages(final Track track, final EncodingProfile profile, final List<MediaPosition> positions) {
final List<Double> p = $(positions).map(new Fn<MediaPosition, Double>() {
@Override public Double apply(MediaPosition mediaPosition) {
return toSeconds(track, mediaPosition, cfg.endMargin);
}
}).toList();
try {
return handler.composerService.image(track, profile.getIdentifier(), Collections.toDoubleArray(p));
} catch (Exception e) {
return chuck(new WorkflowOperationException("Error starting image extraction job", e));
}
}
}
// ** ** **
/**
* Format a filename and make it "safe".
*/
static String formatFileName(String format, double position, String suffix) {
return format(Locale.ROOT, format, position, suffix);
}
/** Concat the jobs of a list of extraction objects. */
private static List<Job> concatJobs(List<Extraction> extractions) {
return $(extractions).map(new Fn<Extraction, Job>() {
@Override public Job apply(Extraction extraction) {
return extraction.job;
}
}).toList();
}
/** Get the images (payload) from a job. */
@SuppressWarnings("unchecked")
private static List<Attachment> getImages(Job job) {
final List<Attachment> images;
try {
images = (List<Attachment>) MediaPackageElementParser.getArrayFromXml(job.getPayload());
} catch (MediaPackageException e) {
return chuck(e);
}
if (!images.isEmpty()) {
return images;
} else {
return chuck(new WorkflowOperationException("Job did not extract any images"));
}
}
/** Limit the list of media positions to those that fit into the length of the track. */
static List<MediaPosition> limit(Track track, List<MediaPosition> positions) {
final Long duration = track.getDuration();
// if the video has just one frame (e.g.: MP3-Podcasts) it makes no sense to go to a certain position
// as the video has only one image at position 0
if (duration == null || (track.getStreams() != null && Arrays.stream(track.getStreams())
.filter(stream -> stream instanceof VideoStream)
.map(org.opencastproject.mediapackage.Stream::getFrameCount)
.allMatch(frameCount -> frameCount == null || frameCount == 1))) {
return java.util.Collections.singletonList(new MediaPosition(PositionType.Seconds, 0));
}
return positions.stream()
.filter(p -> (PositionType.Seconds.equals(p.type) && p.position >= 0 && p.position < duration)
|| (PositionType.Percentage.equals(p.type) && p.position >= 0 && p.position <= 100))
.collect(Collectors.toList());
}
/**
* Convert a <code>position</code> into seconds in relation to the given track.
* <em>Attention:</em> The function does not check if the calculated absolute position is within
* the bounds of the tracks length.
*/
static double toSeconds(Track track, MediaPosition position, double endMarginMs) {
final long durationMs = track.getDuration() == null ? 0 : track.getDuration();
final double posMs;
switch (position.type) {
case Percentage:
posMs = durationMs * position.position / 100.0;
break;
case Seconds:
posMs = position.position * 1000.0;
break;
default:
throw unexhaustiveMatchError();
}
// limit maximum position to Xms before the end of the video
return Math.abs(durationMs - posMs) >= endMarginMs
? posMs / 1000.0
: Math.max(0, ((double) durationMs - endMarginMs)) / 1000.0;
}
// ** ** **
/** Create a fold that folds flavors into a media package element selector. */
public static <E extends MediaPackageElement, S extends AbstractMediaPackageElementSelector<E>>
StreamFold<MediaPackageElementFlavor, S> flavorFold(S selector) {
return StreamFold.foldl(selector, new Fn2<S, MediaPackageElementFlavor, S>() {
@Override public S apply(S sum, MediaPackageElementFlavor flavor) {
sum.addFlavor(flavor);
return sum;
}
});
}
/** Create a fold that folds tags into a media package element selector. */
public static <E extends MediaPackageElement, S extends AbstractMediaPackageElementSelector<E>>
StreamFold<String, S> tagFold(S selector) {
return StreamFold.foldl(selector, new Fn2<S, String, S>() {
@Override public S apply(S sum, String tag) {
sum.addTag(tag);
return sum;
}
});
}
/**
* Fetch a profile from the composer service. Throw a WorkflowOperationException in case the profile
* does not exist.
*/
public static Fn<String, EncodingProfile> fetchProfile(final ComposerService composerService) {
return new Fn<String, EncodingProfile>() {
@Override public EncodingProfile apply(String profileName) {
final EncodingProfile profile = composerService.getProfile(profileName);
return profile != null
? profile
: Prelude.<EncodingProfile>chuck(new WorkflowOperationException("Encoding profile '" + profileName + "' was not found"));
}
};
}
/**
* Describes the extraction of a list of images from a track, extracted after a certain encoding profile.
* Track -> (profile, positions)
*/
static final class Extraction {
/** The extraction job. */
private final Job job;
/** The track to extract from. */
private final Track track;
/** The encoding profile to use for extraction. */
private final EncodingProfile profile;
/** Media positions. */
private final List<MediaPosition> positions;
private Extraction(Job job, Track track, EncodingProfile profile, List<MediaPosition> positions) {
this.job = job;
this.track = track;
this.profile = profile;
this.positions = positions;
}
}
// ** ** **
/**
* The WOH's configuration options.
*/
static final class Cfg {
/** List of source tracks, with duration. */
private final List<Track> sourceTracks;
private final List<MediaPosition> positions;
private final List<EncodingProfile> profiles;
private final List<MediaPackageElementFlavor> targetImageFlavor;
private final List<String> targetImageTags;
private final Opt<String> targetBaseNameFormatSecond;
private final Opt<String> targetBaseNameFormatPercent;
private final long endMargin;
Cfg(List<Track> sourceTracks,
List<MediaPosition> positions,
List<EncodingProfile> profiles,
List<MediaPackageElementFlavor> targetImageFlavor,
List<String> targetImageTags,
Opt<String> targetBaseNameFormatSecond,
Opt<String> targetBaseNameFormatPercent,
long endMargin) {
this.sourceTracks = sourceTracks;
this.positions = positions;
this.profiles = profiles;
this.targetImageFlavor = targetImageFlavor;
this.targetImageTags = targetImageTags;
this.endMargin = endMargin;
this.targetBaseNameFormatSecond = targetBaseNameFormatSecond;
this.targetBaseNameFormatPercent = targetBaseNameFormatPercent;
}
}
/** Get and parse the configuration options. */
private Cfg configure(MediaPackage mp, WorkflowInstance wi) throws WorkflowOperationException {
WorkflowOperationInstance woi = wi.getCurrentOperation();
ConfiguredTagsAndFlavors tagsAndFlavors = getTagsAndFlavors(wi,
Configuration.many, Configuration.many, Configuration.many, Configuration.one);
final List<EncodingProfile> profiles = getOptConfig(woi, OPT_PROFILES).toStream().bind(asList.toFn())
.map(fetchProfile(composerService)).toList();
final List<String> targetImageTags = tagsAndFlavors.getTargetTags();
final List<MediaPackageElementFlavor> targetImageFlavor = tagsAndFlavors.getTargetFlavors();
final List<Track> sourceTracks;
{
// get the source tags
final List<String> sourceTags = tagsAndFlavors.getSrcTags();
final List<MediaPackageElementFlavor> sourceFlavors = tagsAndFlavors.getSrcFlavors();
TrackSelector trackSelector = new TrackSelector();
//add tags and flavors to TrackSelector
for (String tag : sourceTags) {
trackSelector.addTag(tag);
}
for (MediaPackageElementFlavor flavor : sourceFlavors) {
trackSelector.addFlavor(flavor);
}
// select the tracks based on source flavors and tags and skip those that don't have video
sourceTracks = trackSelector.select(mp, true).stream()
.filter(Track::hasVideo)
.collect(Collectors.toList());
}
final List<MediaPosition> positions = parsePositions(getConfig(woi, OPT_POSITIONS));
final long endMargin = getOptConfig(woi, OPT_END_MARGIN).bind(Strings.toLong).getOr(END_MARGIN_DEFAULT);
//
return new Cfg(sourceTracks,
positions,
profiles,
targetImageFlavor,
targetImageTags,
getTargetBaseNameFormat(woi, OPT_TARGET_BASE_NAME_FORMAT_SECOND),
getTargetBaseNameFormat(woi, OPT_TARGET_BASE_NAME_FORMAT_PERCENT),
endMargin);
}
/** Validate a target base name format. */
private Opt<String> getTargetBaseNameFormat(WorkflowOperationInstance woi, final String formatName) {
return getOptConfig(woi, formatName).each(validateTargetBaseNameFormat(formatName));
}
static Fx<String> validateTargetBaseNameFormat(final String formatName) {
return new Fx<String>() {
@Override public void apply(String format) {
boolean valid;
try {
final String name = formatFileName(format, 15.11, ".png");
valid = name.contains(".") && name.contains(".png");
} catch (IllegalFormatException e) {
valid = false;
}
if (!valid) {
chuck(new WorkflowOperationException(format(
"%s is not a valid format string for config option %s",
format, formatName)));
}
}
};
}
// ** ** **
/**
* Parse media position parameter strings.
*/
static final class MediaPositionParser {
private MediaPositionParser() {
}
static final Parser<Double> number = token(Parsers.dbl);
static final Parser<MediaPosition> seconds = number.bind(new Fn<Double, Parser<MediaPosition>>() {
@Override public Parser<MediaPosition> apply(Double p) {
return Parsers.yield(new MediaPosition(PositionType.Seconds, p));
}
});
static final Parser<MediaPosition> percentage =
number.bind(Parsers.<Double, String>ignore(symbol("%"))).bind(new Fn<Double, Parser<MediaPosition>>() {
@Override public Parser<MediaPosition> apply(Double p) {
return Parsers.yield(new MediaPosition(PositionType.Percentage, p));
}
});
static final Parser<Character> comma = token(character(','));
static final Parser<Character> ws = token(space);
static final Parser<MediaPosition> position = percentage.or(seconds);
/** Main parser. */
static final Parser<List<MediaPosition>> positions =
position.bind(new Fn<MediaPosition, Parser<List<MediaPosition>>>() {
// first position
@Override public Parser<List<MediaPosition>> apply(final MediaPosition first) {
// following
return many(opt(comma).bind(Parsers.ignorePrevious(position)))
.bind(new Fn<List<MediaPosition>, Parser<List<MediaPosition>>>() {
@Override public Parser<List<MediaPosition>> apply(List<MediaPosition> rest) {
return Parsers.yield($(first).append(rest).toList());
}
});
}
});
}
private List<MediaPosition> parsePositions(String time) throws WorkflowOperationException {
final Result<List<MediaPosition>> r = MediaPositionParser.positions.parse(time);
if (r.isDefined() && r.getRest().isEmpty()) {
return r.getResult();
} else {
throw new WorkflowOperationException(format("Cannot parse time string %s.", time));
}
}
enum PositionType {
Percentage, Seconds
}
/**
* A position in time in a media file.
*/
static final class MediaPosition {
private double position;
private final PositionType type;
MediaPosition(PositionType type, double position) {
this.position = position;
this.type = type;
}
public void setPosition(double position) {
this.position = position;
}
@Override public int hashCode() {
return hash(position, type);
}
@Override public boolean equals(Object that) {
return (this == that) || (that instanceof MediaPosition && eqFields((MediaPosition) that));
}
private boolean eqFields(MediaPosition that) {
return position == that.position && eq(type, that.type);
}
@Override public String toString() {
return format("MediaPosition(%s, %s)", type, position);
}
}
@Reference
@Override
public void setServiceRegistry(ServiceRegistry serviceRegistry) {
super.setServiceRegistry(serviceRegistry);
}
}