View Javadoc
1   /*
2    * Licensed to The Apereo Foundation under one or more contributor license
3    * agreements. See the NOTICE file distributed with this work for additional
4    * information regarding copyright ownership.
5    *
6    *
7    * The Apereo Foundation licenses this file to you under the Educational
8    * Community License, Version 2.0 (the "License"); you may not use this file
9    * except in compliance with the License. You may obtain a copy of the License
10   * at:
11   *
12   *   http://opensource.org/licenses/ecl2.txt
13   *
14   * Unless required by applicable law or agreed to in writing, software
15   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
16   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the
17   * License for the specific language governing permissions and limitations under
18   * the License.
19   *
20   */
21  package org.opencastproject.distribution.streaming.wowza;
22  
23  import static java.lang.String.format;
24  import static org.opencastproject.util.RequireUtil.notNull;
25  
26  import org.opencastproject.distribution.api.AbstractDistributionService;
27  import org.opencastproject.distribution.api.DistributionException;
28  import org.opencastproject.distribution.api.DistributionService;
29  import org.opencastproject.distribution.api.StreamingDistributionService;
30  import org.opencastproject.job.api.Job;
31  import org.opencastproject.mediapackage.AudioStream;
32  import org.opencastproject.mediapackage.MediaPackage;
33  import org.opencastproject.mediapackage.MediaPackageElement;
34  import org.opencastproject.mediapackage.MediaPackageElementParser;
35  import org.opencastproject.mediapackage.MediaPackageParser;
36  import org.opencastproject.mediapackage.VideoStream;
37  import org.opencastproject.mediapackage.track.TrackImpl;
38  import org.opencastproject.mediapackage.track.TrackImpl.StreamingProtocol;
39  import org.opencastproject.security.api.Organization;
40  import org.opencastproject.security.api.OrganizationDirectoryService;
41  import org.opencastproject.security.api.SecurityService;
42  import org.opencastproject.security.api.UserDirectoryService;
43  import org.opencastproject.serviceregistry.api.ServiceRegistry;
44  import org.opencastproject.serviceregistry.api.ServiceRegistryException;
45  import org.opencastproject.util.FileSupport;
46  import org.opencastproject.util.LoadUtil;
47  import org.opencastproject.util.MimeType;
48  import org.opencastproject.util.NotFoundException;
49  import org.opencastproject.util.RequireUtil;
50  import org.opencastproject.util.UrlSupport;
51  import org.opencastproject.util.XmlSafeParser;
52  import org.opencastproject.workspace.api.Workspace;
53  
54  import com.google.gson.Gson;
55  import com.google.gson.reflect.TypeToken;
56  
57  import org.apache.commons.io.FileUtils;
58  import org.apache.commons.io.FilenameUtils;
59  import org.apache.commons.lang3.StringUtils;
60  import org.osgi.framework.BundleContext;
61  import org.osgi.service.cm.ConfigurationException;
62  import org.osgi.service.component.ComponentException;
63  import org.osgi.service.component.annotations.Activate;
64  import org.osgi.service.component.annotations.Component;
65  import org.osgi.service.component.annotations.Modified;
66  import org.osgi.service.component.annotations.Reference;
67  import org.slf4j.Logger;
68  import org.slf4j.LoggerFactory;
69  import org.w3c.dom.DOMException;
70  import org.w3c.dom.Document;
71  import org.w3c.dom.Element;
72  import org.w3c.dom.Node;
73  import org.w3c.dom.NodeList;
74  import org.xml.sax.SAXException;
75  
76  import java.io.File;
77  import java.io.IOException;
78  import java.net.URI;
79  import java.net.URISyntaxException;
80  import java.nio.file.Files;
81  import java.nio.file.Path;
82  import java.nio.file.Paths;
83  import java.util.ArrayList;
84  import java.util.Arrays;
85  import java.util.Collections;
86  import java.util.HashMap;
87  import java.util.HashSet;
88  import java.util.List;
89  import java.util.Map;
90  import java.util.Set;
91  import java.util.TreeSet;
92  import java.util.concurrent.ConcurrentHashMap;
93  
94  import javax.ws.rs.core.UriBuilder;
95  import javax.xml.parsers.DocumentBuilder;
96  import javax.xml.parsers.ParserConfigurationException;
97  import javax.xml.transform.Transformer;
98  import javax.xml.transform.TransformerException;
99  import javax.xml.transform.dom.DOMSource;
100 import javax.xml.transform.stream.StreamResult;
101 
102 /**
103  * Distributes media to the local media delivery directory.
104  */
105 @Component(
106     immediate = true,
107     service = { DistributionService.class, StreamingDistributionService.class },
108     property = {
109         "service.description=Distribution Service (Streaming)",
110         "distribution.channel=streaming"
111     }
112 )
113 public class WowzaStreamingDistributionService extends AbstractDistributionService
114         implements StreamingDistributionService {
115 
116   /** The key in the properties file that defines the streaming directory. */
117   protected static final String STREAMING_DIRECTORY_KEY = "org.opencastproject.streaming.directory";
118 
119   /** The key in the properties file that defines the streaming formats to distribute. */
120   protected static final String WOWZA_FORMATS_KEY = "org.opencastproject.wowza.formats";
121 
122   /** The tenant-specific key in the properties file that defines the wowza url. */
123   protected static final String WOWZA_URL_KEY = "org.opencastproject.%s.wowza.url";
124 
125   /** The tenant specific key in the properties file that defines the wowza port. */
126   protected static final String WOWZA_PORT_KEY = "org.opencastproject.%s.wowza.port";
127 
128   protected Map<String, URI> streamingUrls;
129 
130   /** The key in the properties file that specifies in which order the videos in the SMIL file should be stored */
131   protected static final String SMIL_ORDER_KEY = "org.opencastproject.wowza.smil.order";
132 
133   /** One of the possible values for the order of the videos in the SMIL file */
134   private static final String SMIL_ASCENDING_VALUE = "ascending";
135 
136   /** One of the possible values for the order of the videos in the SMIL file */
137   private static final String SMIL_DESCENDING_VALUE = "descending";
138 
139   /** The attribute "video-bitrate" in the SMIL files */
140   private static final String SMIL_ATTR_VIDEO_BITRATE = "video-bitrate";
141 
142   /** The attribute "video-width" in the SMIL files */
143   private static final String SMIL_ATTR_VIDEO_WIDTH = "width";
144 
145   /** The attribute "video-height" in the SMIL files */
146   private static final String SMIL_ATTR_VIDEO_HEIGHT = "height";
147 
148   /** The attribute to return for Distribution Type */
149   private static final String DISTRIBUTION_TYPE = "streaming";
150 
151   /** Acceptable values for the streaming schemes */
152   private static final Set<String> validSchemes;
153   private static final Map<String, Integer> defaultProtocolPorts;
154 
155   static {
156     Set<String> temp = new HashSet<>();
157     temp.add("http");
158     temp.add("https");
159     validSchemes = Collections.unmodifiableSet(temp);
160 
161     Map<String, Integer> tempMap = new HashMap<>();
162     tempMap.put("http", 80);
163     tempMap.put("https", 443);
164     defaultProtocolPorts = Collections.unmodifiableMap(tempMap);
165   }
166 
167   /** Default scheme */
168   protected static final String DEFAULT_SCHEME = "http";
169 
170   /** Logging facility */
171   private static final Logger logger = LoggerFactory.getLogger(WowzaStreamingDistributionService.class);
172 
173   /** Receipt type */
174   public static final String JOB_TYPE = "org.opencastproject.distribution.streaming";
175 
176   /** List of available operations on jobs */
177   private enum Operation {
178     Distribute, Retract
179   };
180 
181   /** The load on the system introduced by creating a distribute job */
182   public static final float DEFAULT_DISTRIBUTE_JOB_LOAD = 0.1f;
183 
184   /** The load on the system introduced by creating a retract job */
185   public static final float DEFAULT_RETRACT_JOB_LOAD = 0.1f;
186 
187   /** The key to look for in the service configuration file to override the {@link #DEFAULT_DISTRIBUTE_JOB_LOAD} */
188   public static final String DISTRIBUTE_JOB_LOAD_KEY = "job.load.streaming.distribute";
189 
190   /** The key to look for in the service configuration file to override the {@link #DEFAULT_RETRACT_JOB_LOAD} */
191   public static final String RETRACT_JOB_LOAD_KEY = "job.load.streaming.retract";
192 
193   /** The load on the system introduced by creating a distribute job */
194   private float distributeJobLoad = DEFAULT_DISTRIBUTE_JOB_LOAD;
195 
196   /** The load on the system introduced by creating a retract job */
197   private float retractJobLoad = DEFAULT_RETRACT_JOB_LOAD;
198 
199   /** The distribution directory */
200   private File distributionDirectory = null;
201 
202   /** The set of supported streaming formats to distribute. */
203   private Set<StreamingProtocol> supportedFormats;
204 
205   /** Whether or not the video order in the SMIL files is descending */
206   private boolean isSmilOrderDescending = false;
207 
208   private static final Gson gson = new Gson();
209 
210   /**
211    * Creates a new instance of the streaming distribution service.
212    */
213   public WowzaStreamingDistributionService() {
214     super(JOB_TYPE);
215   }
216 
217   public String getDistributionType() {
218     return DISTRIBUTION_TYPE;
219   }
220 
221   @Activate
222   public void activate(BundleContext bundleContext, Map<String, Object> properties)
223           throws ComponentException, ConfigurationException {
224     modified(bundleContext, properties);
225   }
226 
227   @Modified
228   public void modified(BundleContext bundleContext, Map<String, Object> properties)
229           throws ComponentException, ConfigurationException {
230 
231     // get configuration
232     if (properties != null && bundleContext != null) {
233 
234       Map streamingUrlConfiguration = new ConcurrentHashMap<>();
235 
236       // Streaming directory
237       String distributionDirectoryPath = StringUtils.trimToNull((String) properties.get(STREAMING_DIRECTORY_KEY));
238 
239       if (distributionDirectoryPath == null) {
240         // set default streaming directory to ${org.opencastproject.storage.dir}/streams
241         distributionDirectoryPath
242             = StringUtils.trimToNull(bundleContext.getProperty("org.opencastproject.storage.dir"));
243         if (distributionDirectoryPath != null) {
244           distributionDirectoryPath += "/streams";
245         }
246       }
247       if (distributionDirectoryPath == null) {
248         throw new ComponentException("Streaming distribution directory must be set");
249       }
250 
251       distributionDirectory = new File(distributionDirectoryPath);
252       if (!distributionDirectory.isDirectory()) {
253         try {
254           Files.createDirectories(distributionDirectory.toPath());
255         } catch (IOException e) {
256           throw new ComponentException("Distribution directory " + distributionDirectory
257               + " does not exist and can't be created", e);
258         }
259       }
260 
261       logger.info("Streaming distribution directory is {}", distributionDirectory);
262 
263       // Streaming URLs
264       List<Organization> organizations = organizationDirectoryService.getOrganizations();
265       for (Organization org: organizations) {
266         String orgId = org.getId();
267         String streamingUrl = StringUtils.trimToNull((String) properties.get(String.format(WOWZA_URL_KEY, orgId)));
268         String streamingPort = StringUtils.trimToNull((String) properties.get(String.format(WOWZA_PORT_KEY, orgId)));
269 
270         if (streamingUrl != null) {
271           try {
272             URI tenantStreamingUrl = getStreamingUrl(streamingUrl, streamingPort, validSchemes, DEFAULT_SCHEME, null);
273 
274             if (tenantStreamingUrl == null) {
275               throw new ComponentException(String.format("Streaming URL is undefined for tenant %s.", orgId));
276             }
277 
278             streamingUrlConfiguration.put(orgId, tenantStreamingUrl);
279             logger.info("Wowza Streaming URL for tenant {} set to \"{}\"", orgId, tenantStreamingUrl);
280           } catch (URISyntaxException e) {
281             throw new ComponentException(
282                     String.format("Wowza Streaming URL %s of tenant %s could not be parsed", streamingUrl, orgId), e);
283           }
284         } else {
285           logger.debug("Wowza Streaming URL is undefined for tenant {}", orgId);
286         }
287       }
288 
289       streamingUrls = streamingUrlConfiguration;
290 
291       // Streaming formats
292       String formats = StringUtils.trimToNull((String) properties.get(WOWZA_FORMATS_KEY));
293 
294       if (formats == null) {
295         setDefaultSupportedFormats();
296       } else {
297         setSupportedFormats(formats);
298       }
299       logger.info("The supported streaming formats are: {}", StringUtils.join(supportedFormats, ","));
300 
301       // Smil order
302       String smilOrder = StringUtils.trimToNull((String) properties.get(SMIL_ORDER_KEY));
303 
304       if (smilOrder == null || SMIL_ASCENDING_VALUE.equals(smilOrder)) {
305         logger.info("The videos in the SMIL files will be sorted in ascending bitrate order");
306         isSmilOrderDescending = false;
307       } else if (SMIL_DESCENDING_VALUE.equals(smilOrder)) {
308         isSmilOrderDescending = true;
309         logger.info("The videos in the SMIL files will be sorted in descending bitrate order");
310       } else {
311         throw new ConfigurationException(SMIL_ORDER_KEY, format("Illegal value '%s'. Valid options are '%s' and '%s'",
312                 smilOrder, SMIL_ASCENDING_VALUE, SMIL_DESCENDING_VALUE));
313       }
314 
315       // Job loads
316       distributeJobLoad = LoadUtil.getConfiguredLoadValue(properties, DISTRIBUTE_JOB_LOAD_KEY,
317               DEFAULT_DISTRIBUTE_JOB_LOAD, serviceRegistry);
318       retractJobLoad = LoadUtil.getConfiguredLoadValue(properties, RETRACT_JOB_LOAD_KEY, DEFAULT_RETRACT_JOB_LOAD,
319               serviceRegistry);
320     }
321   }
322 
323   public boolean publishToStreaming() {
324     String currentOrgId = securityService.getOrganization().getId();
325     return streamingUrls.containsKey(currentOrgId);
326   }
327 
328   private URI getStreamingURLforCurrentOrg() {
329     String currentOrgId = securityService.getOrganization().getId();
330     if (streamingUrls.containsKey(currentOrgId)) {
331       return streamingUrls.get(currentOrgId);
332     }
333     return null;
334   }
335 
336   /**
337    * Transform the configuration value into the supported formats to distribute to the Wowza server.
338    *
339    * @param formatString
340    *          The string to parse with the supported formats.
341    */
342   private void setSupportedFormats(String formatString) {
343     supportedFormats = new TreeSet<>();
344 
345     for (String format : formatString.toUpperCase().split("[\\s,]")) {
346       if (!format.isEmpty()) {
347         try {
348           StreamingProtocol protocol = StreamingProtocol.valueOf(format);
349           supportedFormats.add(protocol);
350         } catch (IllegalArgumentException e) {
351           logger.warn("Found incorrect format \"{}\". Ignoring...", format);
352         }
353       }
354     }
355   }
356 
357   /**
358    * Get the default set of supported formats to distribute to Wowza.
359    */
360   private void setDefaultSupportedFormats() {
361     supportedFormats = new TreeSet<>(Arrays.asList(
362             TrackImpl.StreamingProtocol.HLS,
363             TrackImpl.StreamingProtocol.HDS,
364             TrackImpl.StreamingProtocol.SMOOTH,
365             TrackImpl.StreamingProtocol.DASH));
366   }
367 
368   /**
369    * Calculate a streaming URL based on input parameters
370    *
371    * @throws URISyntaxException
372    */
373   private static URI getStreamingUrl(String inputUri, String inputPort, Set<String> validSchemes, String defaultScheme,
374           String defaultUri) throws URISyntaxException {
375 
376     Integer port;
377     try {
378       port = Integer.parseInt(StringUtils.trimToEmpty(inputPort));
379     } catch (NumberFormatException e) {
380       port = null;
381     }
382 
383     URI uri;
384     if (StringUtils.isNotBlank(inputUri)) {
385       uri = new URI(inputUri);
386     } else if (StringUtils.isNotBlank(defaultUri)) {
387       uri = new URI(defaultUri);
388     } else {
389       throw new IllegalArgumentException("Provided streaming URL is empty.");
390     }
391     UriBuilder uriBuilder = UriBuilder.fromUri(uri);
392     String scheme = uri.getScheme();
393     String uriPath = uri.getPath();
394     // When a URI does not have a scheme, Java parses it as if all the URI was a (relative) path
395     // However, we will assume that a host was always provided, so everything before the first "/" is the host,
396     // not part of the path
397     if (uri.getHost() == null) {
398       uriBuilder.host(uriPath.substring(0, uriPath.indexOf("/"))).replacePath(uriPath.substring(uriPath.indexOf("/")));
399     }
400 
401     if (!validSchemes.contains(scheme)) {
402       if (scheme == null) {
403         uriBuilder.scheme(defaultScheme);
404       } else {
405         throw new URISyntaxException(inputUri, "Provided URI has an illegal scheme");
406       }
407     }
408 
409     if ((port != null) && (!port.equals(defaultProtocolPorts.get(uriBuilder.build().getScheme())))) {
410       uriBuilder.port(port);
411     }
412 
413     return uriBuilder.build();
414   }
415 
416   /**
417    * {@inheritDoc}
418    *
419    * @see org.opencastproject.distribution.api.StreamingDistributionService#distribute(java.lang.String,
420    * org.opencastproject.mediapackage.MediaPackage, java.util.Set)
421    */
422   @Override
423   public Job distribute(String channelId, MediaPackage mediapackage, Set<String> elementIds)
424           throws DistributionException {
425 
426     notNull(mediapackage, "mediaPackage");
427     notNull(elementIds, "elementIds");
428     notNull(channelId, "channelId");
429 
430     if (getStreamingURLforCurrentOrg() == null) {
431       throw new IllegalStateException(
432               String.format("No streaming url or port set for tenant %s", securityService.getOrganization().getId()));
433     }
434     if (distributionDirectory == null) {
435       throw new IllegalStateException(
436               "Streaming distribution directory must be set (org.opencastproject.streaming.directory)");
437     }
438 
439     try {
440       return serviceRegistry.createJob(
441               JOB_TYPE,
442               Operation.Distribute.toString(),
443               Arrays.asList(channelId, MediaPackageParser.getAsXml(mediapackage), gson.toJson(elementIds)),
444               distributeJobLoad);
445     } catch (ServiceRegistryException e) {
446       throw new DistributionException("Unable to create a job", e);
447     }
448   }
449 
450   /**
451    * {@inheritDoc}
452    *
453    * @see org.opencastproject.distribution.api.DistributionService#distribute(String,
454    *      org.opencastproject.mediapackage.MediaPackage, String)
455    */
456   @Override
457   public Job distribute(final String channelId, final MediaPackage mediapackage, final String elementId)
458           throws DistributionException {
459     return distribute(channelId, mediapackage, new HashSet<>(Collections.singletonList(elementId)));
460   }
461 
462   /**
463    * Distribute media package elements to the download distribution service.
464    *
465    * @param channelId The id of the publication channel to be distributed to.
466    * @param mediaPackage The media package that contains the elements to be distributed.
467    * @param elementIds The ids of the elements that should be distributed
468    * contained within the media package.
469    * @return A reference to the MediaPackageElements that have been distributed.
470    * @throws DistributionException Thrown if the parent directory of the
471    * MediaPackageElement cannot be created, if the MediaPackageElement cannot be
472    * copied or another unexpected exception occurs.
473    */
474   private List<MediaPackageElement> distributeElements(final String channelId, final MediaPackage mediaPackage,
475           final Set<String> elementIds, URI streamingURL) throws DistributionException {
476     notNull(mediaPackage, "mediaPackage");
477     notNull(elementIds, "elementIds");
478     notNull(channelId, "channelId");
479 
480     List<MediaPackageElement> distributedElements = new ArrayList<>();
481     for (MediaPackageElement element : getElements(mediaPackage, elementIds)) {
482       distributedElements.addAll(distributeElement(channelId, mediaPackage, element, streamingURL));
483     }
484     return distributedElements;
485   }
486 
487   /**
488    * Distribute a media package element to the download distribution service.
489    *
490    * @param mediaPackage
491    *          The media package that contains the element to distribute.
492    * @param element
493    *          The element to be distributed
494    * @return A list of elements that have been distributed
495    * @throws DistributionException
496    *           Thrown if the parent directory of the MediaPackageElement cannot be created, if the MediaPackageElement
497    *           cannot be copied or another unexpected exception occurs.
498    */
499   private synchronized List<MediaPackageElement> distributeElement(final String channelId,
500           final MediaPackage mediaPackage, final MediaPackageElement element, URI streamingURL)
501           throws DistributionException {
502 
503     if (supportedFormats.isEmpty()) {
504       logger.warn("Skipping distribution of element \"{}\" because no streaming format was specified", element);
505       return Collections.emptyList();
506     }
507 
508     // Streaming servers only deal with tracks
509     if (!MediaPackageElement.Type.Track.equals(element.getElementType())) {
510       logger.debug("Skipping {} {} for distribution to the streaming server",
511               element.getElementType(), element.getIdentifier());
512       return Collections.emptyList();
513     }
514 
515     try {
516       File source;
517       try {
518         source = workspace.get(element.getURI());
519       } catch (NotFoundException | IOException e) {
520         throw new DistributionException("Error getting element " + element.getURI() + " from the workspace", e);
521       }
522 
523       ArrayList<MediaPackageElement> distribution = new ArrayList<>();
524 
525       // Put the file in place
526 
527       File destination = getDistributionFile(channelId, mediaPackage, element, streamingURL);
528       try {
529         Files.createDirectories(destination.toPath().getParent());
530       } catch (IOException e) {
531         throw new DistributionException("Unable to create " + destination.getParentFile(), e);
532       }
533       logger.info("Distributing {} to {}", element.getIdentifier(), destination);
534 
535       try {
536         FileSupport.link(source, destination, true);
537       } catch (IOException e) {
538         throw new DistributionException("Unable to copy " + source + " to " + destination, e);
539       }
540 
541       if ((!supportedFormats.isEmpty()) && isStreamingFormat(element)) {
542         // Only if the Smil file does not exist we need to distribute streams
543         // Otherwise the streams only were extended with new qualities
544         File smilFile = getSmilFile(element, mediaPackage, channelId);
545         Document smilXml = getSmilDocument(smilFile);
546         addElementToSmil(smilXml, channelId, mediaPackage, element);
547         URI smilUri = getSmilUri(smilFile, streamingURL);
548 
549         if (smilFile.isFile()) {
550           logger.debug("Skipped adding streaming manifest {} to search index, as it already exists.", element);
551         } else {
552           for (StreamingProtocol protocol : supportedFormats) {
553             distribution.add(createTrackforStreamingProtocol(element, smilUri, protocol));
554             logger.info("Distributed element {} in {} format to the Wowza Server", element, protocol);
555           }
556         }
557 
558         saveSmilFile(smilFile, smilXml);
559       }
560 
561       logger.info("Distributed file {} to Wowza Server", element);
562       return distribution;
563 
564     } catch (URISyntaxException e) {
565       throw new DistributionException("Error distributing " + element, e);
566     }
567   }
568 
569   private void setTransport(MediaPackageElement element, TrackImpl.StreamingProtocol protocol) {
570     if (element instanceof TrackImpl) {
571       ((TrackImpl) element).setTransport(protocol);
572     }
573   }
574 
575   private File getSmilFile(MediaPackageElement element, MediaPackage mediapackage, String channelId) {
576     String orgId = securityService.getOrganization().getId();
577     String smilFileName = channelId + "_" + mediapackage.getIdentifier() + "_" + element.getFlavor().getType()
578             + ".smil";
579     return distributionDirectory.toPath().resolve(Paths.get(orgId, smilFileName)).toFile();
580   }
581 
582   private URI getSmilUri(File smilFile, URI streamingURL) {
583     return UriBuilder.fromUri(streamingURL).path("smil:" + smilFile.getName()).build();
584   }
585 
586   private URI getStreamingUri(URI smilUri, StreamingProtocol protocol) throws URISyntaxException {
587     String fileName;
588     switch (protocol) {
589       case HLS:
590         fileName = "playlist.m3u8";
591         break;
592       case HDS:
593         fileName = "manifest.f4m";
594         break;
595       case SMOOTH:
596         fileName = "Manifest";
597         break;
598       case DASH:
599         fileName = "manifest_mpm4sav_mvlist.mpd";
600         break;
601       default:
602         fileName = "";
603     }
604     return new URI(UrlSupport.concat(smilUri.toString(), fileName));
605   }
606 
607   private boolean isStreamingFormat(MediaPackageElement element) {
608     String uriPath = element.getURI().getPath();
609     return uriPath.endsWith(".mp4") || uriPath.contains("mp4:");
610   }
611 
612   private Document getSmilDocument(File smilFile) throws DistributionException {
613     if (!smilFile.isFile()) {
614       try {
615         DocumentBuilder docBuilder = XmlSafeParser.newDocumentBuilderFactory().newDocumentBuilder();
616         Document doc = docBuilder.newDocument();
617         Element smil = doc.createElement("smil");
618         doc.appendChild(smil);
619 
620         Element head = doc.createElement("head");
621         smil.appendChild(head);
622 
623         Element body = doc.createElement("body");
624         smil.appendChild(body);
625 
626         Element switchElement = doc.createElement("switch");
627         body.appendChild(switchElement);
628 
629         return doc;
630       } catch (ParserConfigurationException ex) {
631         logger.error("Could not create XML file for {}.", smilFile);
632         throw new DistributionException("Could not create XML file for " + smilFile);
633       }
634     }
635 
636     try {
637       DocumentBuilder docBuilder = XmlSafeParser.newDocumentBuilderFactory().newDocumentBuilder();
638       Document doc = docBuilder.parse(smilFile);
639 
640       if (!"smil".equalsIgnoreCase(doc.getDocumentElement().getNodeName())) {
641         logger.error("XML-File {} is not a SMIL file.", smilFile);
642         throw new DistributionException(format("XML-File %s is not an SMIL file.", smilFile.getName()));
643       }
644 
645       return doc;
646     } catch (IOException e) {
647       logger.error("Could not open SMIL file {}", smilFile);
648       throw new DistributionException(format("Could not open SMIL file %s", smilFile));
649     } catch (ParserConfigurationException e) {
650       logger.error("Could not parse SMIL file {}", smilFile);
651       throw new DistributionException(format("Could not parse SMIL file %s", smilFile));
652     } catch (SAXException e) {
653       logger.error("Could not parse XML file {}", smilFile);
654       throw new DistributionException(format("Could not parse XML file %s", smilFile));
655     }
656   }
657 
658   private void saveSmilFile(File smilFile, Document doc) throws DistributionException {
659     try {
660       Transformer transformer = XmlSafeParser.newTransformerFactory().newTransformer();
661       DOMSource source = new DOMSource(doc);
662       StreamResult stream = new StreamResult(smilFile);
663       transformer.transform(source, stream);
664       logger.info("SMIL file for Wowza server saved at {}", smilFile);
665     } catch (TransformerException ex) {
666       logger.error("Could not write SMIL file {} for distribution", smilFile);
667       throw new DistributionException(format("Could not write SMIL file %s for distribution", smilFile));
668     }
669   }
670 
671   private void addElementToSmil(Document doc, String channelId, MediaPackage mediapackage, MediaPackageElement element)
672           throws DOMException, URISyntaxException {
673     if (!(element instanceof TrackImpl)) {
674       return;
675     }
676     TrackImpl track = (TrackImpl) element;
677     NodeList switchElementsList = doc.getElementsByTagName("switch");
678     Node switchElement = null;
679 
680     // There should only be one switch element in the file. If there are more we will igore this.
681     // If there is no switch element we need to create the xml first.
682     if (switchElementsList.getLength() > 0) {
683       switchElement = switchElementsList.item(0);
684     } else {
685       if (doc.getElementsByTagName("head").getLength() < 1) {
686         doc.appendChild(doc.createElement("head"));
687       }
688       if (doc.getElementsByTagName("body").getLength() < 1) {
689         doc.appendChild(doc.createElement("body"));
690       }
691       switchElement = doc.createElement("switch");
692       doc.getElementsByTagName("body").item(0).appendChild(switchElement);
693     }
694 
695     Element video = doc.createElement("video");
696     video.setAttribute("src", getDistributionName(channelId, mediapackage, element));
697 
698     float bitrate = 0;
699 
700     // Add bitrate corresponding to the audio streams
701     for (AudioStream stream : track.getAudio()) {
702       bitrate += stream.getBitRate();
703     }
704 
705     // Add bitrate corresponding to the video streams
706     // Also, set the video width and height values:
707     // In the rare case where there is more than one video stream, the values of the first stream
708     // have priority, but always prefer the first stream with both "frameWidth" and "frameHeight"
709     // parameters defined
710     Integer width = null;
711     Integer height = null;
712     for (VideoStream stream : track.getVideo()) {
713       bitrate += stream.getBitRate();
714       // Update if both width and height are defined for a stream or if we have no values at all
715       if (((stream.getFrameWidth() != null) && (stream.getFrameHeight() != null))
716               || ((width == null) && (height == null))) {
717         width = stream.getFrameWidth();
718         height = stream.getFrameHeight();
719       }
720     }
721 
722     video.setAttribute(SMIL_ATTR_VIDEO_BITRATE, Integer.toString((int) bitrate));
723 
724     if (width != null) {
725       video.setAttribute(SMIL_ATTR_VIDEO_WIDTH, Integer.toString(width));
726     } else {
727       logger.debug("Could not set video width in the SMIL file for element {} of mediapackage {}. The value was null",
728               element.getIdentifier(), mediapackage.getIdentifier());
729     }
730     if (height != null) {
731       video.setAttribute(SMIL_ATTR_VIDEO_HEIGHT, Integer.toString(height));
732     } else {
733       logger.debug("Could not set video height in the SMIL file for element {} of mediapackage {}. The value was null",
734               element.getIdentifier(), mediapackage.getIdentifier());
735     }
736 
737     NodeList currentVideos = switchElement.getChildNodes();
738     for (int i = 0; i < currentVideos.getLength(); i++) {
739       Node current = currentVideos.item(i);
740       if ("video".equals(current.getNodeName())) {
741         float currentBitrate = Float
742                 .parseFloat(current.getAttributes().getNamedItem(SMIL_ATTR_VIDEO_BITRATE).getTextContent());
743         if ((isSmilOrderDescending && (currentBitrate < bitrate))
744                 || (!isSmilOrderDescending && (currentBitrate > bitrate))) {
745           switchElement.insertBefore(video, current);
746           return;
747         }
748       }
749     }
750 
751     // If we get here, we could not insert the video before
752     switchElement.appendChild(video);
753   }
754 
755   private TrackImpl createTrackforStreamingProtocol(MediaPackageElement element, URI smilUri,
756           StreamingProtocol protocol) throws URISyntaxException {
757     TrackImpl track = (TrackImpl) element.clone();
758 
759     switch (protocol) {
760       case HLS:
761         track.setMimeType(MimeType.mimeType("application", "x-mpegURL"));
762         break;
763       case HDS:
764         track.setMimeType(MimeType.mimeType("application", "f4m+xml"));
765         break;
766       case SMOOTH:
767         track.setMimeType(MimeType.mimeType("application", "vnd.ms-sstr+xml"));
768         break;
769       case DASH:
770         track.setMimeType(MimeType.mimeType("application", "dash+xml"));
771         break;
772       default:
773         throw new IllegalArgumentException(format("Received invalid streaming protocol: '%s'", protocol));
774     }
775 
776     setTransport(track, protocol);
777     track.setURI(getStreamingUri(smilUri, protocol));
778     track.referTo(element);
779     track.setIdentifier(null);
780     track.setAudio(null);
781     track.setVideo(null);
782     track.setChecksum(null);
783 
784     return track;
785   }
786 
787   /**
788    * {@inheritDoc}
789    *
790    * @see org.opencastproject.distribution.api.DistributionService#retract(String,
791    *      org.opencastproject.mediapackage.MediaPackage, String) java.lang.String)
792    */
793   @Override
794   public Job retract(String channelId, MediaPackage mediapackage, String elementId) throws DistributionException {
795     return retract(channelId, mediapackage, new HashSet<>(Collections.singletonList(elementId)));
796   }
797 
798   /**
799    * {@inheritDoc}
800    *
801    * @see org.opencastproject.distribution.api.StreamingDistributionService#retract(java.lang.String,
802    * org.opencastproject.mediapackage.MediaPackage, java.util.Set)
803    */
804   @Override
805   public Job retract(String channelId, MediaPackage mediaPackage, Set<String> elementIds) throws DistributionException {
806     RequireUtil.notNull(mediaPackage, "mediaPackage");
807     RequireUtil.notNull(elementIds, "elementIds");
808     RequireUtil.notNull(channelId, "channelId");
809     //
810     try {
811       return serviceRegistry.createJob(JOB_TYPE, Operation.Retract.toString(),
812               Arrays.asList(channelId, MediaPackageParser.getAsXml(mediaPackage), gson.toJson(elementIds)),
813               retractJobLoad);
814     } catch (ServiceRegistryException e) {
815       throw new DistributionException("Unable to create a job", e);
816     }
817   }
818 
819   @Override
820   public List<MediaPackageElement> distributeSync(String channelId, MediaPackage mediaPackage, String elementId)
821           throws DistributionException {
822     Set<String> elementIds = new HashSet<String>();
823     elementIds.add(elementId);
824     return distributeSync(channelId, mediaPackage, elementIds);
825   }
826 
827   @Override
828   public List<MediaPackageElement> distributeSync(String channelId, MediaPackage mediaPackage, Set<String> elementIds)
829           throws DistributionException {
830 
831     if (getStreamingURLforCurrentOrg() == null) {
832       logger.warn(String.format("Trying to distribute to streaming from tenant where streaming url or port aren't set.",
833               securityService.getOrganization().getId()));
834       return Collections.emptyList();
835     }
836 
837     if (distributionDirectory == null) {
838       logger.warn("Streaming distribution directory isn't set (org.opencastproject.streaming.directory)");
839       return Collections.emptyList();
840     }
841 
842     URI streamingURL = getStreamingURLforCurrentOrg();
843     return distributeElements(channelId, mediaPackage, elementIds, streamingURL);
844   }
845 
846   @Override
847   public List<MediaPackageElement> retractSync(String channelId, MediaPackage mediaPackage, String elementId)
848           throws DistributionException {
849     Set<String> elementIds = new HashSet<String>();
850     elementIds.add(elementId);
851     return retractSync(channelId, mediaPackage, elementIds);
852   }
853 
854   @Override
855   public List<MediaPackageElement> retractSync(String channelId, MediaPackage mediaPackage, Set<String> elementIds)
856           throws DistributionException {
857 
858     if (getStreamingURLforCurrentOrg() == null) {
859       logger.warn(String.format("Trying to retract from streaming from tenant where streaming url or port aren't set.",
860               securityService.getOrganization().getId()));
861       return Collections.emptyList();
862     }
863 
864     if (distributionDirectory == null) {
865       logger.warn("Streaming distribution directory isn't set (org.opencastproject.streaming.directory)");
866       return Collections.emptyList();
867     }
868 
869     URI streamingURL = getStreamingURLforCurrentOrg();
870     return retractElements(channelId, mediaPackage, elementIds, streamingURL);
871   }
872 
873   /**
874    * Retract a media package element from the distribution channel. The retracted element must not necessarily be the
875    * one given as parameter <code>elementId</code>. Instead, the element's distribution URI will be calculated. This way
876    * you are able to retract elements by providing the "original" element here.
877    *
878    * @param channelId
879    *          the channel id
880    * @param mediaPackage
881    *          the mediaPackage
882    * @param elementIds
883    *          the element identifiers
884    * @return the retracted element or <code>null</code> if the element was not retracted
885    * @throws org.opencastproject.distribution.api.DistributionException
886    *           in case of an error
887    */
888   private List<MediaPackageElement> retractElements(String channelId, MediaPackage mediaPackage,
889           Set<String> elementIds, URI streamingURL)
890           throws DistributionException {
891 
892     notNull(mediaPackage, "mediaPackage");
893     notNull(elementIds, "elementIds");
894     notNull(channelId, "channelId");
895 
896     List<MediaPackageElement> retractedElements = new ArrayList<>();
897     for (MediaPackageElement element: getElements(mediaPackage, elementIds)) {
898       retractedElements.addAll(retractElement(channelId, mediaPackage, element, streamingURL));
899     }
900     return retractedElements;
901   }
902 
903   /**
904    * Retracts the media package with the given identifier from the distribution channel.
905    *
906    * @param channelId
907    *          the channel id
908    * @param mediaPackage
909    *          the media package to retract the element from
910    * @param element
911    *          the element to retract
912    * @return the retracted element or <code>null</code> if the element was not retracted
913    */
914   private List<MediaPackageElement> retractElement(final String channelId, final MediaPackage mediaPackage,
915           final MediaPackageElement element, URI streamingUrl) throws DistributionException {
916 
917     logger.debug("Retracting element {} with URI {}", element.getIdentifier(), element.getURI());
918 
919     // Has this element been distributed?
920     if (!(element instanceof TrackImpl)) {
921       return Collections.emptyList();
922     }
923 
924     // Get the distribution path on the disk for this mediaPackage element
925     final File elementFile = getDistributionFile(channelId, mediaPackage, element, streamingUrl);
926     final File smilFile = getSmilFile(element, mediaPackage, channelId);
927     logger.debug("Deleting file {}", elementFile);
928 
929     // Does the file exist? If not, the current element has not been distributed to this channel
930     // or has been removed otherwise
931     if (elementFile == null || !elementFile.exists()) {
932       logger.warn("{} does not exist but was to be deleted", elementFile);
933       return Collections.singletonList(element);
934     }
935 
936     // If a SMIL file is referenced by this element, delete first all the elements within
937     if (elementFile.equals(smilFile)) {
938       Document smilXml = getSmilDocument(smilFile);
939       NodeList videoList = smilXml.getElementsByTagName("video");
940       for (int i = 0; i < videoList.getLength(); i++) {
941         if (videoList.item(i) instanceof Element) {
942           String smilPathStr = ((Element) videoList.item(i)).getAttribute("src");
943           // Patch the streaming tags
944           if (smilPathStr.contains("mp4:")) {
945             smilPathStr = smilPathStr.replace("mp4:", "");
946           }
947           if (!smilPathStr.endsWith(".mp4")) {
948             smilPathStr += ".mp4";
949           }
950 
951           deleteElementFile(smilFile.toPath().resolveSibling(smilPathStr).toFile());
952         }
953       }
954 
955       if (smilFile.isFile() && !smilFile.delete()) {
956         logger.warn("The SMIL file {} could not be successfully deleted.", smilFile);
957       }
958     } else {
959       deleteElementFile(elementFile);
960     }
961 
962     logger.info("Finished retracting element {} of media package {}", element, mediaPackage);
963     return Collections.singletonList(element);
964   }
965 
966   /**
967    * Delete an element file and the parent folders, if necessary
968    *
969    * @param elementFile
970    */
971   private void deleteElementFile(File elementFile) {
972 
973     // Try to remove the element file
974     if (elementFile.exists()) {
975       if (!elementFile.delete()) {
976         logger.warn("Could not properly delete element file: {}", elementFile);
977       }
978     } else {
979       logger.warn("Tried to delete non-existent element file. Perhaps was already deleted?: {}", elementFile);
980     }
981 
982     // Try to remove the parent folders, if possible
983     File elementDir = elementFile.getParentFile();
984     if (elementDir != null && elementDir.exists()) {
985       try {
986         if (FileUtils.isEmptyDirectory(elementDir)) {
987           if (!elementDir.delete()) {
988             logger.warn("Could not properly delete element directory: {}", elementDir);
989           }
990         } else {
991           logger.warn("Element directory was not empty after deleting element. Skipping deletion: {}", elementDir);
992         }
993       } catch (IOException e) {
994         logger.warn("Unable to delete element directory: {}", elementDir);
995       }
996     } else {
997       logger.warn("Element directory did not exist when trying to delete it: {}", elementDir);
998     }
999 
1000     File mediapackageDir = elementDir.getParentFile();
1001     if (mediapackageDir != null && mediapackageDir.exists()) {
1002       try {
1003         if (FileUtils.isEmptyDirectory(mediapackageDir)) {
1004           if (!mediapackageDir.delete()) {
1005             logger.warn("Could not properly delete mediapackage directory: {}", mediapackageDir);
1006           }
1007         } else {
1008           logger.debug("Mediapackage directory was not empty after deleting element. Skipping deletion: {}",
1009               mediapackageDir);
1010         }
1011       } catch (IOException e) {
1012         logger.warn("Unable to delete mediapackage directory: {}", elementDir);
1013       }
1014     } else {
1015       logger.warn("Mediapackage directory did not exist when trying to delete it: {}", mediapackageDir);
1016     }
1017   }
1018 
1019   /**
1020    * Gets the destination file to copy the contents of a media package element.
1021    *
1022    * @return The file to copy the content to
1023    */
1024   private File getDistributionFile(String channelId, MediaPackage mediapackage, MediaPackageElement element,
1025           URI streamingURL) {
1026 
1027     final String orgId = securityService.getOrganization().getId();
1028     final Path distributionPath = distributionDirectory.toPath().resolve(orgId);
1029     final URI elementUri = element.getURI();
1030     URI relativeUri = streamingURL.relativize(elementUri);
1031     if (relativeUri != elementUri) {
1032       // SMIL file
1033 
1034       // Get the relative URL path
1035       String uriPath = relativeUri.getPath();
1036       // Remove the last part (corresponds to the part of the "virtual" manifests)
1037       uriPath = uriPath.substring(0, uriPath.lastIndexOf('/'));
1038       // Remove the "smil:" tags, if any, and set the right extension if needed
1039       uriPath = uriPath.replace("smil:", "");
1040       if (!uriPath.endsWith(".smil")) {
1041         uriPath += ".smil";
1042       }
1043 
1044       String[] uriPathParts = uriPath.split("/");
1045 
1046       if (uriPathParts.length > 1) {
1047         logger.warn(
1048             "Malformed URI path \"{}\". The SMIL files must be at the streaming application's root. Trying anyway...",
1049             uriPath);
1050       }
1051       return distributionPath.resolve(uriPath).toFile();
1052     }
1053 
1054     // We have an ordinary file (not yet distributed)
1055     return new File(getElementDirectory(channelId, mediapackage, element.getIdentifier()),
1056             FilenameUtils.getName(elementUri.getPath()));
1057   }
1058 
1059   /**
1060    * Gets the directory containing the distributed files for this mediapackage.
1061    *
1062    * @return the filesystem directory
1063    */
1064   private File getMediaPackageDirectory(String channelId, MediaPackage mediaPackage) {
1065     final String orgId = securityService.getOrganization().getId();
1066     return distributionDirectory.toPath().resolve(Paths.get(orgId, channelId, mediaPackage.getIdentifier().toString()))
1067             .toFile();
1068   }
1069 
1070   /**
1071    * Gets the directory containing the distributed file for this elementId.
1072    *
1073    * @return the filesystem directory
1074    */
1075   private File getElementDirectory(String channelId, MediaPackage mediaPackage, String elementId) {
1076     return new File(getMediaPackageDirectory(channelId, mediaPackage), elementId);
1077   }
1078 
1079   /**
1080    * Gets the URI for the element to be distributed.
1081    *
1082    * @return The resulting URI after distributionthFromSmil
1083    */
1084   private String getDistributionName(String channelId, MediaPackage mp, MediaPackageElement element) {
1085     String elementId = element.getIdentifier();
1086     String fileName = FilenameUtils.getBaseName(element.getURI().toString());
1087     String tag = FilenameUtils.getExtension(element.getURI().toString()) + ":";
1088 
1089     // removes the tag for flv files, but keeps it for all others (mp4 needs it)
1090     if ("flv:".equals(tag)) {
1091       tag = "";
1092     }
1093     return tag + channelId + "/" + mp.getIdentifier().toString() + "/" + elementId + "/" + fileName;
1094   }
1095 
1096   /**
1097    * {@inheritDoc}
1098    *
1099    * @see org.opencastproject.job.api.AbstractJobProducer#process(org.opencastproject.job.api.Job)
1100    */
1101   @Override
1102   protected String process(Job job) throws Exception {
1103     Operation op = null;
1104     String operation = job.getOperation();
1105     List<String> arguments = job.getArguments();
1106     try {
1107       op = Operation.valueOf(operation);
1108       String channelId = arguments.get(0);
1109       MediaPackage mediapackage = MediaPackageParser.getFromXml(arguments.get(1));
1110       Set<String> elementIds = gson.fromJson(arguments.get(2), new TypeToken<Set<String>>() {
1111       }.getType());
1112 
1113       URI streamingUrl = getStreamingURLforCurrentOrg();
1114       if (streamingUrl == null) {
1115         logger.warn(String.format("Trying to distribute to or retract from streaming from tenant where "
1116             + "streaming url or port aren't set.", securityService.getOrganization().getId()));
1117         return null;
1118       }
1119 
1120       if (distributionDirectory == null) {
1121         logger.warn("Streaming distribution directory isn't set (org.opencastproject.streaming.directory)");
1122         return null;
1123       }
1124 
1125       List<MediaPackageElement> elements;
1126       switch (op) {
1127         case Distribute:
1128           elements = distributeElements(channelId, mediapackage, elementIds, streamingUrl);
1129           break;
1130         case Retract:
1131           elements = retractElements(channelId, mediapackage, elementIds, streamingUrl);
1132           break;
1133         default:
1134           throw new ServiceRegistryException("This service can't handle operations of type '" + op + "'");
1135       }
1136       if (!elements.isEmpty()) {
1137         return MediaPackageElementParser.getArrayAsXml(elements);
1138       }
1139       return null;
1140     } catch (IndexOutOfBoundsException e) {
1141       throw new ServiceRegistryException("This argument list for operation '" + op + "' does not meet expectations", e);
1142     } catch (Exception e) {
1143       throw new ServiceRegistryException("Error handling operation '" + op + "'", e);
1144     }
1145   }
1146 
1147   private Set<MediaPackageElement> getElements(MediaPackage mediapackage, Set<String> elementIds)
1148           throws IllegalStateException {
1149     final Set<MediaPackageElement> elements = new HashSet<>();
1150     for (String elementId : elementIds) {
1151       final MediaPackageElement element = mediapackage.getElementById(elementId);
1152       if (element != null) {
1153         elements.add(element);
1154       } else {
1155         logger.debug("No element " + elementId + " found in media package " + mediapackage.getIdentifier());
1156       }
1157     }
1158     return elements;
1159   }
1160 
1161   public File getDistributionDirectory() {
1162     return distributionDirectory;
1163   }
1164 
1165   @Reference
1166   @Override
1167   public void setWorkspace(Workspace workspace) {
1168     super.setWorkspace(workspace);
1169   }
1170 
1171   @Reference
1172   @Override
1173   public void setServiceRegistry(ServiceRegistry serviceRegistry) {
1174     super.setServiceRegistry(serviceRegistry);
1175   }
1176 
1177   @Reference
1178   @Override
1179   public void setSecurityService(SecurityService securityService) {
1180     super.setSecurityService(securityService);
1181   }
1182 
1183   @Reference
1184   @Override
1185   public void setUserDirectoryService(UserDirectoryService userDirectoryService) {
1186     super.setUserDirectoryService(userDirectoryService);
1187   }
1188 
1189   @Reference
1190   @Override
1191   public void setOrganizationDirectoryService(OrganizationDirectoryService organizationDirectoryService) {
1192     super.setOrganizationDirectoryService(organizationDirectoryService);
1193   }
1194 
1195 }