LiveScheduleServiceImpl.java

/*
 * Licensed to The Apereo Foundation under one or more contributor license
 * agreements. See the NOTICE file distributed with this work for additional
 * information regarding copyright ownership.
 *
 *
 * The Apereo Foundation licenses this file to you under the Educational
 * Community License, Version 2.0 (the "License"); you may not use this file
 * except in compliance with the License. You may obtain a copy of the License
 * at:
 *
 *   http://opensource.org/licenses/ecl2.txt
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the
 * License for the specific language governing permissions and limitations under
 * the License.
 *
 */
package org.opencastproject.liveschedule.impl;

import org.opencastproject.assetmanager.api.AssetManager;
import org.opencastproject.assetmanager.api.Snapshot;
import org.opencastproject.assetmanager.api.Version;
import org.opencastproject.capture.admin.api.CaptureAgentStateService;
import org.opencastproject.distribution.api.DistributionException;
import org.opencastproject.distribution.api.DownloadDistributionService;
import org.opencastproject.job.api.Job;
import org.opencastproject.job.api.JobBarrier;
import org.opencastproject.liveschedule.api.LiveScheduleException;
import org.opencastproject.liveschedule.api.LiveScheduleService;
import org.opencastproject.mediapackage.Attachment;
import org.opencastproject.mediapackage.MediaPackage;
import org.opencastproject.mediapackage.MediaPackageElement;
import org.opencastproject.mediapackage.MediaPackageElementBuilder;
import org.opencastproject.mediapackage.MediaPackageElementBuilderFactory;
import org.opencastproject.mediapackage.MediaPackageElementFlavor;
import org.opencastproject.mediapackage.MediaPackageElementParser;
import org.opencastproject.mediapackage.MediaPackageElements;
import org.opencastproject.mediapackage.Publication;
import org.opencastproject.mediapackage.PublicationImpl;
import org.opencastproject.mediapackage.Track;
import org.opencastproject.mediapackage.VideoStream;
import org.opencastproject.mediapackage.selector.SimpleElementSelector;
import org.opencastproject.mediapackage.track.TrackImpl;
import org.opencastproject.mediapackage.track.VideoStreamImpl;
import org.opencastproject.metadata.dublincore.DCMIPeriod;
import org.opencastproject.metadata.dublincore.DublinCore;
import org.opencastproject.metadata.dublincore.DublinCoreCatalog;
import org.opencastproject.metadata.dublincore.DublinCoreCatalogService;
import org.opencastproject.metadata.dublincore.EncodingSchemeUtils;
import org.opencastproject.search.api.SearchService;
import org.opencastproject.security.api.AccessControlList;
import org.opencastproject.security.api.AclScope;
import org.opencastproject.security.api.AuthorizationService;
import org.opencastproject.security.api.Organization;
import org.opencastproject.security.api.OrganizationDirectoryService;
import org.opencastproject.security.api.SecurityService;
import org.opencastproject.security.api.UnauthorizedException;
import org.opencastproject.security.api.User;
import org.opencastproject.security.util.SecurityUtil;
import org.opencastproject.series.api.SeriesService;
import org.opencastproject.serviceregistry.api.ServiceRegistry;
import org.opencastproject.util.MimeTypes;
import org.opencastproject.util.NotFoundException;
import org.opencastproject.util.UrlSupport;
import org.opencastproject.workspace.api.Workspace;

import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;

import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.collections4.Equator;
import org.apache.commons.lang3.StringUtils;
import org.apache.http.client.utils.URIUtils;
import org.osgi.framework.BundleContext;
import org.osgi.service.component.ComponentContext;
import org.osgi.service.component.annotations.Activate;
import org.osgi.service.component.annotations.Component;
import org.osgi.service.component.annotations.Reference;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Dictionary;
import java.util.Enumeration;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Properties;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.function.Predicate;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

@Component(
    immediate = true,
    service = LiveScheduleService.class,
    property = {
        "service.description=Live Schedule Service"
    }
)
public class LiveScheduleServiceImpl implements LiveScheduleService {
  /** Bundle context property holding the server url; used as fallback base for the publication url **/
  static final String SERVER_URL_PROPERTY = "org.opencastproject.server.url";
  /** Organization property holding the engage base url used to build the publication url **/
  static final String ENGAGE_URL_PROPERTY = "org.opencastproject.engage.ui.url";
  /** The default path to the player; the media package id is appended to it **/
  static final String PLAYER_PATH = "/play/";

  /** Default values for configuration options */
  private static final String DEFAULT_STREAM_MIME_TYPE = "video/mp4";
  private static final String DEFAULT_STREAM_RESOLUTION = "1920x1080";
  private static final String DEFAULT_STREAM_NAME = "live-stream";
  private static final String DEFAULT_LIVE_TARGET_FLAVORS = "presenter/delivery";
  static final String DEFAULT_LIVE_DISTRIBUTION_SERVICE = "download";

  // Deactivating checkstyle to preserve the long URL
  // CHECKSTYLE:OFF
  // If the capture agent registered this property, we expect to get a resolution and
  // a url in the following format:
  // capture.device.live.resolution.WIDTHxHEIGHT=COMPLETE_STREAMING_URL e.g.
  // capture.device.live.resolution.960x270=rtmp://cp398121.live.edgefcs.net/live/dev-epiphan005-2-presenter-delivery.stream-960x270_1_200@355694
  public static final String CA_PROPERTY_RESOLUTION_URL_PREFIX = "capture.device.live.resolution.";
  // CHECKSTYLE:ON

  /** Names of the variables (written as #{name}) that can be replaced in the stream name / streaming url */
  public static final String REPLACE_ID = "id";
  public static final String REPLACE_FLAVOR = "flavor";
  public static final String REPLACE_CA_NAME = "caName";
  public static final String REPLACE_RESOLUTION = "resolution";

  /** Keys of the service configuration options */
  public static final String LIVE_STREAMING_URL = "live.streamingUrl";
  public static final String LIVE_STREAM_NAME = "live.streamName";
  public static final String LIVE_STREAM_MIME_TYPE = "live.mimeType";
  public static final String LIVE_STREAM_RESOLUTION = "live.resolution";
  public static final String LIVE_TARGET_FLAVORS = "live.targetFlavors";
  public static final String LIVE_DISTRIBUTION_SERVICE = "live.distributionService";
  public static final String LIVE_PUBLISH_STREAMING = "live.publishStreaming";

  /** Flavors of the media package elements that are distributed and published along with the live tracks */
  private static final MediaPackageElementFlavor[] publishFlavors = { MediaPackageElements.EPISODE,
      MediaPackageElements.SERIES, MediaPackageElements.XACML_POLICY_EPISODE,
      MediaPackageElements.XACML_POLICY_SERIES }; // make configurable later

  /** The logger */
  private static final Logger logger = LoggerFactory.getLogger(LiveScheduleServiceImpl.class);

  /** Base url of the streaming server; stays null when 'live.streamingUrl' is not configured */
  private String liveStreamingUrl;
  /** Stream name (may contain #{...} variables) appended to the streaming server url */
  private String streamName;
  /** Mime type set on the generated live tracks */
  private String streamMimeType;
  /** Configured resolutions, each in the form WIDTHxHEIGHT */
  private String[] streamResolution;
  /** Flavors for which live tracks are generated from the service configuration */
  private MediaPackageElementFlavor[] liveFlavors;
  /** Value of the server url property; null when not set */
  private String serverUrl;
  /**
   * Versions of the snapshots taken by this service, keyed by media package id. Used to ignore update
   * notifications caused by our own snapshots. Entries expire 5 minutes after being written.
   */
  private Cache<String, Version> snapshotVersionCache
      = CacheBuilder.newBuilder().expireAfterWrite(5, TimeUnit.MINUTES).build();
  /** Which streaming formats should be published automatically */
  private List<String> publishedStreamingFormats = null;
  /** Name of the system user used when accessing the search index and retracting */
  private String systemUserName;

  /** Services */
  private DownloadDistributionService downloadDistributionService; // to distribute episode and series catalogs
  private SearchService searchService; // to publish/retract live media package
  private SeriesService seriesService; // to get series metadata
  private DublinCoreCatalogService dublinCoreService; // to serialize dc catalogs
  private CaptureAgentStateService captureAgentService; // to get agent capabilities
  private ServiceRegistry serviceRegistry; // to create publish/retract jobs
  private Workspace workspace; // to save dc catalogs before distributing
  private AssetManager assetManager; // to get current media package
  private AuthorizationService authService; // to attach the episode acl to a media package
  private OrganizationDirectoryService organizationService; // to look up the organization of a snapshot
  private SecurityService securityService; // to switch to the system user and back

  /** Interval passed to the job barrier when waiting for jobs to finish */
  private long jobPollingInterval = JobBarrier.DEFAULT_POLLING_INTERVAL;

  /** Selects the elements matching {@link #publishFlavors}; initialized on activation */
  private SimpleElementSelector publishElementSelector;

  /**
   * OSGi callback on component activation.
   * <p>
   * Reads the server url and system user name from the bundle context and the live streaming options from the
   * component properties. Blank options fall back to their defaults. Comma-separated options (resolutions, target
   * flavors and published streaming formats) are split into trimmed, non-empty entries so that values such as
   * {@code "1920x1080, 1280x720"} are handled correctly.
   *
   * @param context
   *          the component context
   */
  @Activate
  protected void activate(ComponentContext context) {
    BundleContext bundleContext = context.getBundleContext();

    serverUrl = StringUtils.trimToNull(bundleContext.getProperty(SERVER_URL_PROPERTY));
    if (serverUrl == null) {
      logger.warn("Server url was not set in '{}'", SERVER_URL_PROPERTY);
    } else {
      logger.info("Server url is {}", serverUrl);
    }
    systemUserName = bundleContext.getProperty(SecurityUtil.PROPERTY_KEY_SYS_USER);

    Dictionary<?, ?> properties = context.getProperties();

    // The streaming url has no default: it is only assigned when configured
    String configuredStreamingUrl = trimmedConfigValue(properties, LIVE_STREAMING_URL, null);
    if (configuredStreamingUrl != null) {
      liveStreamingUrl = configuredStreamingUrl;
      logger.info("Live streaming server url is {}", liveStreamingUrl);
    } else {
      logger.info("Live streaming url not set in '{}'. Streaming urls must be provided by capture agent properties.",
              LIVE_STREAMING_URL);
    }

    streamName = trimmedConfigValue(properties, LIVE_STREAM_NAME, DEFAULT_STREAM_NAME);
    streamMimeType = trimmedConfigValue(properties, LIVE_STREAM_MIME_TYPE, DEFAULT_STREAM_MIME_TYPE);

    String resolution = trimmedConfigValue(properties, LIVE_STREAM_RESOLUTION, DEFAULT_STREAM_RESOLUTION);
    streamResolution = splitTrimmedCsv(resolution);

    String flavors = trimmedConfigValue(properties, LIVE_TARGET_FLAVORS, DEFAULT_LIVE_TARGET_FLAVORS);
    liveFlavors = Arrays.stream(splitTrimmedCsv(flavors))
            .map(MediaPackageElementFlavor::parseFlavor)
            .toArray(MediaPackageElementFlavor[]::new);

    // Not configured means that no streaming format is published automatically
    publishedStreamingFormats = Arrays.asList(splitTrimmedCsv((String) properties.get(LIVE_PUBLISH_STREAMING)));

    publishElementSelector = new SimpleElementSelector();
    for (MediaPackageElementFlavor flavor : publishFlavors) {
      publishElementSelector.addFlavor(flavor);
    }

    logger.info(
            "Configured live stream name: {}, mime type: {}, resolution: {}, target flavors: {}",
            streamName, streamMimeType, resolution, flavors);
  }

  /**
   * Returns the trimmed value of a configuration option or the default if the option is missing or blank.
   *
   * @param properties
   *          the component properties
   * @param key
   *          the option key
   * @param defaultValue
   *          the value to use when the option is blank; may be null
   * @return the trimmed value or the default
   */
  private static String trimmedConfigValue(Dictionary<?, ?> properties, String key, String defaultValue) {
    String raw = (String) properties.get(key);
    return StringUtils.isBlank(raw) ? defaultValue : StringUtils.trimToEmpty(raw);
  }

  /**
   * Splits a comma-separated list into its trimmed, non-empty entries.
   *
   * @param csv
   *          the comma-separated list; may be null
   * @return the entries in their original order; empty if the list is null or has no entries
   */
  private static String[] splitTrimmedCsv(String csv) {
    if (csv == null) {
      return new String[0];
    }
    return Arrays.stream(csv.split(","))
            .map(String::trim)
            .filter(entry -> !entry.isEmpty())
            .toArray(String[]::new);
  }

  /**
   * Creates the live event in the search index if it is not there yet, otherwise updates it.
   * <p>
   * Nothing is done (and false is returned) for events whose capture is already over and for media packages that
   * are in the search index but are not live. Both checks are needed because notifications for past events may
   * arrive when the admin ui index is rebuilt.
   */
  @Override
  public boolean createOrUpdateLiveEvent(String mpId, DublinCoreCatalog episodeDC) throws LiveScheduleException {
    final MediaPackage publishedMp = getMediaPackageFromSearch(mpId);

    if (publishedMp != null) {
      // Already in the search index: only live media packages are updated
      if (publishedMp.isLive()) {
        return updateLiveEvent(publishedMp, episodeDC);
      }
      logger.info("Media package {} is in search index but not live so not updating it.", mpId);
      return false;
    }

    // Not in the search index yet: only create it if the capture is not over
    final DCMIPeriod schedule = EncodingSchemeUtils.decodeMandatoryPeriod(
            episodeDC.getFirst(DublinCore.PROPERTY_TEMPORAL));
    final long captureEnd = schedule.getEnd().getTime();
    if (captureEnd > System.currentTimeMillis()) {
      return createLiveEvent(mpId, episodeDC);
    }
    logger.info("Live media package {} not created in search index because event is already past (end date: {})",
            mpId, schedule.getEnd());
    return false;
  }

  /**
   * Retracts the live event with the given id. Returns false without retracting when the media package is not in
   * the search index or is not live.
   */
  @Override
  public boolean deleteLiveEvent(String mpId) throws LiveScheduleException {
    final MediaPackage publishedMp = getMediaPackageFromSearch(mpId);
    if (publishedMp == null) {
      logger.debug("Live media package {} not found in search index", mpId);
      return false;
    }
    if (publishedMp.isLive()) {
      return retractLiveEvent(publishedMp);
    }
    logger.info("Media package {} is not live. Not retracting.", mpId);
    return false;
  }

  /**
   * Replaces the episode acl of a live media package that is in the search index and republishes it. Returns false
   * without changing anything when the media package is not in the search index or is not live.
   */
  @Override
  public boolean updateLiveEventAcl(String mpId, AccessControlList acl) throws LiveScheduleException {
    final MediaPackage publishedMp = getMediaPackageFromSearch(mpId);
    if (publishedMp == null) {
      return false;
    }
    if (!publishedMp.isLive()) {
      logger.info("Media package {} is not live. Not updating acl.", mpId);
      return false;
    }

    // Replacing and distributing the acl yields a new media package
    final MediaPackage republishedMp = replaceAndDistributeAcl(publishedMp, acl);
    // Make the new media package visible in the engage search index
    publishToSearch(republishedMp);
    // Elements of the previous media package are retracted so that no garbage is left behind
    retractPreviousElements(publishedMp, republishedMp);
    logger.info("Updated live acl for media package {}", republishedMp);
    return true;
  }

  /**
   * Creates a live media package for the given event: generates the live tracks, distributes acls and catalogs,
   * publishes the result to the search index and stores a snapshot of the archived media package with the live
   * publication added.
   *
   * @param mpId
   *          the media package id
   * @param episodeDC
   *          the episode dublin core catalog of the event
   * @return true once the live media package has been created
   * @throws LiveScheduleException
   *           if any step fails; failures already reported as LiveScheduleException are rethrown unchanged
   */
  boolean createLiveEvent(String mpId, DublinCoreCatalog episodeDC) throws LiveScheduleException {
    try {
      logger.info("Creating live media package {}", mpId);
      Snapshot snapshot = getSnapshotFromArchive(mpId);

      // generate live tracks
      MediaPackage tmpMp = (MediaPackage) snapshot.getMediaPackage().clone();
      setDurationForMediaPackage(tmpMp, episodeDC); // duration is used by live tracks
      Map<String, Track> liveTracks = addLiveTracksToMediaPackage(tmpMp, episodeDC);

      // publish to search
      MediaPackage mpForSearch = distributeAclsAndCatalogs(snapshot);
      for (Track t : tmpMp.getTracks()) {
        mpForSearch.add(t);
      }
      publishToSearch(mpForSearch);

      // add live publication to archive
      MediaPackage updatedArchivedMp = addLivePublicationToMediaPackage(snapshot, liveTracks);
      // remember the version of our own snapshot so that the notification it triggers can be ignored
      snapshotVersionCache.put(mpId, assetManager.takeSnapshot(updatedArchivedMp).getVersion());
      return true;
    } catch (LiveScheduleException e) {
      // Do not wrap our own exception a second time; its message would be buried in the cause chain
      throw e;
    } catch (Exception e) {
      throw new LiveScheduleException(e);
    }
  }

  /**
   * Updates a live media package that is already in the search index.
   * <p>
   * Returns false without touching anything when the latest snapshot was taken by this service (its version is
   * still in the local cache) or when the regenerated media package is the same as the published one. Otherwise
   * the media package is republished, elements of the previous publication are retracted and a new snapshot with
   * the updated live publication is taken.
   *
   * @param mpFromSearch
   *          the media package currently in the search index
   * @param episodeDC
   *          the episode dublin core catalog of the event
   * @return true if the live media package was updated
   * @throws LiveScheduleException
   *           if the media package has not been archived or republishing fails
   */
  boolean updateLiveEvent(MediaPackage mpFromSearch, DublinCoreCatalog episodeDC) throws LiveScheduleException {
    final String mpId = mpFromSearch.getIdentifier().toString();
    final Snapshot latestSnapshot = getSnapshotFromArchive(mpId);

    // A cached version means that we took this snapshot ourselves. The cache is only a shortcut: once the entry
    // has expired, the media packages are compared below.
    final Version ownVersion = snapshotVersionCache.getIfPresent(mpId);
    if (latestSnapshot.getVersion().equals(ownVersion)) {
      logger.debug("Snapshot version {} was created by us so this change is ignored.", latestSnapshot.getVersion());
      return false;
    }

    // Build a candidate media package holding only the elements that would be published
    final MediaPackage candidate = (MediaPackage) latestSnapshot.getMediaPackage().clone();
    final Collection<MediaPackageElement> publishable = publishElementSelector.select(candidate, false);
    final List<MediaPackageElement> notPublished = new ArrayList<>();
    for (MediaPackageElement element : candidate.getElements()) {
      if (!publishable.contains(element)) {
        notPublished.add(element);
      }
    }
    for (MediaPackageElement element : notPublished) {
      candidate.remove(element);
    }

    // The duration must be set first because the live tracks use it
    setDurationForMediaPackage(candidate, episodeDC);
    final Map<String, Track> liveTracks = addLiveTracksToMediaPackage(candidate, episodeDC);

    if (isSameMediaPackage(mpFromSearch, candidate)) {
      logger.debug("Live media package {} seems to be the same. Not updating.", mpFromSearch);
      return false;
    }

    logger.info("Updating live media package {}", mpFromSearch);

    // Republish to search
    final MediaPackage republished = distributeAclsAndCatalogs(latestSnapshot);
    for (Track liveTrack : candidate.getTracks()) {
      republished.add(liveTrack);
    }
    // The live publication is not needed in the search index
    removeLivePublicationChannel(republished);
    publishToSearch(republished);
    // Clean up what the previous publication left behind
    retractPreviousElements(mpFromSearch, republished);

    // Update the live publication in the archive and remember the version of our own snapshot
    final MediaPackage archivedMp = updateLivePublication(latestSnapshot.getMediaPackage(), liveTracks);
    final Version newVersion = assetManager.takeSnapshot(archivedMp).getVersion();
    snapshotVersionCache.put(mpId, newVersion);
    return true;
  }

  /**
   * Replaces the tracks of the publication with those generated tracks whose key is listed in the streaming
   * formats configured for publishing. Formats without a generated track are skipped.
   *
   * @param publication
   *          the publication to update
   * @param generatedTracks
   *          the generated live tracks by key
   */
  private void createOrUpdatePublicationTracks(Publication publication, Map<String, Track> generatedTracks) {
    final boolean hasTracks = publication.getTracks().length > 0;
    if (hasTracks) {
      publication.clearTracks();
    }

    publishedStreamingFormats.stream()
            .map(generatedTracks::get)
            .filter(Objects::nonNull)
            .forEach(publication::addTrack);
  }

  /**
   * Updates the tracks of every publication of the live channel in the given media package.
   * <p>
   * The channel comparison is null-safe and a missing publication array is tolerated, matching
   * {@link #removeLivePublicationChannel(MediaPackage)}.
   *
   * @param mediaPackage
   *          the media package whose live publication(s) are updated in place
   * @param generatedTracks
   *          the generated live tracks by key
   * @return the same media package instance
   */
  private MediaPackage updateLivePublication(MediaPackage mediaPackage, Map<String, Track> generatedTracks) {
    Publication[] publications = mediaPackage.getPublications();
    if (publications != null) {
      for (Publication publication : publications) {
        // Constant first: a publication without channel must not cause a NullPointerException
        if (CHANNEL_ID.equals(publication.getChannel())) {
          createOrUpdatePublicationTracks(publication, generatedTracks);
        }
      }
    }
    return mediaPackage;
  }

  /**
   * Retracts the live media package and, if the media package is archived, removes the live publication from the
   * archived media package by taking a new snapshot.
   *
   * @param mp
   *          the live media package to retract
   * @return always true; failures are reported through the exception
   * @throws LiveScheduleException
   *           if the retraction fails
   */
  boolean retractLiveEvent(MediaPackage mp) throws LiveScheduleException {
    retract(mp);

    String mpId = mp.getIdentifier().toString();
    // Get latest mp from the asset manager if there to remove the publication
    try {
      Snapshot snapshot = getSnapshotFromArchive(mpId);
      MediaPackage archivedMp = snapshot.getMediaPackage();
      removeLivePublicationChannel(archivedMp);
      logger.debug("Removed live pub channel from archived media package {}", mp);
      // Take a snapshot with the publication removed and put its version in our local cache
      // so that we ignore notifications for this snapshot version.
      snapshotVersionCache.put(mpId, assetManager.takeSnapshot(archivedMp).getVersion());
    } catch (LiveScheduleException e) {
      // It was not found in asset manager. This is ok, but leave a trace for troubleshooting.
      logger.debug("Media package {} not found in asset manager, live publication not removed there", mpId, e);
    }
    return true;
  }

  /**
   * Adds the media package to the search index and waits for the job to finish.
   *
   * @param mp
   *          the live media package to publish
   * @throws LiveScheduleException
   *           if the publish job does not succeed or any other error occurs
   */
  void publishToSearch(MediaPackage mp) throws LiveScheduleException {
    try {
      logger.info("Publishing LIVE media package {} to search index", mp);
      final Job addToSearchJob = searchService.add(mp);
      final JobBarrier.Result outcome = waitForStatus(addToSearchJob);
      if (outcome.isSuccess()) {
        return;
      }
      throw new LiveScheduleException("Live media package " + mp.getIdentifier() + " could not be published");
    } catch (LiveScheduleException e) {
      // our own failure report, pass it on as is
      throw e;
    } catch (Exception e) {
      throw new LiveScheduleException(e);
    }
  }

  /**
   * Removes the live media package from the search index and retracts all of its elements, except publications,
   * from the download distribution service. Runs as the system user; the previous user is restored afterwards.
   *
   * @param mp
   *          the live media package to retract
   * @throws LiveScheduleException
   *           if the search delete job or the retract job does not succeed, or any other error occurs
   */
  void retract(MediaPackage mp) throws LiveScheduleException {
    final Organization organization = securityService.getOrganization();
    final User originalUser = organization == null ? null : securityService.getUser();
    try {
      securityService.setUser(SecurityUtil.createSystemUser(systemUserName, organization));
      final String mpId = mp.getIdentifier().toString();
      logger.info("Removing LIVE media package {} from the search index", mpId);

      // Everything but the publications gets retracted from distribution
      final Set<String> retractIds = Arrays.stream(mp.getElements())
              .filter(element -> !MediaPackageElement.Type.Publication.equals(element.getElementType()))
              .map(MediaPackageElement::getIdentifier)
              .collect(Collectors.toCollection(HashSet::new));

      // Search index first, then the download distribution; both are attempted even if the first one fails
      final boolean removedFromSearch = waitForStatus(searchService.delete(mpId)).isSuccess();
      final Job retractJob = downloadDistributionService.retract(CHANNEL_ID, mp, retractIds);
      final boolean removedFromDistribution = waitForStatus(retractJob).isSuccess();

      final List<String> failures = new ArrayList<>();
      if (!removedFromSearch) {
        failures.add("Search Index");
      }
      if (!removedFromDistribution) {
        failures.add("Distribution");
      }
      if (failures.isEmpty()) {
        return;
      }
      throw new LiveScheduleException(
          String.format("Removing live media package %s from %s failed", mpId, String.join(" and ", failures)));
    } catch (LiveScheduleException e) {
      throw e;
    } catch (Exception e) {
      throw new LiveScheduleException(e);
    } finally {
      securityService.setUser(originalUser);
    }
  }

  /**
   * Looks up a media package in the search index. The lookup runs as the system user (issue #2504) so that the
   * media package is always found; the previous user is restored afterwards.
   *
   * @param mediaPackageId
   *          the media package id
   * @return the media package in the search index, or null if it is not there or the lookup was not authorized
   * @throws LiveScheduleException
   *           declared for callers; the code in this method does not raise it itself
   */
  MediaPackage getMediaPackageFromSearch(String mediaPackageId) throws LiveScheduleException {
    final Organization organization = securityService.getOrganization();
    final User originalUser = organization == null ? null : securityService.getUser();
    securityService.setUser(SecurityUtil.createSystemUser(systemUserName, organization));
    try {
      final MediaPackage found = searchService.get(mediaPackageId);
      return found;
    } catch (NotFoundException e) {
      // not published, which is a regular outcome
      return null;
    } catch (UnauthorizedException e) {
      logger.warn("Unexpected unauthorized exception when querying the search index for mp {}", mediaPackageId, e);
      return null;
    } finally {
      securityService.setUser(originalUser);
    }
  }

  /**
   * Sets the duration of the media package to the length of the temporal period found in the dublin core catalog.
   *
   * @param mp
   *          the media package to update
   * @param dc
   *          the catalog holding the mandatory temporal period of the event
   */
  void setDurationForMediaPackage(MediaPackage mp, DublinCoreCatalog dc) {
    final String temporal = dc.getFirst(DublinCore.PROPERTY_TEMPORAL);
    final DCMIPeriod schedule = EncodingSchemeUtils.decodeMandatoryPeriod(temporal);
    final long endMillis = schedule.getEnd().getTime();
    final long startMillis = schedule.getStart().getTime();
    mp.setDuration(endMillis - startMillis);
    logger.debug("Live media package {} has start {} and duration {}", mp.getIdentifier(), mp.getDate(),
            mp.getDuration());
  }

  /**
   * Adds live streaming tracks to the media package and returns them keyed by {@code flavor:resolution}.
   * <p>
   * If the capture agent named in the spatial property of the catalog registered properties in the form
   * {@code capture.device.live.resolution.WIDTHxHEIGHT=COMPLETE_STREAMING_URL}, one track per such property is
   * built. If the media package has no tracks after that, the tracks are built from the service configuration: one
   * per configured flavor and resolution. Tracks of both sources are recorded in the returned map so that either
   * kind can be attached to the live publication.
   *
   * @param mp
   *          the media package to add the tracks to; its duration must already be set
   * @param episodeDC
   *          the episode dublin core catalog of the event
   * @return the generated tracks keyed by {@code flavor:resolution}
   * @throws LiveScheduleException
   *           if no streaming url is configured and the capture agent did not provide any, or if a streaming url
   *           is not a valid URI
   */
  Map<String, Track> addLiveTracksToMediaPackage(MediaPackage mp, DublinCoreCatalog episodeDC)
          throws LiveScheduleException {
    String caName = episodeDC.getFirst(DublinCore.PROPERTY_SPATIAL);
    Map<String, Track> generatedTracks = new HashMap<>();
    String mpId = mp.getIdentifier().toString();
    try {
      // If capture agent registered the properties:
      // capture.device.live.resolution.WIDTHxHEIGHT=COMPLETE_STREAMING_URL, use them!
      try {
        Properties caProps = captureAgentService.getAgentCapabilities(caName);
        if (caProps != null) {
          // Note: only one flavor is supported in this format (the default: presenter/delivery)
          MediaPackageElementFlavor flavor = MediaPackageElementFlavor.parseFlavor(DEFAULT_LIVE_TARGET_FLAVORS);
          Enumeration<Object> en = caProps.keys();
          while (en.hasMoreElements()) {
            Object rawKey = en.nextElement();
            if (!(rawKey instanceof String)) {
              continue; // cannot be one of our properties
            }
            String key = (String) rawKey;
            if (key.startsWith(CA_PROPERTY_RESOLUTION_URL_PREFIX)) {
              String resolution = key.substring(CA_PROPERTY_RESOLUTION_URL_PREFIX.length());
              String url = caProps.getProperty(key);
              String replacedUrl = replaceVariables(mpId, caName, url, flavor, resolution);
              Track track = buildStreamingTrack(replacedUrl, flavor, streamMimeType, resolution, mp.getDuration());
              mp.add(track);
              // Record the track under the same key scheme as the tracks built from the service configuration
              generatedTracks.put(flavor + ":" + resolution, track);
            }
          }
        }
      } catch (NotFoundException e) {
        // Capture agent not found so we can't get its properties. Assume the service configuration should
        // be used instead. Note that we can't schedule anything on a CA that has not registered so this is
        // unlikely to happen.
      }

      // Capture agent did not pass any CA_PROPERTY_RESOLUTION_URL_PREFIX property when registering
      // so use the service configuration
      if (mp.getTracks().length == 0) {
        if (liveStreamingUrl == null) {
          throw new LiveScheduleException(
                  "Cannot build live tracks because '" + LIVE_STREAMING_URL + "' configuration was not set.");
        }

        for (MediaPackageElementFlavor flavor : liveFlavors) {
          for (String resolution : streamResolution) {
            String uri = replaceVariables(mpId, caName, UrlSupport.concat(liveStreamingUrl, streamName),
                    flavor, resolution);
            Track track = buildStreamingTrack(uri, flavor, streamMimeType, resolution, mp.getDuration());
            mp.add(track);
            generatedTracks.put(flavor + ":" + resolution, track);
          }
        }
      }
    } catch (URISyntaxException e) {
      throw new LiveScheduleException(e);
    }
    return generatedTracks;
  }

  /**
   * Builds a live track pointing to the given streaming url, with one video stream of the given resolution.
   *
   * @param uriString
   *          the streaming url
   * @param flavor
   *          the flavor of the track
   * @param mimeType
   *          the mime type of the track
   * @param resolution
   *          the video resolution in the form WIDTHxHEIGHT, e.g. 1920x1080; surrounding whitespace is ignored
   * @param duration
   *          the duration of the track
   * @return the live track
   * @throws URISyntaxException
   *           if the streaming url is not a valid URI
   * @throws IllegalArgumentException
   *           if the resolution is not in the form WIDTHxHEIGHT
   */
  Track buildStreamingTrack(String uriString, MediaPackageElementFlavor flavor, String mimeType, String resolution,
          long duration) throws URISyntaxException {

    URI uri = new URI(uriString);

    MediaPackageElementBuilder elementBuilder = MediaPackageElementBuilderFactory.newInstance().newElementBuilder();
    MediaPackageElement element = elementBuilder.elementFromURI(uri, MediaPackageElement.Type.Track, flavor);
    TrackImpl track = (TrackImpl) element;

    // Set duration and mime type
    track.setDuration(duration);
    track.setLive(true);
    track.setMimeType(MimeTypes.parseMimeType(mimeType));

    VideoStreamImpl video = new VideoStreamImpl("video-" + flavor.getType() + "-" + flavor.getSubtype());
    // Set video resolution. Fail with a message naming the bad value instead of an index or number format error.
    String[] dimensions = StringUtils.trimToEmpty(resolution).split("x");
    if (dimensions.length < 2) {
      throw new IllegalArgumentException(
              "Invalid live stream resolution '" + resolution + "', expected format is WIDTHxHEIGHT");
    }
    try {
      video.setFrameWidth(Integer.parseInt(dimensions[0].trim()));
      video.setFrameHeight(Integer.parseInt(dimensions[1].trim()));
    } catch (NumberFormatException e) {
      throw new IllegalArgumentException(
              "Invalid live stream resolution '" + resolution + "', expected format is WIDTHxHEIGHT", e);
    }

    track.addStream(video);

    logger.debug("Creating live track element of flavor {}, resolution {}, and url {}",
            flavor, resolution, uriString);

    return track;
  }

  /** Substitution pattern: any string in the form #{name}, where 'name' has only word characters: [a-zA-Z_0-9]. */
  private static final Pattern STREAM_VARIABLE_PATTERN = Pattern.compile("#\\{(\\w+)\\}");

  /**
   * Replaces variables in the live stream name. Currently, this is only prepared to handle the following: #{id} = media
   * package id, #{flavor} = type-subtype of flavor, #{caName} = capture agent name, #{resolution} = stream resolution.
   * <p>
   * Values are inserted literally, so characters such as '$' or '\' in a value are not interpreted as regular
   * expression group references. Unknown variables and variables whose value is null are left untouched.
   *
   * @param mpId
   *          the media package id
   * @param caName
   *          the capture agent name
   * @param toBeReplaced
   *          the string containing the variables
   * @param flavor
   *          the flavor; only used if the string contains #{flavor}
   * @param resolution
   *          the stream resolution
   * @return the string with the known variables replaced
   */
  String replaceVariables(String mpId, String caName, String toBeReplaced, MediaPackageElementFlavor flavor,
          String resolution) {

    Matcher matcher = STREAM_VARIABLE_PATTERN.matcher(toBeReplaced);
    StringBuilder sb = new StringBuilder();
    while (matcher.find()) {
      String value;
      switch (matcher.group(1)) {
        case REPLACE_ID:
          value = mpId;
          break;
        case REPLACE_FLAVOR:
          value = flavor.getType() + "-" + flavor.getSubtype();
          break;
        case REPLACE_CA_NAME:
          value = caName;
          break;
        case REPLACE_RESOLUTION:
          value = resolution;
          break;
        default:
          value = null; // unknown variable, will not replace
          break;
      }
      if (value != null) {
        // quoteReplacement keeps '$' and '\' in the value from being treated as group references
        matcher.appendReplacement(sb, Matcher.quoteReplacement(value));
      }
      // When nothing is appended here, the untouched variable is copied by the next append
    }
    matcher.appendTail(sb);
    return sb.toString();
  }

  /**
   * Blocks until the given jobs have finished and returns the barrier result.
   *
   * @param jobs
   *          the jobs to wait for
   * @return the result of the job barrier
   * @throws IllegalStateException
   *           if no service registry has been set
   */
  private JobBarrier.Result waitForStatus(Job... jobs) throws IllegalStateException, IllegalArgumentException {
    if (serviceRegistry != null) {
      return new JobBarrier(null, serviceRegistry, jobPollingInterval, jobs).waitForJobs();
    }
    throw new IllegalStateException("Can't wait for job status without providing a service registry first");
  }

  /**
   * Returns the latest snapshot of the media package from the asset manager.
   *
   * @param mpId
   *          the media package id
   * @return the latest snapshot
   * @throws LiveScheduleException
   *           if the asset manager has no snapshot of the media package
   */
  Snapshot getSnapshotFromArchive(String mpId) throws LiveScheduleException {
    return assetManager.getLatestSnapshot(mpId).orElseThrow(() -> new LiveScheduleException(
            String.format("Unexpected error: media package %s has not been archived.", mpId)));
  }

  /**
   * Distributes the publishable elements of the snapshot's media package to the live channel and returns a copy
   * of the media package that contains only the distributed elements.
   *
   * @param snapshot
   *          the snapshot holding the archived media package; the archived media package itself is not modified
   * @return a clone of the media package holding the distributed elements
   * @throws LiveScheduleException
   *           if the distribution job does not succeed or any other error occurs
   */
  MediaPackage distributeAclsAndCatalogs(Snapshot snapshot) throws LiveScheduleException {
    try {
      final MediaPackage liveMp = (MediaPackage) snapshot.getMediaPackage().clone();

      // Ids of the elements picked by the publish selector
      final Set<String> publishIds = new HashSet<>();
      for (MediaPackageElement selected : publishElementSelector.select(liveMp, false)) {
        publishIds.add(selected.getIdentifier());
      }

      // Distribute them and wait for the job
      final Job distributeJob = downloadDistributionService.distribute(CHANNEL_ID, liveMp, publishIds, false);
      final boolean distributed = waitForStatus(distributeJob).isSuccess();
      if (!distributed) {
        throw new LiveScheduleException(
                "Element(s) for live media package " + liveMp.getIdentifier() + " could not be distributed");
      }

      // Empty the media package ...
      for (MediaPackageElement original : liveMp.getElements()) {
        liveMp.remove(original);
      }

      // ... and fill it with what the distribution job returned
      final List<MediaPackageElement> fromDistribution = (List<MediaPackageElement>) MediaPackageElementParser
              .getArrayFromXml(distributeJob.getPayload());
      for (MediaPackageElement distributed1 : fromDistribution) {
        liveMp.add(distributed1);
      }

      // Clean up: for each distributed id, delete the uri of the element now in the media package
      for (String elementId : publishIds) {
        final MediaPackageElement current = liveMp.getElementById(elementId);
        workspace.delete(current.getURI());
      }

      return liveMp;
    } catch (LiveScheduleException e) {
      throw e;
    } catch (Exception e) {
      throw new LiveScheduleException(e);
    }
  }

  /**
   * Creates a copy of the media package in which the first episode acl attachment is replaced by the given acl,
   * distributed to the live channel.
   *
   * @param previousMp
   *          the media package from the search index; it is not modified
   * @param acl
   *          the new access control list
   * @return a clone of the media package carrying the distributed acl
   * @throws LiveScheduleException
   *           if the distribution job does not succeed or any other error occurs
   */
  MediaPackage replaceAndDistributeAcl(MediaPackage previousMp, AccessControlList acl) throws LiveScheduleException {
    try {
      // Work on a copy of the media package coming from the search index
      final MediaPackage updatedMp = (MediaPackage) previousMp.clone();

      // Drop the acl that is currently attached
      final Attachment[] staleAcls = updatedMp.getAttachments(MediaPackageElements.XACML_POLICY_EPISODE);
      if (staleAcls.length > 0) {
        updatedMp.remove(staleAcls[0]);
      }

      // Attaching the new acl creates it in the ws/wfr
      authService.setAcl(updatedMp, AclScope.Episode, acl);
      final Attachment[] freshAcls = updatedMp.getAttachments(MediaPackageElements.XACML_POLICY_EPISODE);
      if (freshAcls.length == 0) {
        return updatedMp;
      }

      final String aclId = freshAcls[0].getIdentifier();
      final Job aclJob = downloadDistributionService.distribute(CHANNEL_ID, updatedMp, aclId, false);
      if (!waitForStatus(aclJob).isSuccess()) {
        throw new LiveScheduleException(
                "Acl for live media package " + updatedMp.getIdentifier() + " could not be distributed");
      }

      // The undistributed acl is taken out of the media package and deleted from the workspace/wfr
      final MediaPackageElement undistributedAcl = updatedMp.getElementById(aclId);
      updatedMp.remove(undistributedAcl);
      workspace.delete(undistributedAcl.getURI());

      // The distributed acl takes its place
      updatedMp.add(MediaPackageElementParser.getFromXml(aclJob.getPayload()));
      return updatedMp;
    } catch (LiveScheduleException e) {
      throw e;
    } catch (Exception e) {
      throw new LiveScheduleException(e);
    }
  }

  /**
   * Adds a publication element for the live channel to the media package of the snapshot. The publication url is
   * the player path plus media package id resolved against the engage url of the snapshot's organization or, if
   * that is not available, against the server url.
   *
   * @param snapshot
   *          the snapshot holding the archived media package, which is modified in place
   * @param generatedTracks
   *          the generated live tracks by key; those configured for publishing are added to the publication
   * @return the media package of the snapshot with the live publication added
   * @throws LiveScheduleException
   *           if no base url is available or the publication url cannot be built
   */
  MediaPackage addLivePublicationToMediaPackage(Snapshot snapshot, Map<String, Track> generatedTracks)
          throws LiveScheduleException {
    MediaPackage mp = snapshot.getMediaPackage();

    Organization currentOrg = null;
    try {
      currentOrg = organizationService.getOrganization(snapshot.getOrganizationId());
    } catch (NotFoundException e) {
      logger.warn("Organization in snapshot not found: {}", snapshot.getOrganizationId());
    }

    logger.debug("Adding live channel publication element to media package {}", mp);
    String engageUrlString = null;
    if (currentOrg != null) {
      engageUrlString = StringUtils.trimToNull(currentOrg.getProperties().get(ENGAGE_URL_PROPERTY));
    }
    if (engageUrlString == null) {
      engageUrlString = serverUrl;
      logger.info(
          "Using 'server.url' as a fallback for the non-existing organization level key '{}' for the publication url",
          ENGAGE_URL_PROPERTY);
    }
    if (engageUrlString == null) {
      // The server url may be unset as well; report that instead of failing with a NullPointerException
      throw new LiveScheduleException(String.format(
          "Cannot build the publication url for media package %s because neither '%s' nor '%s' is set",
          mp.getIdentifier(), ENGAGE_URL_PROPERTY, SERVER_URL_PROPERTY));
    }

    try {
      // Create new distribution element
      URI engageUri = URIUtils.resolve(new URI(engageUrlString), PLAYER_PATH + mp.getIdentifier().toString());
      Publication publicationElement = PublicationImpl.publication(UUID.randomUUID().toString(), CHANNEL_ID, engageUri,
              MimeTypes.parseMimeType("text/html"));
      mp.add(publicationElement);
      createOrUpdatePublicationTracks(publicationElement, generatedTracks);
      return mp;
    } catch (URISyntaxException e) {
      throw new LiveScheduleException(e);
    }
  }

  /**
   * Removes every publication element of the live channel from the given media package.
   *
   * @param mp
   *          the media package to remove the live publications from; it is modified in place
   */
  void removeLivePublicationChannel(MediaPackage mp) {
    Publication[] existingPublications = mp.getPublications();
    if (existingPublications == null) {
      return;
    }
    for (Publication candidate : existingPublications) {
      // Publications of other channels stay untouched
      if (!CHANNEL_ID.equals(candidate.getChannel())) {
        continue;
      }
      mp.remove(candidate);
    }
  }

  /**
   * Tells whether two live media packages carry equivalent tracks, catalogs and attachments.
   * <p>
   * Tracks are compared by URI, flavor, mime type, duration and the frame width/height of their first stream.
   * Catalogs and attachments are compared by identifier, element type, checksum and flavor. Each group is
   * compared with {@code CollectionUtils.isEqualCollection}; the groups are checked in the order tracks,
   * catalogs, attachments and the check stops at the first group that differs.
   *
   * @param previous
   *          the previously published media package
   * @param current
   *          the media package to compare against
   * @return {@code true} if all three element groups are equal under the rules above, {@code false} otherwise
   */
  boolean isSameMediaPackage(MediaPackage previous, MediaPackage current) {

    // Equality of catalogs and attachments: identifier, element type, checksum and flavor
    Equator<MediaPackageElement> elementEquator = new Equator<>() {
      @Override
      public boolean equate(MediaPackageElement left, MediaPackageElement right) {
        if (!Objects.equals(left.getIdentifier(), right.getIdentifier())) {
          return false;
        }
        if (!Objects.equals(left.getElementType(), right.getElementType())) {
          return false;
        }
        if (!Objects.equals(left.getChecksum(), right.getChecksum())) {
          return false;
        }
        return Objects.equals(left.getFlavor(), right.getFlavor());
      }

      @Override
      public int hash(MediaPackageElement element) {
        return Objects.hash(element.getIdentifier(), element.getElementType(), element.getChecksum(),
                element.getFlavor());
      }
    };

    // Equality of live tracks: location, flavor, mime type, duration and frame size
    Equator<Track> trackEquator = new Equator<>() {
      @Override
      public boolean equate(Track left, Track right) {
        // we can safely assume that each live track has exactly one video stream since we generated that ourselves
        VideoStream leftVideo = (VideoStream) left.getStreams()[0];
        VideoStream rightVideo = (VideoStream) right.getStreams()[0];

        if (!Objects.equals(left.getURI(), right.getURI())) {
          return false;
        }
        if (!Objects.equals(left.getFlavor(), right.getFlavor())) {
          return false;
        }
        if (!Objects.equals(left.getMimeType(), right.getMimeType())) {
          return false;
        }
        if (!Objects.equals(left.getDuration(), right.getDuration())) {
          return false;
        }
        if (!Objects.equals(leftVideo.getFrameWidth(), rightVideo.getFrameWidth())) {
          return false;
        }
        return Objects.equals(leftVideo.getFrameHeight(), rightVideo.getFrameHeight());
      }

      @Override
      public int hash(Track track) {
        VideoStream video = (VideoStream) track.getStreams()[0];
        return Objects.hash(track.getURI(), track.getFlavor(), track.getMimeType(), track.getDuration(),
                video.getFrameWidth(), video.getFrameHeight());
      }
    };

    // Short-circuits: a later group is only looked at when all earlier ones are equal
    return CollectionUtils.isEqualCollection(Arrays.asList(previous.getTracks()),
                    Arrays.asList(current.getTracks()), trackEquator)
            && CollectionUtils.isEqualCollection(Arrays.asList(previous.getCatalogs()),
                    Arrays.asList(current.getCatalogs()), elementEquator)
            && CollectionUtils.isEqualCollection(Arrays.asList(previous.getAttachments()),
                    Arrays.asList(current.getAttachments()), elementEquator);
  }

  /**
   * Retracts those elements of the previously published media package that the new one no longer uses.
   * <p>
   * Tracks are never retracted. Every other element of {@code previousMp} is retracted from the live channel
   * unless an element of {@code newMp} has the same URI. A retraction job that does not finish successfully is
   * only logged as a warning.
   *
   * @param previousMp
   *          the previously published live media package
   * @param newMp
   *          the updated live media package
   * @throws LiveScheduleException
   *           if the distribution service fails with a {@code DistributionException}
   */
  void retractPreviousElements(MediaPackage previousMp, MediaPackage newMp) throws LiveScheduleException {
    try {
      // Collect the ids of all previously published elements whose url the new media package does not use
      Set<String> retractableIds = new HashSet<>();
      for (MediaPackageElement oldElement : previousMp.getElements()) {
        // We don't retract tracks because they are just live links
        if (Track.TYPE.equals(oldElement.getElementType())) {
          continue;
        }
        boolean stillReferenced = false;
        for (MediaPackageElement newElement : newMp.getElements()) {
          if (oldElement.getURI().equals(newElement.getURI())) {
            logger.debug(
                "Not retracting element {} with URI {} from download distribution because it is "
                    + "still used by updated live media package",
                oldElement.getIdentifier(), oldElement.getURI());
            stillReferenced = true;
            break;
          }
        }
        if (!stillReferenced) {
          retractableIds.add(oldElement.getIdentifier());
        }
      }

      if (retractableIds.isEmpty()) {
        return;
      }

      Job retraction = downloadDistributionService.retract(CHANNEL_ID, previousMp, retractableIds);
      // Wait for retraction to finish
      if (waitForStatus(retraction).isSuccess()) {
        logger.debug("Retraction of previously published elements complete");
      } else {
        logger.warn("One of the download retract jobs did not complete successfully");
      }
    } catch (DistributionException e) {
      throw new LiveScheduleException(e);
    }
  }

  /**
   * Stores the Dublin Core catalog service used by this component. Annotated with {@code @Reference}, so it is
   * meant to be called by the OSGi runtime.
   *
   * @param service
   *          the Dublin Core catalog service
   */
  @Reference
  public void setDublinCoreService(DublinCoreCatalogService service) {
    this.dublinCoreService = service;
  }

  /**
   * Stores the search service used by this component. Annotated with {@code @Reference}, so it is meant to be
   * called by the OSGi runtime.
   *
   * @param service
   *          the search service
   */
  @Reference
  public void setSearchService(SearchService service) {
    this.searchService = service;
  }

  /**
   * Stores the series service used by this component. Annotated with {@code @Reference}, so it is meant to be
   * called by the OSGi runtime.
   *
   * @param service
   *          the series service
   */
  @Reference
  public void setSeriesService(SeriesService service) {
    this.seriesService = service;
  }

  /**
   * Stores the service registry used by this component. Annotated with {@code @Reference}, so it is meant to be
   * called by the OSGi runtime.
   *
   * @param service
   *          the service registry
   */
  @Reference
  public void setServiceRegistry(ServiceRegistry service) {
    this.serviceRegistry = service;
  }

  /**
   * Stores the capture agent state service used by this component. Annotated with {@code @Reference}, so it is
   * meant to be called by the OSGi runtime.
   *
   * @param service
   *          the capture agent state service
   */
  @Reference
  public void setCaptureAgentService(CaptureAgentStateService service) {
    this.captureAgentService = service;
  }

  /**
   * Stores the download distribution service used by this component and logs its distribution type. The
   * {@code @Reference} target filter restricts the binding to services registered with
   * {@code distribution.channel=download}.
   *
   * @param service
   *          the download distribution service
   */
  @Reference(
      name = "DownloadDistributionService",
      target = "(distribution.channel=download)"
  )
  public void setDownloadDistributionService(DownloadDistributionService service) {
    this.downloadDistributionService = service;
    // The field now refers to the same instance, so the type can be read from the argument directly
    logger.info("Distribution service with type '{}' set.", service.getDistributionType());
  }

  /**
   * Stores the workspace used by this component. Annotated with {@code @Reference}, so it is meant to be called
   * by the OSGi runtime.
   *
   * @param ws
   *          the workspace
   */
  @Reference
  public void setWorkspace(Workspace ws) {
    this.workspace = ws;
  }

  /**
   * Stores the asset manager used by this component. Annotated with {@code @Reference}, so it is meant to be
   * called by the OSGi runtime.
   *
   * @param assetManager
   *          the asset manager
   */
  @Reference
  public void setAssetManager(AssetManager assetManager) {
    this.assetManager = assetManager;
  }

  /**
   * Stores the authorization service used by this component. Annotated with {@code @Reference}, so it is meant
   * to be called by the OSGi runtime.
   *
   * @param service
   *          the authorization service
   */
  @Reference
  public void setAuthorizationService(AuthorizationService service) {
    this.authService = service;
  }

  /**
   * Stores the organization directory service used by this component. Annotated with {@code @Reference}, so it
   * is meant to be called by the OSGi runtime.
   *
   * @param service
   *          the organization directory service
   */
  @Reference
  public void setOrganizationService(OrganizationDirectoryService service) {
    this.organizationService = service;
  }

  /**
   * Stores the security service used by this component. Annotated with {@code @Reference}, so it is meant to be
   * called by the OSGi runtime.
   *
   * @param service
   *          the security service
   */
  @Reference
  public void setSecurityService(SecurityService service) {
    this.securityService = service;
  }
  // === Set by OSGI - end

  // === Used by unit tests - begin
  /**
   * Overrides the job polling interval. Package-private hook for unit tests.
   *
   * @param jobPollingInterval
   *          the new polling interval (NOTE(review): time unit is not visible here, confirm at the field)
   */
  void setJobPollingInterval(long jobPollingInterval) {
    this.jobPollingInterval = jobPollingInterval;
  }

  /**
   * Exposes the snapshot version cache. Package-private hook for unit tests.
   *
   * @return the cache instance held by this service (not a copy)
   */
  Cache<String, Version> getSnapshotVersionCache() {
    return this.snapshotVersionCache;
  }
  // === Used by unit tests - end
}