ImageWorkflowOperationHandler.java

/*
 * Licensed to The Apereo Foundation under one or more contributor license
 * agreements. See the NOTICE file distributed with this work for additional
 * information regarding copyright ownership.
 *
 *
 * The Apereo Foundation licenses this file to you under the Educational
 * Community License, Version 2.0 (the "License"); you may not use this file
 * except in compliance with the License. You may obtain a copy of the License
 * at:
 *
 *   http://opensource.org/licenses/ecl2.txt
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the
 * License for the specific language governing permissions and limitations under
 * the License.
 *
 */

package org.opencastproject.workflow.handler.composer;

import static java.lang.String.format;
import static org.opencastproject.util.EqualsUtil.eq;

import org.opencastproject.composer.api.ComposerService;
import org.opencastproject.composer.api.EncodingProfile;
import org.opencastproject.job.api.Job;
import org.opencastproject.job.api.JobBarrier;
import org.opencastproject.job.api.JobContext;
import org.opencastproject.mediapackage.Attachment;
import org.opencastproject.mediapackage.MediaPackage;
import org.opencastproject.mediapackage.MediaPackageElementFlavor;
import org.opencastproject.mediapackage.MediaPackageElementParser;
import org.opencastproject.mediapackage.MediaPackageException;
import org.opencastproject.mediapackage.MediaPackageSupport;
import org.opencastproject.mediapackage.Track;
import org.opencastproject.mediapackage.VideoStream;
import org.opencastproject.mediapackage.selector.TrackSelector;
import org.opencastproject.serviceregistry.api.ServiceRegistry;
import org.opencastproject.util.JobUtil;
import org.opencastproject.util.MimeTypes;
import org.opencastproject.util.UnknownFileTypeException;
import org.opencastproject.util.data.Collections;
import org.opencastproject.workflow.api.AbstractWorkflowOperationHandler;
import org.opencastproject.workflow.api.ConfiguredTagsAndFlavors;
import org.opencastproject.workflow.api.WorkflowInstance;
import org.opencastproject.workflow.api.WorkflowOperationException;
import org.opencastproject.workflow.api.WorkflowOperationHandler;
import org.opencastproject.workflow.api.WorkflowOperationInstance;
import org.opencastproject.workflow.api.WorkflowOperationResult;
import org.opencastproject.workflow.api.WorkflowOperationResult.Action;
import org.opencastproject.workspace.api.Workspace;

import org.apache.commons.io.FilenameUtils;
import org.osgi.service.component.annotations.Component;
import org.osgi.service.component.annotations.Reference;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.net.URI;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.IllegalFormatException;
import java.util.List;
import java.util.Locale;
import java.util.Objects;
import java.util.Optional;
import java.util.stream.Collectors;

/**
 * Workflow operation handler for the "image" operation.
 * <p>
 * For every selected source track that has video, one image extraction job is started per configured
 * encoding profile. The extracted images are added to (a copy of) the media package as elements derived
 * from their source track and moved to their final location in the workspace.
 */
@Component(
    immediate = true,
    service = WorkflowOperationHandler.class,
    property = {
        "service.description=Image Workflow Operation Handler",
        "workflow.operation=image"
    }
)
public class ImageWorkflowOperationHandler extends AbstractWorkflowOperationHandler {
  /** The logging facility */
  private static final Logger logger = LoggerFactory.getLogger(ImageWorkflowOperationHandler.class);

  // legacy option
  /** Comma separated list of encoding profile names used for the extraction. */
  public static final String OPT_PROFILES = "encoding-profile";
  /** Extraction positions: numbers in seconds or, with a trailing '%', percentages of the track duration. */
  public static final String OPT_POSITIONS = "time";
  /** Optional file name format for images extracted at a position given in seconds. */
  public static final String OPT_TARGET_BASE_NAME_FORMAT_SECOND = "target-base-name-format-second";
  /** Optional file name format for images extracted at a position given as a percentage. */
  public static final String OPT_TARGET_BASE_NAME_FORMAT_PERCENT = "target-base-name-format-percent";
  /** Distance (in milliseconds) extraction positions close to the end of a track are moved away from it. */
  public static final String OPT_END_MARGIN = "end-margin";

  /** Default value of {@link #OPT_END_MARGIN}, in milliseconds. */
  private static final long END_MARGIN_DEFAULT = 100;
  public static final double SINGLE_FRAME_POS = 0.0;

  /** The composer service */
  private ComposerService composerService = null;

  /** The local workspace */
  private Workspace workspace = null;

  /**
   * Callback for the OSGi declarative services configuration.
   *
   * @param composerService
   *          the composer service
   */
  @Reference
  protected void setComposerService(ComposerService composerService) {
    this.composerService = composerService;
  }

  /**
   * Callback for declarative services configuration that will introduce us to the local workspace service.
   * Implementation assumes that the reference is configured as being static.
   *
   * @param workspace
   *          an instance of the workspace
   */
  @Reference
  public void setWorkspace(Workspace workspace) {
    this.workspace = workspace;
  }

  /**
   * Run the image extraction.
   * <p>
   * Skips the operation if no source track with video is selected. Otherwise starts one extraction job per
   * (track, encoding profile) pair, waits for all of them and adds the resulting images to a copy of the
   * media package.
   *
   * @throws WorkflowOperationException
   *           if the configuration is invalid, a job fails, or a job yields an unexpected number of images
   */
  @Override
  public WorkflowOperationResult start(final WorkflowInstance wi, JobContext ctx)
      throws WorkflowOperationException {
    logger.debug("Running image workflow operation on {}", wi);
    try {
      MediaPackage mp = wi.getMediaPackage();
      final Cfg cfg = configure(mp, wi);
      mp = MediaPackageSupport.copy(mp);

      if (cfg.sourceTracks.isEmpty()) {
        logger.info("No source tracks found in media package {}, skipping operation", mp.getIdentifier());
        return this.createResult(mp, Action.SKIP);
      }

      // Start the image extraction jobs: one per encoding profile for each track.
      // Plain loops instead of streams so checked exceptions propagate without RuntimeException wrapping.
      final List<Extraction> extractions = new ArrayList<>();
      for (final Track track : cfg.sourceTracks) {
        final List<MediaPosition> positions = limit(track, cfg.positions);
        if (positions.size() != cfg.positions.size()) {
          logger.warn("Could not apply all configured positions to track {}", track);
        }
        logger.info("Extracting images from {} at position {}", track, positions);
        for (final EncodingProfile profile : cfg.profiles) {
          final Job job = extractImages(this, track, profile, positions, cfg);
          extractions.add(new Extraction(job, track, profile, positions));
        }
      }

      final List<Job> extractionJobs = concatJobs(extractions);
      final JobBarrier.Result extractionResult = JobUtil.waitForJobs(this.serviceRegistry, extractionJobs);
      if (!extractionResult.isSuccess()) {
        throw new WorkflowOperationException("Image extraction failed");
      }

      // All extractions were successful; post-process their images.
      for (final Extraction extraction : extractions) {
        final List<Attachment> images = getImages(extraction.job);
        final int expectedNrOfImages = extraction.positions.size();
        if (images.size() != expectedNrOfImages) {
          // the job did not yield exactly one image per requested position
          throw new WorkflowOperationException(
              format("Only %s of %s images have been extracted from track %s",
                  images.size(), expectedNrOfImages, extraction.track));
        }
        // images are expected in the same order as the requested positions
        for (int i = 0; i < expectedNrOfImages; i++) {
          final Attachment image = images.get(i);
          final MediaPosition position = extraction.positions.get(i);
          adjustMetadata(extraction, image, cfg);
          if (image.getIdentifier() == null) {
            image.generateIdentifier();
          }
          mp.addDerived(image, extraction.track);
          final String fileName = createFileName(
              extraction.profile.getSuffix(), extraction.track.getURI(), position, cfg);
          moveToWorkspace(this, mp, image, fileName);
        }
      }
      return this.createResult(mp, Action.CONTINUE, JobUtil.sumQueueTime(extractionJobs));
    } catch (WorkflowOperationException e) {
      // already carries a meaningful message; do not wrap it a second time
      throw e;
    } catch (Exception e) {
      throw new WorkflowOperationException(e);
    }
  }

  /**
   * Adjust flavor, tags, mime type of <code>image</code> according to the
   * configuration and the extraction.
   * <p>
   * A <code>*</code> as type or subtype of a configured target flavor is replaced by the corresponding part
   * of the source track's flavor. If the mime type cannot be derived from the image URI, a warning is
   * logged and the mime type is left untouched.
   */
  protected void adjustMetadata(Extraction extraction, Attachment image, Cfg cfg) {
    // Adjust the target flavor. Make sure to account for partial updates
    for (final MediaPackageElementFlavor flavor : cfg.targetImageFlavor) {
      final String flavorType = eq("*", flavor.getType())
          ? extraction.track.getFlavor().getType()
          : flavor.getType();
      final String flavorSubtype = eq("*", flavor.getSubtype())
          ? extraction.track.getFlavor().getSubtype()
          : flavor.getSubtype();
      image.setFlavor(new MediaPackageElementFlavor(flavorType, flavorSubtype));
      logger.debug("Resulting image has flavor '{}'", image.getFlavor());
    }
    // Set the mime type
    try {
      image.setMimeType(MimeTypes.fromURI(image.getURI()));
    } catch (UnknownFileTypeException e) {
      logger.warn("Mime type unknown for file {}. Setting none.", image.getURI(), e);
    }
    // Add tags
    applyTargetTagsToElement(cfg.targetImageTags, image);
  }

  /**
   * Create a file name for the extracted image.
   * <p>
   * Uses the configured base name format for the position type if present, otherwise
   * <code>&lt;track base name&gt;_&lt;seconds&gt;s&lt;suffix&gt;</code> or
   * <code>&lt;track base name&gt;_&lt;percent&gt;p&lt;suffix&gt;</code>.
   */
  protected String createFileName(final String suffix, final URI trackUri, final MediaPosition pos, Cfg cfg)
      throws WorkflowOperationException {
    // The base name becomes part of a format string: escape literal '%' so a track named e.g.
    // "100%.mp4" does not break or corrupt the formatting. Objects.toString keeps the previous
    // "null" rendering for URIs without a path.
    final String trackBaseName = Objects.toString(FilenameUtils.getBaseName(trackUri.getPath()))
        .replace("%", "%%");
    final String format;
    switch (pos.type) {
      case Seconds:
        format = cfg.targetBaseNameFormatSecond.orElse(trackBaseName + "_%.3fs%s");
        break;
      case Percentage:
        format = cfg.targetBaseNameFormatPercent.orElse(trackBaseName + "_%.1fp%s");
        break;
      default:
        throw new WorkflowOperationException("Unexhaustive match");
    }
    return formatFileName(format, pos.position, suffix);
  }

  /** Move the extracted <code>image</code> to its final location in the workspace and rename it to <code>fileName</code>. */
  protected void moveToWorkspace(final ImageWorkflowOperationHandler handler, final MediaPackage mp,
      final Attachment image, final String fileName) throws WorkflowOperationException {
    try {
      image.setURI(handler.workspace.moveTo(
          image.getURI(),
          mp.getIdentifier().toString(),
          image.getIdentifier(),
          fileName));
    } catch (Exception e) {
      throw new WorkflowOperationException(e);
    }
  }

  /**
   * Start a composer job to extract images from a track at the given positions.
   * The positions are converted to seconds with {@link #toSeconds(Track, MediaPosition, double)} first.
   */
  protected Job extractImages(
      final ImageWorkflowOperationHandler handler,
      final Track track,
      final EncodingProfile profile,
      final List<MediaPosition> positions,
      Cfg cfg
  ) throws WorkflowOperationException {
    List<Double> seconds = new ArrayList<>();
    for (MediaPosition mediaPosition : positions) {
      seconds.add(toSeconds(track, mediaPosition, cfg.endMargin));
    }

    try {
      return handler.composerService.image(track, profile.getIdentifier(), Collections.toDoubleArray(seconds));
    } catch (Exception e) {
      throw new WorkflowOperationException("Error starting image extraction job", e);
    }
  }

  // ** ** **

  /**
   * Format a file name by applying <code>format</code> to the position and the suffix, using
   * {@link Locale#ROOT} so the decimal separator is always a dot.
   * No sanitizing of the result takes place.
   *
   * @throws IllegalFormatException if <code>format</code> does not fit the (double, String) arguments
   */
  static String formatFileName(String format, double position, String suffix) {
    return format(Locale.ROOT, format, position, suffix);
  }


  /** Concat the jobs of a list of extraction objects, preserving their order. */
  private static List<Job> concatJobs(List<Extraction> extractions) {
    List<Job> jobs = new ArrayList<>(extractions.size());
    for (Extraction extraction : extractions) {
      jobs.add(extraction.job);
    }
    return jobs;
  }

  /**
   * Get the images (payload) from a job.
   *
   * @throws WorkflowOperationException if the payload contains no elements
   */
  @SuppressWarnings("unchecked")
  private static List<Attachment> getImages(Job job) throws MediaPackageException, WorkflowOperationException {
    final List<Attachment> images;
    images = (List<Attachment>) MediaPackageElementParser.getArrayFromXml(job.getPayload());
    if (!images.isEmpty()) {
      return images;
    } else {
      throw new WorkflowOperationException("Job did not extract any images");
    }
  }

  /**
   * Limit the list of media positions to those that fit into the length of the track.
   * <p>
   * If the track has no duration, or it reports streams and none of its video streams has a frame count
   * other than 1 (a missing frame count counts as 1), a single position at second 0 is returned instead.
   * Otherwise positions in seconds must lie in <code>[0, duration)</code> and percentages in
   * <code>[0, 100]</code>.
   */
  static List<MediaPosition> limit(Track track, List<MediaPosition> positions) {
    // track duration in milliseconds
    final Long duration = track.getDuration();
    // if the video has just one frame (e.g.: MP3-Podcasts) it makes no sense to go to a certain position
    // as the video has only one image at position 0
    if (duration == null || (track.getStreams() != null && Arrays.stream(track.getStreams())
        .filter(stream -> stream instanceof VideoStream)
        .map(org.opencastproject.mediapackage.Stream::getFrameCount)
        .allMatch(frameCount -> frameCount == null || frameCount == 1))) {
      return java.util.Collections.singletonList(new MediaPosition(PositionType.Seconds, 0));
    }

    // Second-based positions are converted to milliseconds before comparing them with the duration.
    return positions.stream()
        .filter(p -> (PositionType.Seconds.equals(p.type) && p.position >= 0 && p.position * 1000.0 < duration)
            || (PositionType.Percentage.equals(p.type) && p.position >= 0 && p.position <= 100))
        .collect(Collectors.toList());
  }

  /**
   * Convert a <code>position</code> into seconds in relation to the given track.
   * <p>
   * Percentages are resolved against the track duration (milliseconds; a missing duration counts as 0).
   * A position closer than <code>endMarginMs</code> to the end of the track is moved to
   * <code>endMarginMs</code> before the end (but never before 0).
   * <em>Attention:</em> positions lying further than <code>endMarginMs</code> beyond the end are returned
   * unchanged; use {@link #limit(Track, List)} to drop them beforehand.
   *
   * @throws IllegalArgumentException for an unknown position type
   */
  static double toSeconds(Track track, MediaPosition position, double endMarginMs) throws WorkflowOperationException {
    final long durationMs = track.getDuration() == null ? 0 : track.getDuration();
    final double posMs;
    switch (position.type) {
      case Percentage:
        posMs = durationMs * position.position / 100.0;
        break;
      case Seconds:
        posMs = position.position * 1000.0;
        break;
      default:
        throw new IllegalArgumentException("Unhandled MediaPosition type: " + position.type);
    }
    // limit maximum position to Xms before the end of the video
    return Math.abs(durationMs - posMs) >= endMarginMs
        ? posMs / 1000.0
        : Math.max(0, ((double) durationMs - endMarginMs)) / 1000.0;
  }

  // ** ** **

  /**
   * Fetch a profile from the composer service. Throw a WorkflowOperationException in case the profile
   * does not exist.
   */
  public static EncodingProfile fetchProfile(ComposerService composerService, String profileName)
      throws WorkflowOperationException {
    EncodingProfile profile = composerService.getProfile(profileName);
    if (profile == null) {
      throw new WorkflowOperationException("Encoding profile '" + profileName + "' was not found");
    }
    return profile;
  }

  /**
   * Describes the extraction of a list of images from a track, extracted after a certain encoding profile.
   * Track -> (profile, positions)
   */
  static final class Extraction {
    /** The extraction job. */
    private final Job job;
    /** The track to extract from. */
    private final Track track;
    /** The encoding profile to use for extraction. */
    private final EncodingProfile profile;
    /** Media positions, in the order the images are expected in the job payload. */
    private final List<MediaPosition> positions;

    private Extraction(Job job, Track track, EncodingProfile profile, List<MediaPosition> positions) {
      this.job = job;
      this.track = track;
      this.profile = profile;
      this.positions = positions;
    }
  }

  // ** ** **

  /**
   * The WOH's configuration options.
   */
  static final class Cfg {
    /** Selected source tracks; only tracks that have video. */
    private final List<Track> sourceTracks;
    /** Parsed extraction positions. */
    private final List<MediaPosition> positions;
    /** Encoding profiles to extract with. */
    private final List<EncodingProfile> profiles;
    /** Target flavor(s); a "*" type or subtype is taken from the source track. */
    private final List<MediaPackageElementFlavor> targetImageFlavor;
    /** Tags to apply to the extracted images. */
    private final ConfiguredTagsAndFlavors.TargetTags targetImageTags;
    /** Validated file name format for second-based positions, if configured. */
    private final Optional<String> targetBaseNameFormatSecond;
    /** Validated file name format for percentage positions, if configured. */
    private final Optional<String> targetBaseNameFormatPercent;
    /** End margin in milliseconds. */
    private final long endMargin;

    Cfg(List<Track> sourceTracks,
        List<MediaPosition> positions,
        List<EncodingProfile> profiles,
        List<MediaPackageElementFlavor> targetImageFlavor,
        ConfiguredTagsAndFlavors.TargetTags targetImageTags,
        Optional<String> targetBaseNameFormatSecond,
        Optional<String> targetBaseNameFormatPercent,
        long endMargin) {
      this.sourceTracks = sourceTracks;
      this.positions = positions;
      this.profiles = profiles;
      this.targetImageFlavor = targetImageFlavor;
      this.targetImageTags = targetImageTags;
      this.endMargin = endMargin;
      this.targetBaseNameFormatSecond = targetBaseNameFormatSecond;
      this.targetBaseNameFormatPercent = targetBaseNameFormatPercent;
    }
  }

  /**
   * Get and parse the configuration options.
   *
   * @throws WorkflowOperationException if an encoding profile is unknown, or the positions, the end margin
   *           or a base name format cannot be parsed
   */
  private Cfg configure(MediaPackage mp, WorkflowInstance wi) throws WorkflowOperationException {
    WorkflowOperationInstance woi = wi.getCurrentOperation();
    ConfiguredTagsAndFlavors tagsAndFlavors = getTagsAndFlavors(wi,
        Configuration.many, Configuration.many, Configuration.many, Configuration.one);

    // Comma separated profile names; blank entries are ignored, unknown names fail the operation.
    final List<EncodingProfile> profiles = new ArrayList<>();
    for (final String rawProfileName : getOptConfig(woi, OPT_PROFILES).orElse("").split(",")) {
      final String profileName = rawProfileName.trim();
      if (!profileName.isEmpty()) {
        profiles.add(fetchProfile(composerService, profileName));
      }
    }

    final ConfiguredTagsAndFlavors.TargetTags targetImageTags = tagsAndFlavors.getTargetTags();
    final List<MediaPackageElementFlavor> targetImageFlavor = tagsAndFlavors.getTargetFlavors();
    final List<Track> sourceTracks;
    {
      // get the source tags
      final List<String> sourceTags = tagsAndFlavors.getSrcTags();
      final List<MediaPackageElementFlavor> sourceFlavors = tagsAndFlavors.getSrcFlavors();
      TrackSelector trackSelector = new TrackSelector();

      //add tags and flavors to TrackSelector
      for (String tag : sourceTags) {
        trackSelector.addTag(tag);
      }
      for (MediaPackageElementFlavor flavor : sourceFlavors) {
        trackSelector.addFlavor(flavor);
      }

      // select the tracks based on source flavors and tags and skip those that don't have video
      sourceTracks = trackSelector.select(mp, true).stream()
          .filter(Track::hasVideo)
          .collect(Collectors.toList());
    }
    final List<MediaPosition> positions = parsePositions(getConfig(woi, OPT_POSITIONS));

    final long endMargin;
    final Optional<String> endMarginConfig = getOptConfig(woi, OPT_END_MARGIN);
    try {
      endMargin = endMarginConfig.map(Long::parseLong).orElse(END_MARGIN_DEFAULT);
    } catch (NumberFormatException e) {
      throw new WorkflowOperationException(
          format("%s is not a valid value for config option %s", endMarginConfig.orElse(""), OPT_END_MARGIN), e);
    }

    return new Cfg(sourceTracks,
        positions,
        profiles,
        targetImageFlavor,
        targetImageTags,
        getTargetBaseNameFormat(woi, OPT_TARGET_BASE_NAME_FORMAT_SECOND),
        getTargetBaseNameFormat(woi, OPT_TARGET_BASE_NAME_FORMAT_PERCENT),
        endMargin);
  }

  /** Read and validate an optional target base name format. */
  private Optional<String> getTargetBaseNameFormat(WorkflowOperationInstance woi, final String formatName)
      throws WorkflowOperationException {
    Optional<String> baseName = getOptConfig(woi, formatName);
    if (baseName.isPresent()) {
      baseName = Optional.ofNullable(validateTargetBaseNameFormat(baseName.get(), formatName));
    }
    return baseName;
  }

  /**
   * Validate a target base name format by formatting a sample position and suffix with it.
   *
   * @param format the format string to validate
   * @param formatName the config option the format was read from (used in the error message)
   * @return the unchanged <code>format</code>, so it can be applied to every extracted image
   * @throws WorkflowOperationException if the format is not applicable to a (double, String) argument pair
   *           or does not reproduce the file suffix
   */
  static String validateTargetBaseNameFormat(String format, final String formatName) throws WorkflowOperationException {
    boolean valid;
    try {
      final String sample = formatFileName(format, 15.11, ".png");
      valid = sample.contains(".png");
    } catch (IllegalFormatException e) {
      valid = false;
    }
    if (!valid) {
      throw new WorkflowOperationException(format(
          "%s is not a valid format string for config option %s",
          format, formatName));
    }
    // Return the format itself, not the sample: the format is applied to every image later on.
    return format;
  }

  // ** ** **

  /**
   * Parse media position parameter strings.
   */
  static final class MediaPositionParser {

    /**
     * Parse a list of positions separated by whitespace and/or commas, e.g. <code>"1, 2.5 50%"</code>.
     * Each position is an optionally negative decimal number; a directly following '%' makes it a percentage,
     * otherwise it is a position in seconds.
     *
     * @throws IllegalArgumentException if a token is not a number as described above
     */
    public static List<MediaPosition> parsePositions(String input) {
      final List<MediaPosition> positions = new ArrayList<>();
      final int length = input.length();
      int index = 0;

      while (index < length) {
        // Skip any separators (whitespace or commas)
        while (index < length && (Character.isWhitespace(input.charAt(index)) || input.charAt(index) == ',')) {
          index++;
        }
        if (index >= length) {
          break;
        }

        // Parse optional minus sign
        final int start = index;
        if (input.charAt(index) == '-') {
          index++;
        }

        // ASCII digits with at most one decimal point; Character.isDigit would also admit digits
        // from other scripts, which Double.parseDouble rejects.
        boolean dotSeen = false;
        boolean digitSeen = false;
        while (index < length) {
          final char c = input.charAt(index);
          if (c >= '0' && c <= '9') {
            digitSeen = true;
            index++;
          } else if (c == '.' && !dotSeen) {
            dotSeen = true;
            index++;
          } else {
            break;
          }
        }

        // Also rejects "-", "." and "-." which would otherwise fail obscurely in Double.parseDouble
        if (!digitSeen) {
          throw new IllegalArgumentException("Expected number at position " + start);
        }

        final double value = Double.parseDouble(input.substring(start, index));

        // Check for optional percent sign
        boolean isPercentage = false;
        if (index < length && input.charAt(index) == '%') {
          isPercentage = true;
          index++;
        }

        final PositionType type = isPercentage ? PositionType.Percentage : PositionType.Seconds;
        positions.add(new MediaPosition(type, value));
      }

      return positions;
    }
  }

  /**
   * Parse the configured time string.
   *
   * @throws WorkflowOperationException if the string is malformed or contains no position
   */
  private List<MediaPosition> parsePositions(String time) throws WorkflowOperationException {
    final List<MediaPosition> r;
    try {
      r = MediaPositionParser.parsePositions(time);
    } catch (IllegalArgumentException e) {
      throw new WorkflowOperationException(format("Cannot parse time string %s.", time), e);
    }
    if (r.isEmpty()) {
      throw new WorkflowOperationException(format("Cannot parse time string %s.", time));
    }
    return r;
  }

  /** Unit of a {@link MediaPosition}. */
  enum PositionType {
    Percentage, Seconds
  }

  /**
   * A position in time in a media file.
   * <p>
   * Note: <code>position</code> is mutable via {@link #setPosition(double)} and part of
   * {@link #hashCode()}; do not mutate instances stored in hash based collections.
   */
  static final class MediaPosition {
    private double position;
    private final PositionType type;

    MediaPosition(PositionType type, double position) {
      this.position = position;
      this.type = type;
    }

    public void setPosition(double position) {
      this.position = position;
    }

    @Override public int hashCode() {
      return Objects.hash(position, type);
    }

    @Override public boolean equals(Object that) {
      return (this == that) || (that instanceof MediaPosition && eqFields((MediaPosition) that));
    }

    private boolean eqFields(MediaPosition that) {
      // Double.compare keeps equals consistent with hashCode (Double.hashCode):
      // 0.0 and -0.0 are distinct, NaN equals NaN.
      return Double.compare(position, that.position) == 0 && eq(type, that.type);
    }

    @Override public String toString() {
      return format("MediaPosition(%s, %s)", type, position);
    }
  }

  @Reference
  @Override
  public void setServiceRegistry(ServiceRegistry serviceRegistry) {
    super.setServiceRegistry(serviceRegistry);
  }

}