1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21 package org.opencastproject.distribution.streaming.wowza;
22
23 import static java.lang.String.format;
24 import static org.opencastproject.util.RequireUtil.notNull;
25
26 import org.opencastproject.distribution.api.AbstractDistributionService;
27 import org.opencastproject.distribution.api.DistributionException;
28 import org.opencastproject.distribution.api.DistributionService;
29 import org.opencastproject.distribution.api.StreamingDistributionService;
30 import org.opencastproject.job.api.Job;
31 import org.opencastproject.mediapackage.AudioStream;
32 import org.opencastproject.mediapackage.MediaPackage;
33 import org.opencastproject.mediapackage.MediaPackageElement;
34 import org.opencastproject.mediapackage.MediaPackageElementParser;
35 import org.opencastproject.mediapackage.MediaPackageParser;
36 import org.opencastproject.mediapackage.VideoStream;
37 import org.opencastproject.mediapackage.track.TrackImpl;
38 import org.opencastproject.mediapackage.track.TrackImpl.StreamingProtocol;
39 import org.opencastproject.security.api.Organization;
40 import org.opencastproject.security.api.OrganizationDirectoryService;
41 import org.opencastproject.security.api.SecurityService;
42 import org.opencastproject.security.api.UserDirectoryService;
43 import org.opencastproject.serviceregistry.api.ServiceRegistry;
44 import org.opencastproject.serviceregistry.api.ServiceRegistryException;
45 import org.opencastproject.util.FileSupport;
46 import org.opencastproject.util.LoadUtil;
47 import org.opencastproject.util.MimeType;
48 import org.opencastproject.util.NotFoundException;
49 import org.opencastproject.util.RequireUtil;
50 import org.opencastproject.util.UrlSupport;
51 import org.opencastproject.util.XmlSafeParser;
52 import org.opencastproject.workspace.api.Workspace;
53
54 import com.google.gson.Gson;
55 import com.google.gson.reflect.TypeToken;
56
57 import org.apache.commons.io.FileUtils;
58 import org.apache.commons.io.FilenameUtils;
59 import org.apache.commons.lang3.StringUtils;
60 import org.osgi.framework.BundleContext;
61 import org.osgi.service.cm.ConfigurationException;
62 import org.osgi.service.component.ComponentException;
63 import org.osgi.service.component.annotations.Activate;
64 import org.osgi.service.component.annotations.Component;
65 import org.osgi.service.component.annotations.Modified;
66 import org.osgi.service.component.annotations.Reference;
67 import org.slf4j.Logger;
68 import org.slf4j.LoggerFactory;
69 import org.w3c.dom.DOMException;
70 import org.w3c.dom.Document;
71 import org.w3c.dom.Element;
72 import org.w3c.dom.Node;
73 import org.w3c.dom.NodeList;
74 import org.xml.sax.SAXException;
75
76 import java.io.File;
77 import java.io.IOException;
78 import java.net.URI;
79 import java.net.URISyntaxException;
80 import java.nio.file.Files;
81 import java.nio.file.Path;
82 import java.nio.file.Paths;
83 import java.util.ArrayList;
84 import java.util.Arrays;
85 import java.util.Collections;
86 import java.util.HashMap;
87 import java.util.HashSet;
88 import java.util.List;
89 import java.util.Map;
90 import java.util.Set;
91 import java.util.TreeSet;
92 import java.util.concurrent.ConcurrentHashMap;
93
94 import javax.ws.rs.core.UriBuilder;
95 import javax.xml.parsers.DocumentBuilder;
96 import javax.xml.parsers.ParserConfigurationException;
97 import javax.xml.transform.Transformer;
98 import javax.xml.transform.TransformerException;
99 import javax.xml.transform.dom.DOMSource;
100 import javax.xml.transform.stream.StreamResult;
101
102
103
104
105 @Component(
106 immediate = true,
107 service = { DistributionService.class, StreamingDistributionService.class },
108 property = {
109 "service.description=Distribution Service (Streaming)",
110 "distribution.channel=streaming"
111 }
112 )
113 public class WowzaStreamingDistributionService extends AbstractDistributionService
114 implements StreamingDistributionService {
115
116
  /** Configuration key holding the path of the streaming distribution directory. */
  protected static final String STREAMING_DIRECTORY_KEY = "org.opencastproject.streaming.directory";

  /** Configuration key holding the list of streaming formats to publish. */
  protected static final String WOWZA_FORMATS_KEY = "org.opencastproject.wowza.formats";

  /** Pattern of the per-tenant streaming URL configuration key; {@code %s} is replaced by the organization id. */
  protected static final String WOWZA_URL_KEY = "org.opencastproject.%s.wowza.url";

  /** Pattern of the per-tenant streaming port configuration key; {@code %s} is replaced by the organization id. */
  protected static final String WOWZA_PORT_KEY = "org.opencastproject.%s.wowza.port";

  /** Streaming base URLs, keyed by organization id. Replaced as a whole on every (re)configuration. */
  protected Map<String, URI> streamingUrls;

  /** Configuration key selecting the order of the video entries in the SMIL files. */
  protected static final String SMIL_ORDER_KEY = "org.opencastproject.wowza.smil.order";

  /** Value of {@link #SMIL_ORDER_KEY} requesting ascending bitrate order (also used when the key is unset). */
  private static final String SMIL_ASCENDING_VALUE = "ascending";

  /** Value of {@link #SMIL_ORDER_KEY} requesting descending bitrate order. */
  private static final String SMIL_DESCENDING_VALUE = "descending";

  /** Name of the SMIL {@code video} attribute carrying the bitrate. */
  private static final String SMIL_ATTR_VIDEO_BITRATE = "video-bitrate";

  /** Name of the SMIL {@code video} attribute carrying the frame width. */
  private static final String SMIL_ATTR_VIDEO_WIDTH = "width";

  /** Name of the SMIL {@code video} attribute carrying the frame height. */
  private static final String SMIL_ATTR_VIDEO_HEIGHT = "height";

  /** Value returned by {@link #getDistributionType()}. */
  private static final String DISTRIBUTION_TYPE = "streaming";
150
151
152 private static final Set<String> validSchemes;
153 private static final Map<String, Integer> defaultProtocolPorts;
154
155 static {
156 Set<String> temp = new HashSet<>();
157 temp.add("http");
158 temp.add("https");
159 validSchemes = Collections.unmodifiableSet(temp);
160
161 Map<String, Integer> tempMap = new HashMap<>();
162 tempMap.put("http", 80);
163 tempMap.put("https", 443);
164 defaultProtocolPorts = Collections.unmodifiableMap(tempMap);
165 }
166
167
  /** Scheme applied to a configured streaming URL that does not specify one. */
  protected static final String DEFAULT_SCHEME = "http";

  /** Logging facility. */
  private static final Logger logger = LoggerFactory.getLogger(WowzaStreamingDistributionService.class);

  /** Job type under which distribute and retract jobs are created. */
  public static final String JOB_TYPE = "org.opencastproject.distribution.streaming";
175
176
177 private enum Operation {
178 Distribute, Retract
179 };
180
181
  /** Load of a distribute job when {@link #DISTRIBUTE_JOB_LOAD_KEY} is not configured. */
  public static final float DEFAULT_DISTRIBUTE_JOB_LOAD = 0.1f;

  /** Load of a retract job when {@link #RETRACT_JOB_LOAD_KEY} is not configured. */
  public static final float DEFAULT_RETRACT_JOB_LOAD = 0.1f;

  /** Configuration key for the load of a distribute job. */
  public static final String DISTRIBUTE_JOB_LOAD_KEY = "job.load.streaming.distribute";

  /** Configuration key for the load of a retract job. */
  public static final String RETRACT_JOB_LOAD_KEY = "job.load.streaming.retract";

  /** Load assigned to newly created distribute jobs. */
  private float distributeJobLoad = DEFAULT_DISTRIBUTE_JOB_LOAD;

  /** Load assigned to newly created retract jobs. */
  private float retractJobLoad = DEFAULT_RETRACT_JOB_LOAD;

  /** Root directory the files are distributed to; {@code null} until the service has been configured. */
  private File distributionDirectory = null;

  /** Streaming protocols for which tracks are published. */
  private Set<StreamingProtocol> supportedFormats;

  /** Whether the videos in a SMIL file are sorted by descending (instead of ascending) bitrate. */
  private boolean isSmilOrderDescending = false;

  /** Serializer used for the element id set passed as a job argument. */
  private static final Gson gson = new Gson();
209
210
211
212
213 public WowzaStreamingDistributionService() {
214 super(JOB_TYPE);
215 }
216
217 public String getDistributionType() {
218 return DISTRIBUTION_TYPE;
219 }
220
221 @Activate
222 public void activate(BundleContext bundleContext, Map<String, Object> properties)
223 throws ComponentException, ConfigurationException {
224 modified(bundleContext, properties);
225 }
226
227 @Modified
228 public void modified(BundleContext bundleContext, Map<String, Object> properties)
229 throws ComponentException, ConfigurationException {
230
231
232 if (properties != null && bundleContext != null) {
233
234 Map streamingUrlConfiguration = new ConcurrentHashMap<>();
235
236
237 String distributionDirectoryPath = StringUtils.trimToNull((String) properties.get(STREAMING_DIRECTORY_KEY));
238
239 if (distributionDirectoryPath == null) {
240
241 distributionDirectoryPath
242 = StringUtils.trimToNull(bundleContext.getProperty("org.opencastproject.storage.dir"));
243 if (distributionDirectoryPath != null) {
244 distributionDirectoryPath += "/streams";
245 }
246 }
247 if (distributionDirectoryPath == null) {
248 throw new ComponentException("Streaming distribution directory must be set");
249 }
250
251 distributionDirectory = new File(distributionDirectoryPath);
252 if (!distributionDirectory.isDirectory()) {
253 try {
254 Files.createDirectories(distributionDirectory.toPath());
255 } catch (IOException e) {
256 throw new ComponentException("Distribution directory " + distributionDirectory
257 + " does not exist and can't be created", e);
258 }
259 }
260
261 logger.info("Streaming distribution directory is {}", distributionDirectory);
262
263
264 List<Organization> organizations = organizationDirectoryService.getOrganizations();
265 for (Organization org: organizations) {
266 String orgId = org.getId();
267 String streamingUrl = StringUtils.trimToNull((String) properties.get(String.format(WOWZA_URL_KEY, orgId)));
268 String streamingPort = StringUtils.trimToNull((String) properties.get(String.format(WOWZA_PORT_KEY, orgId)));
269
270 if (streamingUrl != null) {
271 try {
272 URI tenantStreamingUrl = getStreamingUrl(streamingUrl, streamingPort, validSchemes, DEFAULT_SCHEME, null);
273
274 if (tenantStreamingUrl == null) {
275 throw new ComponentException(String.format("Streaming URL is undefined for tenant %s.", orgId));
276 }
277
278 streamingUrlConfiguration.put(orgId, tenantStreamingUrl);
279 logger.info("Wowza Streaming URL for tenant {} set to \"{}\"", orgId, tenantStreamingUrl);
280 } catch (URISyntaxException e) {
281 throw new ComponentException(
282 String.format("Wowza Streaming URL %s of tenant %s could not be parsed", streamingUrl, orgId), e);
283 }
284 } else {
285 logger.debug("Wowza Streaming URL is undefined for tenant {}", orgId);
286 }
287 }
288
289 streamingUrls = streamingUrlConfiguration;
290
291
292 String formats = StringUtils.trimToNull((String) properties.get(WOWZA_FORMATS_KEY));
293
294 if (formats == null) {
295 setDefaultSupportedFormats();
296 } else {
297 setSupportedFormats(formats);
298 }
299 logger.info("The supported streaming formats are: {}", StringUtils.join(supportedFormats, ","));
300
301
302 String smilOrder = StringUtils.trimToNull((String) properties.get(SMIL_ORDER_KEY));
303
304 if (smilOrder == null || SMIL_ASCENDING_VALUE.equals(smilOrder)) {
305 logger.info("The videos in the SMIL files will be sorted in ascending bitrate order");
306 isSmilOrderDescending = false;
307 } else if (SMIL_DESCENDING_VALUE.equals(smilOrder)) {
308 isSmilOrderDescending = true;
309 logger.info("The videos in the SMIL files will be sorted in descending bitrate order");
310 } else {
311 throw new ConfigurationException(SMIL_ORDER_KEY, format("Illegal value '%s'. Valid options are '%s' and '%s'",
312 smilOrder, SMIL_ASCENDING_VALUE, SMIL_DESCENDING_VALUE));
313 }
314
315
316 distributeJobLoad = LoadUtil.getConfiguredLoadValue(properties, DISTRIBUTE_JOB_LOAD_KEY,
317 DEFAULT_DISTRIBUTE_JOB_LOAD, serviceRegistry);
318 retractJobLoad = LoadUtil.getConfiguredLoadValue(properties, RETRACT_JOB_LOAD_KEY, DEFAULT_RETRACT_JOB_LOAD,
319 serviceRegistry);
320 }
321 }
322
323 public boolean publishToStreaming() {
324 String currentOrgId = securityService.getOrganization().getId();
325 return streamingUrls.containsKey(currentOrgId);
326 }
327
328 private URI getStreamingURLforCurrentOrg() {
329 String currentOrgId = securityService.getOrganization().getId();
330 if (streamingUrls.containsKey(currentOrgId)) {
331 return streamingUrls.get(currentOrgId);
332 }
333 return null;
334 }
335
336
337
338
339
340
341
342 private void setSupportedFormats(String formatString) {
343 supportedFormats = new TreeSet<>();
344
345 for (String format : formatString.toUpperCase().split("[\\s,]")) {
346 if (!format.isEmpty()) {
347 try {
348 StreamingProtocol protocol = StreamingProtocol.valueOf(format);
349 supportedFormats.add(protocol);
350 } catch (IllegalArgumentException e) {
351 logger.warn("Found incorrect format \"{}\". Ignoring...", format);
352 }
353 }
354 }
355 }
356
357
358
359
360 private void setDefaultSupportedFormats() {
361 supportedFormats = new TreeSet<>(Arrays.asList(
362 TrackImpl.StreamingProtocol.HLS,
363 TrackImpl.StreamingProtocol.HDS,
364 TrackImpl.StreamingProtocol.SMOOTH,
365 TrackImpl.StreamingProtocol.DASH));
366 }
367
368
369
370
371
372
373 private static URI getStreamingUrl(String inputUri, String inputPort, Set<String> validSchemes, String defaultScheme,
374 String defaultUri) throws URISyntaxException {
375
376 Integer port;
377 try {
378 port = Integer.parseInt(StringUtils.trimToEmpty(inputPort));
379 } catch (NumberFormatException e) {
380 port = null;
381 }
382
383 URI uri;
384 if (StringUtils.isNotBlank(inputUri)) {
385 uri = new URI(inputUri);
386 } else if (StringUtils.isNotBlank(defaultUri)) {
387 uri = new URI(defaultUri);
388 } else {
389 throw new IllegalArgumentException("Provided streaming URL is empty.");
390 }
391 UriBuilder uriBuilder = UriBuilder.fromUri(uri);
392 String scheme = uri.getScheme();
393 String uriPath = uri.getPath();
394
395
396
397 if (uri.getHost() == null) {
398 uriBuilder.host(uriPath.substring(0, uriPath.indexOf("/"))).replacePath(uriPath.substring(uriPath.indexOf("/")));
399 }
400
401 if (!validSchemes.contains(scheme)) {
402 if (scheme == null) {
403 uriBuilder.scheme(defaultScheme);
404 } else {
405 throw new URISyntaxException(inputUri, "Provided URI has an illegal scheme");
406 }
407 }
408
409 if ((port != null) && (!port.equals(defaultProtocolPorts.get(uriBuilder.build().getScheme())))) {
410 uriBuilder.port(port);
411 }
412
413 return uriBuilder.build();
414 }
415
416
417
418
419
420
421
422 @Override
423 public Job distribute(String channelId, MediaPackage mediapackage, Set<String> elementIds)
424 throws DistributionException {
425
426 notNull(mediapackage, "mediaPackage");
427 notNull(elementIds, "elementIds");
428 notNull(channelId, "channelId");
429
430 if (getStreamingURLforCurrentOrg() == null) {
431 throw new IllegalStateException(
432 String.format("No streaming url or port set for tenant %s", securityService.getOrganization().getId()));
433 }
434 if (distributionDirectory == null) {
435 throw new IllegalStateException(
436 "Streaming distribution directory must be set (org.opencastproject.streaming.directory)");
437 }
438
439 try {
440 return serviceRegistry.createJob(
441 JOB_TYPE,
442 Operation.Distribute.toString(),
443 Arrays.asList(channelId, MediaPackageParser.getAsXml(mediapackage), gson.toJson(elementIds)),
444 distributeJobLoad);
445 } catch (ServiceRegistryException e) {
446 throw new DistributionException("Unable to create a job", e);
447 }
448 }
449
450
451
452
453
454
455
456 @Override
457 public Job distribute(final String channelId, final MediaPackage mediapackage, final String elementId)
458 throws DistributionException {
459 return distribute(channelId, mediapackage, new HashSet<>(Collections.singletonList(elementId)));
460 }
461
462
463
464
465
466
467
468
469
470
471
472
473
474 private List<MediaPackageElement> distributeElements(final String channelId, final MediaPackage mediaPackage,
475 final Set<String> elementIds, URI streamingURL) throws DistributionException {
476 notNull(mediaPackage, "mediaPackage");
477 notNull(elementIds, "elementIds");
478 notNull(channelId, "channelId");
479
480 List<MediaPackageElement> distributedElements = new ArrayList<>();
481 for (MediaPackageElement element : getElements(mediaPackage, elementIds)) {
482 distributedElements.addAll(distributeElement(channelId, mediaPackage, element, streamingURL));
483 }
484 return distributedElements;
485 }
486
487
488
489
490
491
492
493
494
495
496
497
498
499 private synchronized List<MediaPackageElement> distributeElement(final String channelId,
500 final MediaPackage mediaPackage, final MediaPackageElement element, URI streamingURL)
501 throws DistributionException {
502
503 if (supportedFormats.isEmpty()) {
504 logger.warn("Skipping distribution of element \"{}\" because no streaming format was specified", element);
505 return Collections.emptyList();
506 }
507
508
509 if (!MediaPackageElement.Type.Track.equals(element.getElementType())) {
510 logger.debug("Skipping {} {} for distribution to the streaming server",
511 element.getElementType(), element.getIdentifier());
512 return Collections.emptyList();
513 }
514
515 try {
516 File source;
517 try {
518 source = workspace.get(element.getURI());
519 } catch (NotFoundException | IOException e) {
520 throw new DistributionException("Error getting element " + element.getURI() + " from the workspace", e);
521 }
522
523 ArrayList<MediaPackageElement> distribution = new ArrayList<>();
524
525
526
527 File destination = getDistributionFile(channelId, mediaPackage, element, streamingURL);
528 try {
529 Files.createDirectories(destination.toPath().getParent());
530 } catch (IOException e) {
531 throw new DistributionException("Unable to create " + destination.getParentFile(), e);
532 }
533 logger.info("Distributing {} to {}", element.getIdentifier(), destination);
534
535 try {
536 FileSupport.link(source, destination, true);
537 } catch (IOException e) {
538 throw new DistributionException("Unable to copy " + source + " to " + destination, e);
539 }
540
541 if ((!supportedFormats.isEmpty()) && isStreamingFormat(element)) {
542
543
544 File smilFile = getSmilFile(element, mediaPackage, channelId);
545 Document smilXml = getSmilDocument(smilFile);
546 addElementToSmil(smilXml, channelId, mediaPackage, element);
547 URI smilUri = getSmilUri(smilFile, streamingURL);
548
549 if (smilFile.isFile()) {
550 logger.debug("Skipped adding streaming manifest {} to search index, as it already exists.", element);
551 } else {
552 for (StreamingProtocol protocol : supportedFormats) {
553 distribution.add(createTrackforStreamingProtocol(element, smilUri, protocol));
554 logger.info("Distributed element {} in {} format to the Wowza Server", element, protocol);
555 }
556 }
557
558 saveSmilFile(smilFile, smilXml);
559 }
560
561 logger.info("Distributed file {} to Wowza Server", element);
562 return distribution;
563
564 } catch (URISyntaxException e) {
565 throw new DistributionException("Error distributing " + element, e);
566 }
567 }
568
569 private void setTransport(MediaPackageElement element, TrackImpl.StreamingProtocol protocol) {
570 if (element instanceof TrackImpl) {
571 ((TrackImpl) element).setTransport(protocol);
572 }
573 }
574
575 private File getSmilFile(MediaPackageElement element, MediaPackage mediapackage, String channelId) {
576 String orgId = securityService.getOrganization().getId();
577 String smilFileName = channelId + "_" + mediapackage.getIdentifier() + "_" + element.getFlavor().getType()
578 + ".smil";
579 return distributionDirectory.toPath().resolve(Paths.get(orgId, smilFileName)).toFile();
580 }
581
582 private URI getSmilUri(File smilFile, URI streamingURL) {
583 return UriBuilder.fromUri(streamingURL).path("smil:" + smilFile.getName()).build();
584 }
585
586 private URI getStreamingUri(URI smilUri, StreamingProtocol protocol) throws URISyntaxException {
587 String fileName;
588 switch (protocol) {
589 case HLS:
590 fileName = "playlist.m3u8";
591 break;
592 case HDS:
593 fileName = "manifest.f4m";
594 break;
595 case SMOOTH:
596 fileName = "Manifest";
597 break;
598 case DASH:
599 fileName = "manifest_mpm4sav_mvlist.mpd";
600 break;
601 default:
602 fileName = "";
603 }
604 return new URI(UrlSupport.concat(smilUri.toString(), fileName));
605 }
606
607 private boolean isStreamingFormat(MediaPackageElement element) {
608 String uriPath = element.getURI().getPath();
609 return uriPath.endsWith(".mp4") || uriPath.contains("mp4:");
610 }
611
612 private Document getSmilDocument(File smilFile) throws DistributionException {
613 if (!smilFile.isFile()) {
614 try {
615 DocumentBuilder docBuilder = XmlSafeParser.newDocumentBuilderFactory().newDocumentBuilder();
616 Document doc = docBuilder.newDocument();
617 Element smil = doc.createElement("smil");
618 doc.appendChild(smil);
619
620 Element head = doc.createElement("head");
621 smil.appendChild(head);
622
623 Element body = doc.createElement("body");
624 smil.appendChild(body);
625
626 Element switchElement = doc.createElement("switch");
627 body.appendChild(switchElement);
628
629 return doc;
630 } catch (ParserConfigurationException ex) {
631 logger.error("Could not create XML file for {}.", smilFile);
632 throw new DistributionException("Could not create XML file for " + smilFile);
633 }
634 }
635
636 try {
637 DocumentBuilder docBuilder = XmlSafeParser.newDocumentBuilderFactory().newDocumentBuilder();
638 Document doc = docBuilder.parse(smilFile);
639
640 if (!"smil".equalsIgnoreCase(doc.getDocumentElement().getNodeName())) {
641 logger.error("XML-File {} is not a SMIL file.", smilFile);
642 throw new DistributionException(format("XML-File %s is not an SMIL file.", smilFile.getName()));
643 }
644
645 return doc;
646 } catch (IOException e) {
647 logger.error("Could not open SMIL file {}", smilFile);
648 throw new DistributionException(format("Could not open SMIL file %s", smilFile));
649 } catch (ParserConfigurationException e) {
650 logger.error("Could not parse SMIL file {}", smilFile);
651 throw new DistributionException(format("Could not parse SMIL file %s", smilFile));
652 } catch (SAXException e) {
653 logger.error("Could not parse XML file {}", smilFile);
654 throw new DistributionException(format("Could not parse XML file %s", smilFile));
655 }
656 }
657
658 private void saveSmilFile(File smilFile, Document doc) throws DistributionException {
659 try {
660 Transformer transformer = XmlSafeParser.newTransformerFactory().newTransformer();
661 DOMSource source = new DOMSource(doc);
662 StreamResult stream = new StreamResult(smilFile);
663 transformer.transform(source, stream);
664 logger.info("SMIL file for Wowza server saved at {}", smilFile);
665 } catch (TransformerException ex) {
666 logger.error("Could not write SMIL file {} for distribution", smilFile);
667 throw new DistributionException(format("Could not write SMIL file %s for distribution", smilFile));
668 }
669 }
670
671 private void addElementToSmil(Document doc, String channelId, MediaPackage mediapackage, MediaPackageElement element)
672 throws DOMException, URISyntaxException {
673 if (!(element instanceof TrackImpl)) {
674 return;
675 }
676 TrackImpl track = (TrackImpl) element;
677 NodeList switchElementsList = doc.getElementsByTagName("switch");
678 Node switchElement = null;
679
680
681
682 if (switchElementsList.getLength() > 0) {
683 switchElement = switchElementsList.item(0);
684 } else {
685 if (doc.getElementsByTagName("head").getLength() < 1) {
686 doc.appendChild(doc.createElement("head"));
687 }
688 if (doc.getElementsByTagName("body").getLength() < 1) {
689 doc.appendChild(doc.createElement("body"));
690 }
691 switchElement = doc.createElement("switch");
692 doc.getElementsByTagName("body").item(0).appendChild(switchElement);
693 }
694
695 Element video = doc.createElement("video");
696 video.setAttribute("src", getDistributionName(channelId, mediapackage, element));
697
698 float bitrate = 0;
699
700
701 for (AudioStream stream : track.getAudio()) {
702 bitrate += stream.getBitRate();
703 }
704
705
706
707
708
709
710 Integer width = null;
711 Integer height = null;
712 for (VideoStream stream : track.getVideo()) {
713 bitrate += stream.getBitRate();
714
715 if (((stream.getFrameWidth() != null) && (stream.getFrameHeight() != null))
716 || ((width == null) && (height == null))) {
717 width = stream.getFrameWidth();
718 height = stream.getFrameHeight();
719 }
720 }
721
722 video.setAttribute(SMIL_ATTR_VIDEO_BITRATE, Integer.toString((int) bitrate));
723
724 if (width != null) {
725 video.setAttribute(SMIL_ATTR_VIDEO_WIDTH, Integer.toString(width));
726 } else {
727 logger.debug("Could not set video width in the SMIL file for element {} of mediapackage {}. The value was null",
728 element.getIdentifier(), mediapackage.getIdentifier());
729 }
730 if (height != null) {
731 video.setAttribute(SMIL_ATTR_VIDEO_HEIGHT, Integer.toString(height));
732 } else {
733 logger.debug("Could not set video height in the SMIL file for element {} of mediapackage {}. The value was null",
734 element.getIdentifier(), mediapackage.getIdentifier());
735 }
736
737 NodeList currentVideos = switchElement.getChildNodes();
738 for (int i = 0; i < currentVideos.getLength(); i++) {
739 Node current = currentVideos.item(i);
740 if ("video".equals(current.getNodeName())) {
741 float currentBitrate = Float
742 .parseFloat(current.getAttributes().getNamedItem(SMIL_ATTR_VIDEO_BITRATE).getTextContent());
743 if ((isSmilOrderDescending && (currentBitrate < bitrate))
744 || (!isSmilOrderDescending && (currentBitrate > bitrate))) {
745 switchElement.insertBefore(video, current);
746 return;
747 }
748 }
749 }
750
751
752 switchElement.appendChild(video);
753 }
754
755 private TrackImpl createTrackforStreamingProtocol(MediaPackageElement element, URI smilUri,
756 StreamingProtocol protocol) throws URISyntaxException {
757 TrackImpl track = (TrackImpl) element.clone();
758
759 switch (protocol) {
760 case HLS:
761 track.setMimeType(MimeType.mimeType("application", "x-mpegURL"));
762 break;
763 case HDS:
764 track.setMimeType(MimeType.mimeType("application", "f4m+xml"));
765 break;
766 case SMOOTH:
767 track.setMimeType(MimeType.mimeType("application", "vnd.ms-sstr+xml"));
768 break;
769 case DASH:
770 track.setMimeType(MimeType.mimeType("application", "dash+xml"));
771 break;
772 default:
773 throw new IllegalArgumentException(format("Received invalid streaming protocol: '%s'", protocol));
774 }
775
776 setTransport(track, protocol);
777 track.setURI(getStreamingUri(smilUri, protocol));
778 track.referTo(element);
779 track.setIdentifier(null);
780 track.setAudio(null);
781 track.setVideo(null);
782 track.setChecksum(null);
783
784 return track;
785 }
786
787
788
789
790
791
792
793 @Override
794 public Job retract(String channelId, MediaPackage mediapackage, String elementId) throws DistributionException {
795 return retract(channelId, mediapackage, new HashSet<>(Collections.singletonList(elementId)));
796 }
797
798
799
800
801
802
803
804 @Override
805 public Job retract(String channelId, MediaPackage mediaPackage, Set<String> elementIds) throws DistributionException {
806 RequireUtil.notNull(mediaPackage, "mediaPackage");
807 RequireUtil.notNull(elementIds, "elementIds");
808 RequireUtil.notNull(channelId, "channelId");
809
810 try {
811 return serviceRegistry.createJob(JOB_TYPE, Operation.Retract.toString(),
812 Arrays.asList(channelId, MediaPackageParser.getAsXml(mediaPackage), gson.toJson(elementIds)),
813 retractJobLoad);
814 } catch (ServiceRegistryException e) {
815 throw new DistributionException("Unable to create a job", e);
816 }
817 }
818
819 @Override
820 public List<MediaPackageElement> distributeSync(String channelId, MediaPackage mediaPackage, String elementId)
821 throws DistributionException {
822 Set<String> elementIds = new HashSet<String>();
823 elementIds.add(elementId);
824 return distributeSync(channelId, mediaPackage, elementIds);
825 }
826
827 @Override
828 public List<MediaPackageElement> distributeSync(String channelId, MediaPackage mediaPackage, Set<String> elementIds)
829 throws DistributionException {
830
831 if (getStreamingURLforCurrentOrg() == null) {
832 logger.warn(String.format("Trying to distribute to streaming from tenant where streaming url or port aren't set.",
833 securityService.getOrganization().getId()));
834 return Collections.emptyList();
835 }
836
837 if (distributionDirectory == null) {
838 logger.warn("Streaming distribution directory isn't set (org.opencastproject.streaming.directory)");
839 return Collections.emptyList();
840 }
841
842 URI streamingURL = getStreamingURLforCurrentOrg();
843 return distributeElements(channelId, mediaPackage, elementIds, streamingURL);
844 }
845
846 @Override
847 public List<MediaPackageElement> retractSync(String channelId, MediaPackage mediaPackage, String elementId)
848 throws DistributionException {
849 Set<String> elementIds = new HashSet<String>();
850 elementIds.add(elementId);
851 return retractSync(channelId, mediaPackage, elementIds);
852 }
853
854 @Override
855 public List<MediaPackageElement> retractSync(String channelId, MediaPackage mediaPackage, Set<String> elementIds)
856 throws DistributionException {
857
858 if (getStreamingURLforCurrentOrg() == null) {
859 logger.warn(String.format("Trying to retract from streaming from tenant where streaming url or port aren't set.",
860 securityService.getOrganization().getId()));
861 return Collections.emptyList();
862 }
863
864 if (distributionDirectory == null) {
865 logger.warn("Streaming distribution directory isn't set (org.opencastproject.streaming.directory)");
866 return Collections.emptyList();
867 }
868
869 URI streamingURL = getStreamingURLforCurrentOrg();
870 return retractElements(channelId, mediaPackage, elementIds, streamingURL);
871 }
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888 private List<MediaPackageElement> retractElements(String channelId, MediaPackage mediaPackage,
889 Set<String> elementIds, URI streamingURL)
890 throws DistributionException {
891
892 notNull(mediaPackage, "mediaPackage");
893 notNull(elementIds, "elementIds");
894 notNull(channelId, "channelId");
895
896 List<MediaPackageElement> retractedElements = new ArrayList<>();
897 for (MediaPackageElement element: getElements(mediaPackage, elementIds)) {
898 retractedElements.addAll(retractElement(channelId, mediaPackage, element, streamingURL));
899 }
900 return retractedElements;
901 }
902
903
904
905
906
907
908
909
910
911
912
913
914 private List<MediaPackageElement> retractElement(final String channelId, final MediaPackage mediaPackage,
915 final MediaPackageElement element, URI streamingUrl) throws DistributionException {
916
917 logger.debug("Retracting element {} with URI {}", element.getIdentifier(), element.getURI());
918
919
920 if (!(element instanceof TrackImpl)) {
921 return Collections.emptyList();
922 }
923
924
925 final File elementFile = getDistributionFile(channelId, mediaPackage, element, streamingUrl);
926 final File smilFile = getSmilFile(element, mediaPackage, channelId);
927 logger.debug("Deleting file {}", elementFile);
928
929
930
931 if (elementFile == null || !elementFile.exists()) {
932 logger.warn("{} does not exist but was to be deleted", elementFile);
933 return Collections.singletonList(element);
934 }
935
936
937 if (elementFile.equals(smilFile)) {
938 Document smilXml = getSmilDocument(smilFile);
939 NodeList videoList = smilXml.getElementsByTagName("video");
940 for (int i = 0; i < videoList.getLength(); i++) {
941 if (videoList.item(i) instanceof Element) {
942 String smilPathStr = ((Element) videoList.item(i)).getAttribute("src");
943
944 if (smilPathStr.contains("mp4:")) {
945 smilPathStr = smilPathStr.replace("mp4:", "");
946 }
947 if (!smilPathStr.endsWith(".mp4")) {
948 smilPathStr += ".mp4";
949 }
950
951 deleteElementFile(smilFile.toPath().resolveSibling(smilPathStr).toFile());
952 }
953 }
954
955 if (smilFile.isFile() && !smilFile.delete()) {
956 logger.warn("The SMIL file {} could not be successfully deleted.", smilFile);
957 }
958 } else {
959 deleteElementFile(elementFile);
960 }
961
962 logger.info("Finished retracting element {} of media package {}", element, mediaPackage);
963 return Collections.singletonList(element);
964 }
965
966
967
968
969
970
971 private void deleteElementFile(File elementFile) {
972
973
974 if (elementFile.exists()) {
975 if (!elementFile.delete()) {
976 logger.warn("Could not properly delete element file: {}", elementFile);
977 }
978 } else {
979 logger.warn("Tried to delete non-existent element file. Perhaps was already deleted?: {}", elementFile);
980 }
981
982
983 File elementDir = elementFile.getParentFile();
984 if (elementDir != null && elementDir.exists()) {
985 try {
986 if (FileUtils.isEmptyDirectory(elementDir)) {
987 if (!elementDir.delete()) {
988 logger.warn("Could not properly delete element directory: {}", elementDir);
989 }
990 } else {
991 logger.warn("Element directory was not empty after deleting element. Skipping deletion: {}", elementDir);
992 }
993 } catch (IOException e) {
994 logger.warn("Unable to delete element directory: {}", elementDir);
995 }
996 } else {
997 logger.warn("Element directory did not exist when trying to delete it: {}", elementDir);
998 }
999
1000 File mediapackageDir = elementDir.getParentFile();
1001 if (mediapackageDir != null && mediapackageDir.exists()) {
1002 try {
1003 if (FileUtils.isEmptyDirectory(mediapackageDir)) {
1004 if (!mediapackageDir.delete()) {
1005 logger.warn("Could not properly delete mediapackage directory: {}", mediapackageDir);
1006 }
1007 } else {
1008 logger.debug("Mediapackage directory was not empty after deleting element. Skipping deletion: {}",
1009 mediapackageDir);
1010 }
1011 } catch (IOException e) {
1012 logger.warn("Unable to delete mediapackage directory: {}", elementDir);
1013 }
1014 } else {
1015 logger.warn("Mediapackage directory did not exist when trying to delete it: {}", mediapackageDir);
1016 }
1017 }
1018
1019
1020
1021
1022
1023
1024 private File getDistributionFile(String channelId, MediaPackage mediapackage, MediaPackageElement element,
1025 URI streamingURL) {
1026
1027 final String orgId = securityService.getOrganization().getId();
1028 final Path distributionPath = distributionDirectory.toPath().resolve(orgId);
1029 final URI elementUri = element.getURI();
1030 URI relativeUri = streamingURL.relativize(elementUri);
1031 if (relativeUri != elementUri) {
1032
1033
1034
1035 String uriPath = relativeUri.getPath();
1036
1037 uriPath = uriPath.substring(0, uriPath.lastIndexOf('/'));
1038
1039 uriPath = uriPath.replace("smil:", "");
1040 if (!uriPath.endsWith(".smil")) {
1041 uriPath += ".smil";
1042 }
1043
1044 String[] uriPathParts = uriPath.split("/");
1045
1046 if (uriPathParts.length > 1) {
1047 logger.warn(
1048 "Malformed URI path \"{}\". The SMIL files must be at the streaming application's root. Trying anyway...",
1049 uriPath);
1050 }
1051 return distributionPath.resolve(uriPath).toFile();
1052 }
1053
1054
1055 return new File(getElementDirectory(channelId, mediapackage, element.getIdentifier()),
1056 FilenameUtils.getName(elementUri.getPath()));
1057 }
1058
1059
1060
1061
1062
1063
1064 private File getMediaPackageDirectory(String channelId, MediaPackage mediaPackage) {
1065 final String orgId = securityService.getOrganization().getId();
1066 return distributionDirectory.toPath().resolve(Paths.get(orgId, channelId, mediaPackage.getIdentifier().toString()))
1067 .toFile();
1068 }
1069
1070
1071
1072
1073
1074
1075 private File getElementDirectory(String channelId, MediaPackage mediaPackage, String elementId) {
1076 return new File(getMediaPackageDirectory(channelId, mediaPackage), elementId);
1077 }
1078
1079
1080
1081
1082
1083
1084 private String getDistributionName(String channelId, MediaPackage mp, MediaPackageElement element) {
1085 String elementId = element.getIdentifier();
1086 String fileName = FilenameUtils.getBaseName(element.getURI().toString());
1087 String tag = FilenameUtils.getExtension(element.getURI().toString()) + ":";
1088
1089
1090 if ("flv:".equals(tag)) {
1091 tag = "";
1092 }
1093 return tag + channelId + "/" + mp.getIdentifier().toString() + "/" + elementId + "/" + fileName;
1094 }
1095
1096
1097
1098
1099
1100
1101 @Override
1102 protected String process(Job job) throws Exception {
1103 Operation op = null;
1104 String operation = job.getOperation();
1105 List<String> arguments = job.getArguments();
1106 try {
1107 op = Operation.valueOf(operation);
1108 String channelId = arguments.get(0);
1109 MediaPackage mediapackage = MediaPackageParser.getFromXml(arguments.get(1));
1110 Set<String> elementIds = gson.fromJson(arguments.get(2), new TypeToken<Set<String>>() {
1111 }.getType());
1112
1113 URI streamingUrl = getStreamingURLforCurrentOrg();
1114 if (streamingUrl == null) {
1115 logger.warn(String.format("Trying to distribute to or retract from streaming from tenant where "
1116 + "streaming url or port aren't set.", securityService.getOrganization().getId()));
1117 return null;
1118 }
1119
1120 if (distributionDirectory == null) {
1121 logger.warn("Streaming distribution directory isn't set (org.opencastproject.streaming.directory)");
1122 return null;
1123 }
1124
1125 List<MediaPackageElement> elements;
1126 switch (op) {
1127 case Distribute:
1128 elements = distributeElements(channelId, mediapackage, elementIds, streamingUrl);
1129 break;
1130 case Retract:
1131 elements = retractElements(channelId, mediapackage, elementIds, streamingUrl);
1132 break;
1133 default:
1134 throw new ServiceRegistryException("This service can't handle operations of type '" + op + "'");
1135 }
1136 if (!elements.isEmpty()) {
1137 return MediaPackageElementParser.getArrayAsXml(elements);
1138 }
1139 return null;
1140 } catch (IndexOutOfBoundsException e) {
1141 throw new ServiceRegistryException("This argument list for operation '" + op + "' does not meet expectations", e);
1142 } catch (Exception e) {
1143 throw new ServiceRegistryException("Error handling operation '" + op + "'", e);
1144 }
1145 }
1146
1147 private Set<MediaPackageElement> getElements(MediaPackage mediapackage, Set<String> elementIds)
1148 throws IllegalStateException {
1149 final Set<MediaPackageElement> elements = new HashSet<>();
1150 for (String elementId : elementIds) {
1151 final MediaPackageElement element = mediapackage.getElementById(elementId);
1152 if (element != null) {
1153 elements.add(element);
1154 } else {
1155 logger.debug("No element " + elementId + " found in media package " + mediapackage.getIdentifier());
1156 }
1157 }
1158 return elements;
1159 }
1160
1161 public File getDistributionDirectory() {
1162 return distributionDirectory;
1163 }
1164
1165 @Reference
1166 @Override
1167 public void setWorkspace(Workspace workspace) {
1168 super.setWorkspace(workspace);
1169 }
1170
1171 @Reference
1172 @Override
1173 public void setServiceRegistry(ServiceRegistry serviceRegistry) {
1174 super.setServiceRegistry(serviceRegistry);
1175 }
1176
1177 @Reference
1178 @Override
1179 public void setSecurityService(SecurityService securityService) {
1180 super.setSecurityService(securityService);
1181 }
1182
1183 @Reference
1184 @Override
1185 public void setUserDirectoryService(UserDirectoryService userDirectoryService) {
1186 super.setUserDirectoryService(userDirectoryService);
1187 }
1188
1189 @Reference
1190 @Override
1191 public void setOrganizationDirectoryService(OrganizationDirectoryService organizationDirectoryService) {
1192 super.setOrganizationDirectoryService(organizationDirectoryService);
1193 }
1194
1195 }