WowzaStreamingDistributionService.java

/*
 * Licensed to The Apereo Foundation under one or more contributor license
 * agreements. See the NOTICE file distributed with this work for additional
 * information regarding copyright ownership.
 *
 *
 * The Apereo Foundation licenses this file to you under the Educational
 * Community License, Version 2.0 (the "License"); you may not use this file
 * except in compliance with the License. You may obtain a copy of the License
 * at:
 *
 *   http://opensource.org/licenses/ecl2.txt
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the
 * License for the specific language governing permissions and limitations under
 * the License.
 *
 */
package org.opencastproject.distribution.streaming.wowza;

import static java.lang.String.format;
import static org.opencastproject.util.RequireUtil.notNull;

import org.opencastproject.distribution.api.AbstractDistributionService;
import org.opencastproject.distribution.api.DistributionException;
import org.opencastproject.distribution.api.DistributionService;
import org.opencastproject.distribution.api.StreamingDistributionService;
import org.opencastproject.job.api.Job;
import org.opencastproject.mediapackage.AudioStream;
import org.opencastproject.mediapackage.MediaPackage;
import org.opencastproject.mediapackage.MediaPackageElement;
import org.opencastproject.mediapackage.MediaPackageElementParser;
import org.opencastproject.mediapackage.MediaPackageParser;
import org.opencastproject.mediapackage.VideoStream;
import org.opencastproject.mediapackage.track.TrackImpl;
import org.opencastproject.mediapackage.track.TrackImpl.StreamingProtocol;
import org.opencastproject.security.api.Organization;
import org.opencastproject.security.api.OrganizationDirectoryService;
import org.opencastproject.security.api.SecurityService;
import org.opencastproject.security.api.UserDirectoryService;
import org.opencastproject.serviceregistry.api.ServiceRegistry;
import org.opencastproject.serviceregistry.api.ServiceRegistryException;
import org.opencastproject.util.FileSupport;
import org.opencastproject.util.LoadUtil;
import org.opencastproject.util.MimeType;
import org.opencastproject.util.NotFoundException;
import org.opencastproject.util.RequireUtil;
import org.opencastproject.util.UrlSupport;
import org.opencastproject.util.XmlSafeParser;
import org.opencastproject.workspace.api.Workspace;

import com.google.gson.Gson;
import com.google.gson.reflect.TypeToken;

import org.apache.commons.io.FileUtils;
import org.apache.commons.io.FilenameUtils;
import org.apache.commons.lang3.StringUtils;
import org.osgi.framework.BundleContext;
import org.osgi.service.cm.ConfigurationException;
import org.osgi.service.component.ComponentException;
import org.osgi.service.component.annotations.Activate;
import org.osgi.service.component.annotations.Component;
import org.osgi.service.component.annotations.Modified;
import org.osgi.service.component.annotations.Reference;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.w3c.dom.DOMException;
import org.w3c.dom.Document;
import org.w3c.dom.Element;
import org.w3c.dom.Node;
import org.w3c.dom.NodeList;
import org.xml.sax.SAXException;

import java.io.File;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.ConcurrentHashMap;

import javax.ws.rs.core.UriBuilder;
import javax.xml.parsers.DocumentBuilder;
import javax.xml.parsers.ParserConfigurationException;
import javax.xml.transform.Transformer;
import javax.xml.transform.TransformerException;
import javax.xml.transform.dom.DOMSource;
import javax.xml.transform.stream.StreamResult;

/**
 * Distributes media to the local media delivery directory.
 */
@Component(
    immediate = true,
    service = { DistributionService.class, StreamingDistributionService.class },
    property = {
        "service.description=Distribution Service (Streaming)",
        "distribution.channel=streaming"
    }
)
public class WowzaStreamingDistributionService extends AbstractDistributionService
        implements StreamingDistributionService {

  /** The key in the properties file that defines the streaming directory. */
  protected static final String STREAMING_DIRECTORY_KEY = "org.opencastproject.streaming.directory";

  /** The key in the properties file that defines the streaming formats to distribute. */
  protected static final String WOWZA_FORMATS_KEY = "org.opencastproject.wowza.formats";

  /** The tenant-specific key in the properties file that defines the wowza url. */
  protected static final String WOWZA_URL_KEY = "org.opencastproject.%s.wowza.url";

  /** The tenant specific key in the properties file that defines the wowza port. */
  protected static final String WOWZA_PORT_KEY = "org.opencastproject.%s.wowza.port";

  protected Map<String, URI> streamingUrls;

  /** The key in the properties file that specifies in which order the videos in the SMIL file should be stored */
  protected static final String SMIL_ORDER_KEY = "org.opencastproject.wowza.smil.order";

  /** One of the possible values for the order of the videos in the SMIL file */
  private static final String SMIL_ASCENDING_VALUE = "ascending";

  /** One of the possible values for the order of the videos in the SMIL file */
  private static final String SMIL_DESCENDING_VALUE = "descending";

  /** The attribute "video-bitrate" in the SMIL files */
  private static final String SMIL_ATTR_VIDEO_BITRATE = "video-bitrate";

  /** The attribute "video-width" in the SMIL files */
  private static final String SMIL_ATTR_VIDEO_WIDTH = "width";

  /** The attribute "video-height" in the SMIL files */
  private static final String SMIL_ATTR_VIDEO_HEIGHT = "height";

  /** The attribute to return for Distribution Type */
  private static final String DISTRIBUTION_TYPE = "streaming";

  /** Acceptable values for the streaming schemes */
  private static final Set<String> validSchemes;
  private static final Map<String, Integer> defaultProtocolPorts;

  static {
    Set<String> temp = new HashSet<>();
    temp.add("http");
    temp.add("https");
    validSchemes = Collections.unmodifiableSet(temp);

    Map<String, Integer> tempMap = new HashMap<>();
    tempMap.put("http", 80);
    tempMap.put("https", 443);
    defaultProtocolPorts = Collections.unmodifiableMap(tempMap);
  }

  /** Default scheme */
  protected static final String DEFAULT_SCHEME = "http";

  /** Logging facility */
  private static final Logger logger = LoggerFactory.getLogger(WowzaStreamingDistributionService.class);

  /** Receipt type */
  public static final String JOB_TYPE = "org.opencastproject.distribution.streaming";

  /** List of available operations on jobs */
  private enum Operation {
    Distribute, Retract
  };

  /** The load on the system introduced by creating a distribute job */
  public static final float DEFAULT_DISTRIBUTE_JOB_LOAD = 0.1f;

  /** The load on the system introduced by creating a retract job */
  public static final float DEFAULT_RETRACT_JOB_LOAD = 0.1f;

  /** The key to look for in the service configuration file to override the {@link #DEFAULT_DISTRIBUTE_JOB_LOAD} */
  public static final String DISTRIBUTE_JOB_LOAD_KEY = "job.load.streaming.distribute";

  /** The key to look for in the service configuration file to override the {@link #DEFAULT_RETRACT_JOB_LOAD} */
  public static final String RETRACT_JOB_LOAD_KEY = "job.load.streaming.retract";

  /** The load on the system introduced by creating a distribute job */
  private float distributeJobLoad = DEFAULT_DISTRIBUTE_JOB_LOAD;

  /** The load on the system introduced by creating a retract job */
  private float retractJobLoad = DEFAULT_RETRACT_JOB_LOAD;

  /** The distribution directory */
  private File distributionDirectory = null;

  /** The set of supported streaming formats to distribute. */
  private Set<StreamingProtocol> supportedFormats;

  /** Whether or not the video order in the SMIL files is descending */
  private boolean isSmilOrderDescending = false;

  private static final Gson gson = new Gson();

  /**
   * Creates a new instance of the streaming distribution service.
   */
  public WowzaStreamingDistributionService() {
    super(JOB_TYPE);
  }

  public String getDistributionType() {
    return DISTRIBUTION_TYPE;
  }

  @Activate
  public void activate(BundleContext bundleContext, Map<String, Object> properties)
          throws ComponentException, ConfigurationException {
    modified(bundleContext, properties);
  }

  @Modified
  public void modified(BundleContext bundleContext, Map<String, Object> properties)
          throws ComponentException, ConfigurationException {

    // get configuration
    if (properties != null && bundleContext != null) {

      Map streamingUrlConfiguration = new ConcurrentHashMap<>();

      // Streaming directory
      String distributionDirectoryPath = StringUtils.trimToNull((String) properties.get(STREAMING_DIRECTORY_KEY));

      if (distributionDirectoryPath == null) {
        // set default streaming directory to ${org.opencastproject.storage.dir}/streams
        distributionDirectoryPath
            = StringUtils.trimToNull(bundleContext.getProperty("org.opencastproject.storage.dir"));
        if (distributionDirectoryPath != null) {
          distributionDirectoryPath += "/streams";
        }
      }
      if (distributionDirectoryPath == null) {
        throw new ComponentException("Streaming distribution directory must be set");
      }

      distributionDirectory = new File(distributionDirectoryPath);
      if (!distributionDirectory.isDirectory()) {
        try {
          Files.createDirectories(distributionDirectory.toPath());
        } catch (IOException e) {
          throw new ComponentException("Distribution directory " + distributionDirectory
              + " does not exist and can't be created", e);
        }
      }

      logger.info("Streaming distribution directory is {}", distributionDirectory);

      // Streaming URLs
      List<Organization> organizations = organizationDirectoryService.getOrganizations();
      for (Organization org: organizations) {
        String orgId = org.getId();
        String streamingUrl = StringUtils.trimToNull((String) properties.get(String.format(WOWZA_URL_KEY, orgId)));
        String streamingPort = StringUtils.trimToNull((String) properties.get(String.format(WOWZA_PORT_KEY, orgId)));

        if (streamingUrl != null) {
          try {
            URI tenantStreamingUrl = getStreamingUrl(streamingUrl, streamingPort, validSchemes, DEFAULT_SCHEME, null);

            if (tenantStreamingUrl == null) {
              throw new ComponentException(String.format("Streaming URL is undefined for tenant %s.", orgId));
            }

            streamingUrlConfiguration.put(orgId, tenantStreamingUrl);
            logger.info("Wowza Streaming URL for tenant {} set to \"{}\"", orgId, tenantStreamingUrl);
          } catch (URISyntaxException e) {
            throw new ComponentException(
                    String.format("Wowza Streaming URL %s of tenant %s could not be parsed", streamingUrl, orgId), e);
          }
        } else {
          logger.debug("Wowza Streaming URL is undefined for tenant {}", orgId);
        }
      }

      streamingUrls = streamingUrlConfiguration;

      // Streaming formats
      String formats = StringUtils.trimToNull((String) properties.get(WOWZA_FORMATS_KEY));

      if (formats == null) {
        setDefaultSupportedFormats();
      } else {
        setSupportedFormats(formats);
      }
      logger.info("The supported streaming formats are: {}", StringUtils.join(supportedFormats, ","));

      // Smil order
      String smilOrder = StringUtils.trimToNull((String) properties.get(SMIL_ORDER_KEY));

      if (smilOrder == null || SMIL_ASCENDING_VALUE.equals(smilOrder)) {
        logger.info("The videos in the SMIL files will be sorted in ascending bitrate order");
        isSmilOrderDescending = false;
      } else if (SMIL_DESCENDING_VALUE.equals(smilOrder)) {
        isSmilOrderDescending = true;
        logger.info("The videos in the SMIL files will be sorted in descending bitrate order");
      } else {
        throw new ConfigurationException(SMIL_ORDER_KEY, format("Illegal value '%s'. Valid options are '%s' and '%s'",
                smilOrder, SMIL_ASCENDING_VALUE, SMIL_DESCENDING_VALUE));
      }

      // Job loads
      distributeJobLoad = LoadUtil.getConfiguredLoadValue(properties, DISTRIBUTE_JOB_LOAD_KEY,
              DEFAULT_DISTRIBUTE_JOB_LOAD, serviceRegistry);
      retractJobLoad = LoadUtil.getConfiguredLoadValue(properties, RETRACT_JOB_LOAD_KEY, DEFAULT_RETRACT_JOB_LOAD,
              serviceRegistry);
    }
  }

  public boolean publishToStreaming() {
    String currentOrgId = securityService.getOrganization().getId();
    return streamingUrls.containsKey(currentOrgId);
  }

  private URI getStreamingURLforCurrentOrg() {
    String currentOrgId = securityService.getOrganization().getId();
    if (streamingUrls.containsKey(currentOrgId)) {
      return streamingUrls.get(currentOrgId);
    }
    return null;
  }

  /**
   * Transform the configuration value into the supported formats to distribute to the Wowza server.
   *
   * @param formatString
   *          The string to parse with the supported formats.
   */
  private void setSupportedFormats(String formatString) {
    supportedFormats = new TreeSet<>();

    for (String format : formatString.toUpperCase().split("[\\s,]")) {
      if (!format.isEmpty()) {
        try {
          StreamingProtocol protocol = StreamingProtocol.valueOf(format);
          supportedFormats.add(protocol);
        } catch (IllegalArgumentException e) {
          logger.warn("Found incorrect format \"{}\". Ignoring...", format);
        }
      }
    }
  }

  /**
   * Get the default set of supported formats to distribute to Wowza.
   */
  private void setDefaultSupportedFormats() {
    supportedFormats = new TreeSet<>(Arrays.asList(
            TrackImpl.StreamingProtocol.HLS,
            TrackImpl.StreamingProtocol.HDS,
            TrackImpl.StreamingProtocol.SMOOTH,
            TrackImpl.StreamingProtocol.DASH));
  }

  /**
   * Calculate a streaming URL based on input parameters
   *
   * @throws URISyntaxException
   */
  private static URI getStreamingUrl(String inputUri, String inputPort, Set<String> validSchemes, String defaultScheme,
          String defaultUri) throws URISyntaxException {

    Integer port;
    try {
      port = Integer.parseInt(StringUtils.trimToEmpty(inputPort));
    } catch (NumberFormatException e) {
      port = null;
    }

    URI uri;
    if (StringUtils.isNotBlank(inputUri)) {
      uri = new URI(inputUri);
    } else if (StringUtils.isNotBlank(defaultUri)) {
      uri = new URI(defaultUri);
    } else {
      throw new IllegalArgumentException("Provided streaming URL is empty.");
    }
    UriBuilder uriBuilder = UriBuilder.fromUri(uri);
    String scheme = uri.getScheme();
    String uriPath = uri.getPath();
    // When a URI does not have a scheme, Java parses it as if all the URI was a (relative) path
    // However, we will assume that a host was always provided, so everything before the first "/" is the host,
    // not part of the path
    if (uri.getHost() == null) {
      uriBuilder.host(uriPath.substring(0, uriPath.indexOf("/"))).replacePath(uriPath.substring(uriPath.indexOf("/")));
    }

    if (!validSchemes.contains(scheme)) {
      if (scheme == null) {
        uriBuilder.scheme(defaultScheme);
      } else {
        throw new URISyntaxException(inputUri, "Provided URI has an illegal scheme");
      }
    }

    if ((port != null) && (!port.equals(defaultProtocolPorts.get(uriBuilder.build().getScheme())))) {
      uriBuilder.port(port);
    }

    return uriBuilder.build();
  }

  /**
   * {@inheritDoc}
   *
   * @see org.opencastproject.distribution.api.StreamingDistributionService#distribute(java.lang.String,
   * org.opencastproject.mediapackage.MediaPackage, java.util.Set)
   */
  @Override
  public Job distribute(String channelId, MediaPackage mediapackage, Set<String> elementIds)
          throws DistributionException {

    notNull(mediapackage, "mediaPackage");
    notNull(elementIds, "elementIds");
    notNull(channelId, "channelId");

    if (getStreamingURLforCurrentOrg() == null) {
      throw new IllegalStateException(
              String.format("No streaming url or port set for tenant %s", securityService.getOrganization().getId()));
    }
    if (distributionDirectory == null) {
      throw new IllegalStateException(
              "Streaming distribution directory must be set (org.opencastproject.streaming.directory)");
    }

    try {
      return serviceRegistry.createJob(
              JOB_TYPE,
              Operation.Distribute.toString(),
              Arrays.asList(channelId, MediaPackageParser.getAsXml(mediapackage), gson.toJson(elementIds)),
              distributeJobLoad);
    } catch (ServiceRegistryException e) {
      throw new DistributionException("Unable to create a job", e);
    }
  }

  /**
   * {@inheritDoc}
   *
   * @see org.opencastproject.distribution.api.DistributionService#distribute(String,
   *      org.opencastproject.mediapackage.MediaPackage, String)
   */
  @Override
  public Job distribute(final String channelId, final MediaPackage mediapackage, final String elementId)
          throws DistributionException {
    return distribute(channelId, mediapackage, new HashSet<>(Collections.singletonList(elementId)));
  }

  /**
   * Distribute media package elements to the download distribution service.
   *
   * @param channelId The id of the publication channel to be distributed to.
   * @param mediaPackage The media package that contains the elements to be distributed.
   * @param elementIds The ids of the elements that should be distributed
   * contained within the media package.
   * @return A reference to the MediaPackageElements that have been distributed.
   * @throws DistributionException Thrown if the parent directory of the
   * MediaPackageElement cannot be created, if the MediaPackageElement cannot be
   * copied or another unexpected exception occurs.
   */
  private List<MediaPackageElement> distributeElements(final String channelId, final MediaPackage mediaPackage,
          final Set<String> elementIds, URI streamingURL) throws DistributionException {
    notNull(mediaPackage, "mediaPackage");
    notNull(elementIds, "elementIds");
    notNull(channelId, "channelId");

    List<MediaPackageElement> distributedElements = new ArrayList<>();
    for (MediaPackageElement element : getElements(mediaPackage, elementIds)) {
      distributedElements.addAll(distributeElement(channelId, mediaPackage, element, streamingURL));
    }
    return distributedElements;
  }

  /**
   * Distribute a media package element to the download distribution service.
   *
   * @param mediaPackage
   *          The media package that contains the element to distribute.
   * @param element
   *          The element to be distributed
   * @return A list of elements that have been distributed
   * @throws DistributionException
   *           Thrown if the parent directory of the MediaPackageElement cannot be created, if the MediaPackageElement
   *           cannot be copied or another unexpected exception occurs.
   */
  private synchronized List<MediaPackageElement> distributeElement(final String channelId,
          final MediaPackage mediaPackage, final MediaPackageElement element, URI streamingURL)
          throws DistributionException {

    if (supportedFormats.isEmpty()) {
      logger.warn("Skipping distribution of element \"{}\" because no streaming format was specified", element);
      return Collections.emptyList();
    }

    // Streaming servers only deal with tracks
    if (!MediaPackageElement.Type.Track.equals(element.getElementType())) {
      logger.debug("Skipping {} {} for distribution to the streaming server",
              element.getElementType(), element.getIdentifier());
      return Collections.emptyList();
    }

    try {
      File source;
      try {
        source = workspace.get(element.getURI());
      } catch (NotFoundException | IOException e) {
        throw new DistributionException("Error getting element " + element.getURI() + " from the workspace", e);
      }

      ArrayList<MediaPackageElement> distribution = new ArrayList<>();

      // Put the file in place

      File destination = getDistributionFile(channelId, mediaPackage, element, streamingURL);
      try {
        Files.createDirectories(destination.toPath().getParent());
      } catch (IOException e) {
        throw new DistributionException("Unable to create " + destination.getParentFile(), e);
      }
      logger.info("Distributing {} to {}", element.getIdentifier(), destination);

      try {
        FileSupport.link(source, destination, true);
      } catch (IOException e) {
        throw new DistributionException("Unable to copy " + source + " to " + destination, e);
      }

      if ((!supportedFormats.isEmpty()) && isStreamingFormat(element)) {
        // Only if the Smil file does not exist we need to distribute streams
        // Otherwise the streams only were extended with new qualities
        File smilFile = getSmilFile(element, mediaPackage, channelId);
        Document smilXml = getSmilDocument(smilFile);
        addElementToSmil(smilXml, channelId, mediaPackage, element);
        URI smilUri = getSmilUri(smilFile, streamingURL);

        if (smilFile.isFile()) {
          logger.debug("Skipped adding streaming manifest {} to search index, as it already exists.", element);
        } else {
          for (StreamingProtocol protocol : supportedFormats) {
            distribution.add(createTrackforStreamingProtocol(element, smilUri, protocol));
            logger.info("Distributed element {} in {} format to the Wowza Server", element, protocol);
          }
        }

        saveSmilFile(smilFile, smilXml);
      }

      logger.info("Distributed file {} to Wowza Server", element);
      return distribution;

    } catch (URISyntaxException e) {
      throw new DistributionException("Error distributing " + element, e);
    }
  }

  private void setTransport(MediaPackageElement element, TrackImpl.StreamingProtocol protocol) {
    if (element instanceof TrackImpl) {
      ((TrackImpl) element).setTransport(protocol);
    }
  }

  private File getSmilFile(MediaPackageElement element, MediaPackage mediapackage, String channelId) {
    String orgId = securityService.getOrganization().getId();
    String smilFileName = channelId + "_" + mediapackage.getIdentifier() + "_" + element.getFlavor().getType()
            + ".smil";
    return distributionDirectory.toPath().resolve(Paths.get(orgId, smilFileName)).toFile();
  }

  private URI getSmilUri(File smilFile, URI streamingURL) {
    return UriBuilder.fromUri(streamingURL).path("smil:" + smilFile.getName()).build();
  }

  private URI getStreamingUri(URI smilUri, StreamingProtocol protocol) throws URISyntaxException {
    String fileName;
    switch (protocol) {
      case HLS:
        fileName = "playlist.m3u8";
        break;
      case HDS:
        fileName = "manifest.f4m";
        break;
      case SMOOTH:
        fileName = "Manifest";
        break;
      case DASH:
        fileName = "manifest_mpm4sav_mvlist.mpd";
        break;
      default:
        fileName = "";
    }
    return new URI(UrlSupport.concat(smilUri.toString(), fileName));
  }

  private boolean isStreamingFormat(MediaPackageElement element) {
    String uriPath = element.getURI().getPath();
    return uriPath.endsWith(".mp4") || uriPath.contains("mp4:");
  }

  private Document getSmilDocument(File smilFile) throws DistributionException {
    if (!smilFile.isFile()) {
      try {
        DocumentBuilder docBuilder = XmlSafeParser.newDocumentBuilderFactory().newDocumentBuilder();
        Document doc = docBuilder.newDocument();
        Element smil = doc.createElement("smil");
        doc.appendChild(smil);

        Element head = doc.createElement("head");
        smil.appendChild(head);

        Element body = doc.createElement("body");
        smil.appendChild(body);

        Element switchElement = doc.createElement("switch");
        body.appendChild(switchElement);

        return doc;
      } catch (ParserConfigurationException ex) {
        logger.error("Could not create XML file for {}.", smilFile);
        throw new DistributionException("Could not create XML file for " + smilFile);
      }
    }

    try {
      DocumentBuilder docBuilder = XmlSafeParser.newDocumentBuilderFactory().newDocumentBuilder();
      Document doc = docBuilder.parse(smilFile);

      if (!"smil".equalsIgnoreCase(doc.getDocumentElement().getNodeName())) {
        logger.error("XML-File {} is not a SMIL file.", smilFile);
        throw new DistributionException(format("XML-File %s is not an SMIL file.", smilFile.getName()));
      }

      return doc;
    } catch (IOException e) {
      logger.error("Could not open SMIL file {}", smilFile);
      throw new DistributionException(format("Could not open SMIL file %s", smilFile));
    } catch (ParserConfigurationException e) {
      logger.error("Could not parse SMIL file {}", smilFile);
      throw new DistributionException(format("Could not parse SMIL file %s", smilFile));
    } catch (SAXException e) {
      logger.error("Could not parse XML file {}", smilFile);
      throw new DistributionException(format("Could not parse XML file %s", smilFile));
    }
  }

  private void saveSmilFile(File smilFile, Document doc) throws DistributionException {
    try {
      Transformer transformer = XmlSafeParser.newTransformerFactory().newTransformer();
      DOMSource source = new DOMSource(doc);
      StreamResult stream = new StreamResult(smilFile);
      transformer.transform(source, stream);
      logger.info("SMIL file for Wowza server saved at {}", smilFile);
    } catch (TransformerException ex) {
      logger.error("Could not write SMIL file {} for distribution", smilFile);
      throw new DistributionException(format("Could not write SMIL file %s for distribution", smilFile));
    }
  }

  private void addElementToSmil(Document doc, String channelId, MediaPackage mediapackage, MediaPackageElement element)
          throws DOMException, URISyntaxException {
    if (!(element instanceof TrackImpl)) {
      return;
    }
    TrackImpl track = (TrackImpl) element;
    NodeList switchElementsList = doc.getElementsByTagName("switch");
    Node switchElement = null;

    // There should only be one switch element in the file. If there are more we will igore this.
    // If there is no switch element we need to create the xml first.
    if (switchElementsList.getLength() > 0) {
      switchElement = switchElementsList.item(0);
    } else {
      if (doc.getElementsByTagName("head").getLength() < 1) {
        doc.appendChild(doc.createElement("head"));
      }
      if (doc.getElementsByTagName("body").getLength() < 1) {
        doc.appendChild(doc.createElement("body"));
      }
      switchElement = doc.createElement("switch");
      doc.getElementsByTagName("body").item(0).appendChild(switchElement);
    }

    Element video = doc.createElement("video");
    video.setAttribute("src", getDistributionName(channelId, mediapackage, element));

    float bitrate = 0;

    // Add bitrate corresponding to the audio streams
    for (AudioStream stream : track.getAudio()) {
      bitrate += stream.getBitRate();
    }

    // Add bitrate corresponding to the video streams
    // Also, set the video width and height values:
    // In the rare case where there is more than one video stream, the values of the first stream
    // have priority, but always prefer the first stream with both "frameWidth" and "frameHeight"
    // parameters defined
    Integer width = null;
    Integer height = null;
    for (VideoStream stream : track.getVideo()) {
      bitrate += stream.getBitRate();
      // Update if both width and height are defined for a stream or if we have no values at all
      if (((stream.getFrameWidth() != null) && (stream.getFrameHeight() != null))
              || ((width == null) && (height == null))) {
        width = stream.getFrameWidth();
        height = stream.getFrameHeight();
      }
    }

    video.setAttribute(SMIL_ATTR_VIDEO_BITRATE, Integer.toString((int) bitrate));

    if (width != null) {
      video.setAttribute(SMIL_ATTR_VIDEO_WIDTH, Integer.toString(width));
    } else {
      logger.debug("Could not set video width in the SMIL file for element {} of mediapackage {}. The value was null",
              element.getIdentifier(), mediapackage.getIdentifier());
    }
    if (height != null) {
      video.setAttribute(SMIL_ATTR_VIDEO_HEIGHT, Integer.toString(height));
    } else {
      logger.debug("Could not set video height in the SMIL file for element {} of mediapackage {}. The value was null",
              element.getIdentifier(), mediapackage.getIdentifier());
    }

    NodeList currentVideos = switchElement.getChildNodes();
    for (int i = 0; i < currentVideos.getLength(); i++) {
      Node current = currentVideos.item(i);
      if ("video".equals(current.getNodeName())) {
        float currentBitrate = Float
                .parseFloat(current.getAttributes().getNamedItem(SMIL_ATTR_VIDEO_BITRATE).getTextContent());
        if ((isSmilOrderDescending && (currentBitrate < bitrate))
                || (!isSmilOrderDescending && (currentBitrate > bitrate))) {
          switchElement.insertBefore(video, current);
          return;
        }
      }
    }

    // If we get here, we could not insert the video before
    switchElement.appendChild(video);
  }

  private TrackImpl createTrackforStreamingProtocol(MediaPackageElement element, URI smilUri,
          StreamingProtocol protocol) throws URISyntaxException {
    TrackImpl track = (TrackImpl) element.clone();

    switch (protocol) {
      case HLS:
        track.setMimeType(MimeType.mimeType("application", "x-mpegURL"));
        break;
      case HDS:
        track.setMimeType(MimeType.mimeType("application", "f4m+xml"));
        break;
      case SMOOTH:
        track.setMimeType(MimeType.mimeType("application", "vnd.ms-sstr+xml"));
        break;
      case DASH:
        track.setMimeType(MimeType.mimeType("application", "dash+xml"));
        break;
      default:
        throw new IllegalArgumentException(format("Received invalid streaming protocol: '%s'", protocol));
    }

    setTransport(track, protocol);
    track.setURI(getStreamingUri(smilUri, protocol));
    track.referTo(element);
    track.setIdentifier(null);
    track.setAudio(null);
    track.setVideo(null);
    track.setChecksum(null);

    return track;
  }

  /**
   * {@inheritDoc}
   *
   * @see org.opencastproject.distribution.api.DistributionService#retract(String,
   *      org.opencastproject.mediapackage.MediaPackage, String) java.lang.String)
   */
  @Override
  public Job retract(String channelId, MediaPackage mediapackage, String elementId) throws DistributionException {
    return retract(channelId, mediapackage, new HashSet<>(Collections.singletonList(elementId)));
  }

  /**
   * {@inheritDoc}
   *
   * @see org.opencastproject.distribution.api.StreamingDistributionService#retract(java.lang.String,
   * org.opencastproject.mediapackage.MediaPackage, java.util.Set)
   */
  @Override
  public Job retract(String channelId, MediaPackage mediaPackage, Set<String> elementIds) throws DistributionException {
    RequireUtil.notNull(mediaPackage, "mediaPackage");
    RequireUtil.notNull(elementIds, "elementIds");
    RequireUtil.notNull(channelId, "channelId");
    //
    try {
      return serviceRegistry.createJob(JOB_TYPE, Operation.Retract.toString(),
              Arrays.asList(channelId, MediaPackageParser.getAsXml(mediaPackage), gson.toJson(elementIds)),
              retractJobLoad);
    } catch (ServiceRegistryException e) {
      throw new DistributionException("Unable to create a job", e);
    }
  }

  @Override
  public List<MediaPackageElement> distributeSync(String channelId, MediaPackage mediaPackage, String elementId)
          throws DistributionException {
    Set<String> elementIds = new HashSet<String>();
    elementIds.add(elementId);
    return distributeSync(channelId, mediaPackage, elementIds);
  }

  @Override
  public List<MediaPackageElement> distributeSync(String channelId, MediaPackage mediaPackage, Set<String> elementIds)
          throws DistributionException {

    if (getStreamingURLforCurrentOrg() == null) {
      logger.warn(String.format("Trying to distribute to streaming from tenant where streaming url or port aren't set.",
              securityService.getOrganization().getId()));
      return Collections.emptyList();
    }

    if (distributionDirectory == null) {
      logger.warn("Streaming distribution directory isn't set (org.opencastproject.streaming.directory)");
      return Collections.emptyList();
    }

    URI streamingURL = getStreamingURLforCurrentOrg();
    return distributeElements(channelId, mediaPackage, elementIds, streamingURL);
  }

  @Override
  public List<MediaPackageElement> retractSync(String channelId, MediaPackage mediaPackage, String elementId)
          throws DistributionException {
    Set<String> elementIds = new HashSet<String>();
    elementIds.add(elementId);
    return retractSync(channelId, mediaPackage, elementIds);
  }

  @Override
  public List<MediaPackageElement> retractSync(String channelId, MediaPackage mediaPackage, Set<String> elementIds)
          throws DistributionException {

    if (getStreamingURLforCurrentOrg() == null) {
      logger.warn(String.format("Trying to retract from streaming from tenant where streaming url or port aren't set.",
              securityService.getOrganization().getId()));
      return Collections.emptyList();
    }

    if (distributionDirectory == null) {
      logger.warn("Streaming distribution directory isn't set (org.opencastproject.streaming.directory)");
      return Collections.emptyList();
    }

    URI streamingURL = getStreamingURLforCurrentOrg();
    return retractElements(channelId, mediaPackage, elementIds, streamingURL);
  }

  /**
   * Retract a media package element from the distribution channel. The retracted element must not necessarily be the
   * one given as parameter <code>elementId</code>. Instead, the element's distribution URI will be calculated. This way
   * you are able to retract elements by providing the "original" element here.
   *
   * @param channelId
   *          the channel id
   * @param mediaPackage
   *          the mediaPackage
   * @param elementIds
   *          the element identifiers
   * @return the retracted element or <code>null</code> if the element was not retracted
   * @throws org.opencastproject.distribution.api.DistributionException
   *           in case of an error
   */
  private List<MediaPackageElement> retractElements(String channelId, MediaPackage mediaPackage,
          Set<String> elementIds, URI streamingURL)
          throws DistributionException {

    notNull(mediaPackage, "mediaPackage");
    notNull(elementIds, "elementIds");
    notNull(channelId, "channelId");

    List<MediaPackageElement> retractedElements = new ArrayList<>();
    for (MediaPackageElement element: getElements(mediaPackage, elementIds)) {
      retractedElements.addAll(retractElement(channelId, mediaPackage, element, streamingURL));
    }
    return retractedElements;
  }

  /**
   * Retracts the media package with the given identifier from the distribution channel.
   *
   * @param channelId
   *          the channel id
   * @param mediaPackage
   *          the media package to retract the element from
   * @param element
   *          the element to retract
   * @return the retracted element or <code>null</code> if the element was not retracted
   */
  private List<MediaPackageElement> retractElement(final String channelId, final MediaPackage mediaPackage,
          final MediaPackageElement element, URI streamingUrl) throws DistributionException {

    logger.debug("Retracting element {} with URI {}", element.getIdentifier(), element.getURI());

    // Has this element been distributed?
    if (!(element instanceof TrackImpl)) {
      return Collections.emptyList();
    }

    // Get the distribution path on the disk for this mediaPackage element
    final File elementFile = getDistributionFile(channelId, mediaPackage, element, streamingUrl);
    final File smilFile = getSmilFile(element, mediaPackage, channelId);
    logger.debug("Deleting file {}", elementFile);

    // Does the file exist? If not, the current element has not been distributed to this channel
    // or has been removed otherwise
    if (elementFile == null || !elementFile.exists()) {
      logger.warn("{} does not exist but was to be deleted", elementFile);
      return Collections.singletonList(element);
    }

    // If a SMIL file is referenced by this element, delete first all the elements within
    if (elementFile.equals(smilFile)) {
      Document smilXml = getSmilDocument(smilFile);
      NodeList videoList = smilXml.getElementsByTagName("video");
      for (int i = 0; i < videoList.getLength(); i++) {
        if (videoList.item(i) instanceof Element) {
          String smilPathStr = ((Element) videoList.item(i)).getAttribute("src");
          // Patch the streaming tags
          if (smilPathStr.contains("mp4:")) {
            smilPathStr = smilPathStr.replace("mp4:", "");
          }
          if (!smilPathStr.endsWith(".mp4")) {
            smilPathStr += ".mp4";
          }

          deleteElementFile(smilFile.toPath().resolveSibling(smilPathStr).toFile());
        }
      }

      if (smilFile.isFile() && !smilFile.delete()) {
        logger.warn("The SMIL file {} could not be successfully deleted.", smilFile);
      }
    } else {
      deleteElementFile(elementFile);
    }

    logger.info("Finished retracting element {} of media package {}", element, mediaPackage);
    return Collections.singletonList(element);
  }

  /**
   * Delete an element file and the parent folders, if necessary
   *
   * @param elementFile
   */
  private void deleteElementFile(File elementFile) {

    // Try to remove the element file
    if (elementFile.exists()) {
      if (!elementFile.delete()) {
        logger.warn("Could not properly delete element file: {}", elementFile);
      }
    } else {
      logger.warn("Tried to delete non-existent element file. Perhaps was already deleted?: {}", elementFile);
    }

    // Try to remove the parent folders, if possible
    File elementDir = elementFile.getParentFile();
    if (elementDir != null && elementDir.exists()) {
      try {
        if (FileUtils.isEmptyDirectory(elementDir)) {
          if (!elementDir.delete()) {
            logger.warn("Could not properly delete element directory: {}", elementDir);
          }
        } else {
          logger.warn("Element directory was not empty after deleting element. Skipping deletion: {}", elementDir);
        }
      } catch (IOException e) {
        logger.warn("Unable to delete element directory: {}", elementDir);
      }
    } else {
      logger.warn("Element directory did not exist when trying to delete it: {}", elementDir);
    }

    File mediapackageDir = elementDir.getParentFile();
    if (mediapackageDir != null && mediapackageDir.exists()) {
      try {
        if (FileUtils.isEmptyDirectory(mediapackageDir)) {
          if (!mediapackageDir.delete()) {
            logger.warn("Could not properly delete mediapackage directory: {}", mediapackageDir);
          }
        } else {
          logger.debug("Mediapackage directory was not empty after deleting element. Skipping deletion: {}",
              mediapackageDir);
        }
      } catch (IOException e) {
        logger.warn("Unable to delete mediapackage directory: {}", elementDir);
      }
    } else {
      logger.warn("Mediapackage directory did not exist when trying to delete it: {}", mediapackageDir);
    }
  }

  /**
   * Gets the destination file to copy the contents of a media package element.
   *
   * @return The file to copy the content to
   */
  private File getDistributionFile(String channelId, MediaPackage mediapackage, MediaPackageElement element,
          URI streamingURL) {

    final String orgId = securityService.getOrganization().getId();
    final Path distributionPath = distributionDirectory.toPath().resolve(orgId);
    final URI elementUri = element.getURI();
    URI relativeUri = streamingURL.relativize(elementUri);
    if (relativeUri != elementUri) {
      // SMIL file

      // Get the relative URL path
      String uriPath = relativeUri.getPath();
      // Remove the last part (corresponds to the part of the "virtual" manifests)
      uriPath = uriPath.substring(0, uriPath.lastIndexOf('/'));
      // Remove the "smil:" tags, if any, and set the right extension if needed
      uriPath = uriPath.replace("smil:", "");
      if (!uriPath.endsWith(".smil")) {
        uriPath += ".smil";
      }

      String[] uriPathParts = uriPath.split("/");

      if (uriPathParts.length > 1) {
        logger.warn(
            "Malformed URI path \"{}\". The SMIL files must be at the streaming application's root. Trying anyway...",
            uriPath);
      }
      return distributionPath.resolve(uriPath).toFile();
    }

    // We have an ordinary file (not yet distributed)
    return new File(getElementDirectory(channelId, mediapackage, element.getIdentifier()),
            FilenameUtils.getName(elementUri.getPath()));
  }

  /**
   * Gets the directory containing the distributed files for this mediapackage.
   *
   * @return the filesystem directory
   */
  private File getMediaPackageDirectory(String channelId, MediaPackage mediaPackage) {
    final String orgId = securityService.getOrganization().getId();
    return distributionDirectory.toPath().resolve(Paths.get(orgId, channelId, mediaPackage.getIdentifier().toString()))
            .toFile();
  }

  /**
   * Gets the directory containing the distributed file for this elementId.
   *
   * @return the filesystem directory
   */
  private File getElementDirectory(String channelId, MediaPackage mediaPackage, String elementId) {
    return new File(getMediaPackageDirectory(channelId, mediaPackage), elementId);
  }

  /**
   * Gets the URI for the element to be distributed.
   *
   * @return The resulting URI after distributionthFromSmil
   */
  private String getDistributionName(String channelId, MediaPackage mp, MediaPackageElement element) {
    String elementId = element.getIdentifier();
    String fileName = FilenameUtils.getBaseName(element.getURI().toString());
    String tag = FilenameUtils.getExtension(element.getURI().toString()) + ":";

    // removes the tag for flv files, but keeps it for all others (mp4 needs it)
    if ("flv:".equals(tag)) {
      tag = "";
    }
    return tag + channelId + "/" + mp.getIdentifier().toString() + "/" + elementId + "/" + fileName;
  }

  /**
   * {@inheritDoc}
   *
   * @see org.opencastproject.job.api.AbstractJobProducer#process(org.opencastproject.job.api.Job)
   */
  @Override
  protected String process(Job job) throws Exception {
    Operation op = null;
    String operation = job.getOperation();
    List<String> arguments = job.getArguments();
    try {
      op = Operation.valueOf(operation);
      String channelId = arguments.get(0);
      MediaPackage mediapackage = MediaPackageParser.getFromXml(arguments.get(1));
      Set<String> elementIds = gson.fromJson(arguments.get(2), new TypeToken<Set<String>>() {
      }.getType());

      URI streamingUrl = getStreamingURLforCurrentOrg();
      if (streamingUrl == null) {
        logger.warn(String.format("Trying to distribute to or retract from streaming from tenant where "
            + "streaming url or port aren't set.", securityService.getOrganization().getId()));
        return null;
      }

      if (distributionDirectory == null) {
        logger.warn("Streaming distribution directory isn't set (org.opencastproject.streaming.directory)");
        return null;
      }

      List<MediaPackageElement> elements;
      switch (op) {
        case Distribute:
          elements = distributeElements(channelId, mediapackage, elementIds, streamingUrl);
          break;
        case Retract:
          elements = retractElements(channelId, mediapackage, elementIds, streamingUrl);
          break;
        default:
          throw new ServiceRegistryException("This service can't handle operations of type '" + op + "'");
      }
      if (!elements.isEmpty()) {
        return MediaPackageElementParser.getArrayAsXml(elements);
      }
      return null;
    } catch (IndexOutOfBoundsException e) {
      throw new ServiceRegistryException("This argument list for operation '" + op + "' does not meet expectations", e);
    } catch (Exception e) {
      throw new ServiceRegistryException("Error handling operation '" + op + "'", e);
    }
  }

  private Set<MediaPackageElement> getElements(MediaPackage mediapackage, Set<String> elementIds)
          throws IllegalStateException {
    final Set<MediaPackageElement> elements = new HashSet<>();
    for (String elementId : elementIds) {
      final MediaPackageElement element = mediapackage.getElementById(elementId);
      if (element != null) {
        elements.add(element);
      } else {
        logger.debug("No element " + elementId + " found in media package " + mediapackage.getIdentifier());
      }
    }
    return elements;
  }

  public File getDistributionDirectory() {
    return distributionDirectory;
  }

  @Reference
  @Override
  public void setWorkspace(Workspace workspace) {
    super.setWorkspace(workspace);
  }

  @Reference
  @Override
  public void setServiceRegistry(ServiceRegistry serviceRegistry) {
    super.setServiceRegistry(serviceRegistry);
  }

  @Reference
  @Override
  public void setSecurityService(SecurityService securityService) {
    super.setSecurityService(securityService);
  }

  @Reference
  @Override
  public void setUserDirectoryService(UserDirectoryService userDirectoryService) {
    super.setUserDirectoryService(userDirectoryService);
  }

  @Reference
  @Override
  public void setOrganizationDirectoryService(OrganizationDirectoryService organizationDirectoryService) {
    super.setOrganizationDirectoryService(organizationDirectoryService);
  }

}