View Javadoc
1   /*
2    * Licensed to The Apereo Foundation under one or more contributor license
3    * agreements. See the NOTICE file distributed with this work for additional
4    * information regarding copyright ownership.
5    *
6    *
7    * The Apereo Foundation licenses this file to you under the Educational
8    * Community License, Version 2.0 (the "License"); you may not use this file
9    * except in compliance with the License. You may obtain a copy of the License
10   * at:
11   *
12   *   http://opensource.org/licenses/ecl2.txt
13   *
14   * Unless required by applicable law or agreed to in writing, software
15   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
16   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the
17   * License for the specific language governing permissions and limitations under
18   * the License.
19   *
20   */
21  package org.opencastproject.liveschedule.impl;
22  
23  import org.opencastproject.assetmanager.api.AssetManager;
24  import org.opencastproject.assetmanager.api.Snapshot;
25  import org.opencastproject.assetmanager.api.Version;
26  import org.opencastproject.capture.admin.api.CaptureAgentStateService;
27  import org.opencastproject.distribution.api.DistributionException;
28  import org.opencastproject.distribution.api.DownloadDistributionService;
29  import org.opencastproject.job.api.Job;
30  import org.opencastproject.job.api.JobBarrier;
31  import org.opencastproject.liveschedule.api.LiveScheduleException;
32  import org.opencastproject.liveschedule.api.LiveScheduleService;
33  import org.opencastproject.mediapackage.Attachment;
34  import org.opencastproject.mediapackage.MediaPackage;
35  import org.opencastproject.mediapackage.MediaPackageElement;
36  import org.opencastproject.mediapackage.MediaPackageElementBuilder;
37  import org.opencastproject.mediapackage.MediaPackageElementBuilderFactory;
38  import org.opencastproject.mediapackage.MediaPackageElementFlavor;
39  import org.opencastproject.mediapackage.MediaPackageElementParser;
40  import org.opencastproject.mediapackage.MediaPackageElements;
41  import org.opencastproject.mediapackage.Publication;
42  import org.opencastproject.mediapackage.PublicationImpl;
43  import org.opencastproject.mediapackage.Track;
44  import org.opencastproject.mediapackage.VideoStream;
45  import org.opencastproject.mediapackage.selector.SimpleElementSelector;
46  import org.opencastproject.mediapackage.track.TrackImpl;
47  import org.opencastproject.mediapackage.track.VideoStreamImpl;
48  import org.opencastproject.metadata.dublincore.DCMIPeriod;
49  import org.opencastproject.metadata.dublincore.DublinCore;
50  import org.opencastproject.metadata.dublincore.DublinCoreCatalog;
51  import org.opencastproject.metadata.dublincore.DublinCoreCatalogService;
52  import org.opencastproject.metadata.dublincore.EncodingSchemeUtils;
53  import org.opencastproject.search.api.SearchService;
54  import org.opencastproject.security.api.AccessControlList;
55  import org.opencastproject.security.api.AclScope;
56  import org.opencastproject.security.api.AuthorizationService;
57  import org.opencastproject.security.api.Organization;
58  import org.opencastproject.security.api.OrganizationDirectoryService;
59  import org.opencastproject.security.api.SecurityService;
60  import org.opencastproject.security.api.UnauthorizedException;
61  import org.opencastproject.security.api.User;
62  import org.opencastproject.security.util.SecurityUtil;
63  import org.opencastproject.series.api.SeriesService;
64  import org.opencastproject.serviceregistry.api.ServiceRegistry;
65  import org.opencastproject.util.MimeTypes;
66  import org.opencastproject.util.NotFoundException;
67  import org.opencastproject.util.UrlSupport;
68  import org.opencastproject.workspace.api.Workspace;
69  
70  import com.google.common.cache.Cache;
71  import com.google.common.cache.CacheBuilder;
72  
73  import org.apache.commons.collections4.CollectionUtils;
74  import org.apache.commons.collections4.Equator;
75  import org.apache.commons.lang3.StringUtils;
76  import org.apache.http.client.utils.URIUtils;
77  import org.osgi.framework.BundleContext;
78  import org.osgi.service.component.ComponentContext;
79  import org.osgi.service.component.annotations.Activate;
80  import org.osgi.service.component.annotations.Component;
81  import org.osgi.service.component.annotations.Reference;
82  import org.slf4j.Logger;
83  import org.slf4j.LoggerFactory;
84  
85  import java.net.URI;
86  import java.net.URISyntaxException;
87  import java.util.ArrayList;
88  import java.util.Arrays;
89  import java.util.Collection;
90  import java.util.Dictionary;
91  import java.util.Enumeration;
92  import java.util.HashMap;
93  import java.util.HashSet;
94  import java.util.List;
95  import java.util.Map;
96  import java.util.Objects;
97  import java.util.Optional;
98  import java.util.Properties;
99  import java.util.Set;
100 import java.util.UUID;
101 import java.util.concurrent.TimeUnit;
102 import java.util.function.Predicate;
103 import java.util.regex.Matcher;
104 import java.util.regex.Pattern;
105 import java.util.stream.Collectors;
106 
107 @Component(
108     immediate = true,
109     service = LiveScheduleService.class,
110     property = {
111         "service.description=Live Schedule Service"
112     }
113 )
114 public class LiveScheduleServiceImpl implements LiveScheduleService {
115   /** The server url property **/
116   static final String SERVER_URL_PROPERTY = "org.opencastproject.server.url";
117   /** The engage base url property **/
118   static final String ENGAGE_URL_PROPERTY = "org.opencastproject.engage.ui.url";
119   /** The default path to the player **/
120   static final String PLAYER_PATH = "/play/";
121 
122   /** Default values for configuration options */
123   private static final String DEFAULT_STREAM_MIME_TYPE = "video/mp4";
124   private static final String DEFAULT_STREAM_RESOLUTION = "1920x1080";
125   private static final String DEFAULT_STREAM_NAME = "live-stream";
126   private static final String DEFAULT_LIVE_TARGET_FLAVORS = "presenter/delivery";
127   static final String DEFAULT_LIVE_DISTRIBUTION_SERVICE = "download";
128 
129   // Deactivating checkstyle to preserve the long URL
130   // CHECKSTYLE:OFF
131   // If the capture agent registered this property, we expect to get a resolution and
132   // a url in the following format:
133   // capture.device.live.resolution.WIDTHxHEIGHT=COMPLETE_STREAMING_URL e.g.
134   // capture.device.live.resolution.960x270=rtmp://cp398121.live.edgefcs.net/live/dev-epiphan005-2-presenter-delivery.stream-960x270_1_200@355694
135   public static final String CA_PROPERTY_RESOLUTION_URL_PREFIX = "capture.device.live.resolution.";
136   // CHECKSTYLE:ON
137 
138   /** Variables that can be replaced in stream name */
139   public static final String REPLACE_ID = "id";
140   public static final String REPLACE_FLAVOR = "flavor";
141   public static final String REPLACE_CA_NAME = "caName";
142   public static final String REPLACE_RESOLUTION = "resolution";
143 
144   public static final String LIVE_STREAMING_URL = "live.streamingUrl";
145   public static final String LIVE_STREAM_NAME = "live.streamName";
146   public static final String LIVE_STREAM_MIME_TYPE = "live.mimeType";
147   public static final String LIVE_STREAM_RESOLUTION = "live.resolution";
148   public static final String LIVE_TARGET_FLAVORS = "live.targetFlavors";
149   public static final String LIVE_DISTRIBUTION_SERVICE = "live.distributionService";
150   public static final String LIVE_PUBLISH_STREAMING = "live.publishStreaming";
151 
152   private static final MediaPackageElementFlavor[] publishFlavors = { MediaPackageElements.EPISODE,
153       MediaPackageElements.SERIES, MediaPackageElements.XACML_POLICY_EPISODE,
154       MediaPackageElements.XACML_POLICY_SERIES }; // make configurable later
155 
156   /** The logger */
157   private static final Logger logger = LoggerFactory.getLogger(LiveScheduleServiceImpl.class);
158 
159   private String liveStreamingUrl;
160   private String streamName;
161   private String streamMimeType;
162   private String[] streamResolution;
163   private MediaPackageElementFlavor[] liveFlavors;
164   private String serverUrl;
165   private Cache<String, Version> snapshotVersionCache
166       = CacheBuilder.newBuilder().expireAfterWrite(5, TimeUnit.MINUTES).build();
167   /** Which streaming formats should be published automatically */
168   private List<String> publishedStreamingFormats = null;
169   private String systemUserName;
170 
171   /** Services */
172   private DownloadDistributionService downloadDistributionService; // to distribute episode and series catalogs
173   private SearchService searchService; // to publish/retract live media package
174   private SeriesService seriesService; // to get series metadata
175   private DublinCoreCatalogService dublinCoreService; // to serialize dc catalogs
176   private CaptureAgentStateService captureAgentService; // to get agent capabilities
177   private ServiceRegistry serviceRegistry; // to create publish/retract jobs
178   private Workspace workspace; // to save dc catalogs before distributing
179   private AssetManager assetManager; // to get current media package
180   private AuthorizationService authService;
181   private OrganizationDirectoryService organizationService;
182   private SecurityService securityService;
183 
184   private long jobPollingInterval = JobBarrier.DEFAULT_POLLING_INTERVAL;
185 
186   private SimpleElementSelector publishElementSelector;
187 
188   /**
189    * OSGi callback on component activation.
190    *
191    * @param context
192    *          the component context
193    */
194   @Activate
195   protected void activate(ComponentContext context) {
196     BundleContext bundleContext = context.getBundleContext();
197 
198     serverUrl = StringUtils.trimToNull(bundleContext.getProperty(SERVER_URL_PROPERTY));
199     if (serverUrl == null) {
200       logger.warn("Server url was not set in '{}'", SERVER_URL_PROPERTY);
201     } else {
202       logger.info("Server url is {}", serverUrl);
203     }
204     systemUserName = bundleContext.getProperty(SecurityUtil.PROPERTY_KEY_SYS_USER);
205 
206     @SuppressWarnings("rawtypes")
207     Dictionary properties = context.getProperties();
208     if (!StringUtils.isBlank((String) properties.get(LIVE_STREAMING_URL))) {
209       liveStreamingUrl = StringUtils.trimToEmpty((String) properties.get(LIVE_STREAMING_URL));
210       logger.info("Live streaming server url is {}", liveStreamingUrl);
211     } else {
212       logger.info("Live streaming url not set in '{}'. Streaming urls must be provided by capture agent properties.",
213               LIVE_STREAMING_URL);
214     }
215 
216     if (!StringUtils.isBlank((String) properties.get(LIVE_STREAM_NAME))) {
217       streamName = StringUtils.trimToEmpty((String) properties.get(LIVE_STREAM_NAME));
218     } else {
219       streamName = DEFAULT_STREAM_NAME;
220     }
221 
222     if (!StringUtils.isBlank((String) properties.get(LIVE_STREAM_MIME_TYPE))) {
223       streamMimeType = StringUtils.trimToEmpty((String) properties.get(LIVE_STREAM_MIME_TYPE));
224     } else {
225       streamMimeType = DEFAULT_STREAM_MIME_TYPE;
226     }
227 
228     String resolution = null;
229     if (!StringUtils.isBlank((String) properties.get(LIVE_STREAM_RESOLUTION))) {
230       resolution = StringUtils.trimToEmpty((String) properties.get(LIVE_STREAM_RESOLUTION));
231     } else {
232       resolution = DEFAULT_STREAM_RESOLUTION;
233     }
234     streamResolution = resolution.split(",");
235 
236     String flavors = null;
237     if (!StringUtils.isBlank((String) properties.get(LIVE_TARGET_FLAVORS))) {
238       flavors = StringUtils.trimToEmpty((String) properties.get(LIVE_TARGET_FLAVORS));
239     } else {
240       flavors = DEFAULT_LIVE_TARGET_FLAVORS;
241     }
242     String[] flavorArray = StringUtils.split(flavors, ",");
243     liveFlavors = new MediaPackageElementFlavor[flavorArray.length];
244     int i = 0;
245     for (String f : flavorArray) {
246       liveFlavors[i++] = MediaPackageElementFlavor.parseFlavor(f);
247     }
248 
249     publishedStreamingFormats = Arrays.asList(Optional.ofNullable(StringUtils.split(
250             (String)properties.get(LIVE_PUBLISH_STREAMING), ",")).orElse(new String[0]));
251 
252     publishElementSelector = new SimpleElementSelector();
253     for (MediaPackageElementFlavor flavor : publishFlavors) {
254       publishElementSelector.addFlavor(flavor);
255     }
256 
257     logger.info(
258             "Configured live stream name: {}, mime type: {}, resolution: {}, target flavors: {}",
259             streamName, streamMimeType, resolution, flavors);
260   }
261 
262   @Override
263   public boolean createOrUpdateLiveEvent(String mpId, DublinCoreCatalog episodeDC) throws LiveScheduleException {
264     MediaPackage mp = getMediaPackageFromSearch(mpId);
265     if (mp == null) {
266       // Check if capture not over. We have to check because we may get a notification for past events if
267       // the admin ui index is rebuilt
268       DCMIPeriod period = EncodingSchemeUtils.decodeMandatoryPeriod(episodeDC.getFirst(DublinCore.PROPERTY_TEMPORAL));
269       if (period.getEnd().getTime() <= System.currentTimeMillis()) {
270         logger.info("Live media package {} not created in search index because event is already past (end date: {})",
271                 mpId, period.getEnd());
272         return false;
273       }
274       return createLiveEvent(mpId, episodeDC);
275     } else {
276       // Check if the media package found in the search index is live. We have to check because we may get a
277       // notification for past events if the admin ui index is rebuilt
278       if (!mp.isLive()) {
279         logger.info("Media package {} is in search index but not live so not updating it.", mpId);
280         return false;
281       }
282       return updateLiveEvent(mp, episodeDC);
283     }
284   }
285 
286   @Override
287   public boolean deleteLiveEvent(String mpId) throws LiveScheduleException {
288     MediaPackage mp = getMediaPackageFromSearch(mpId);
289     if (mp == null) {
290       logger.debug("Live media package {} not found in search index", mpId);
291       return false;
292     } else {
293       if (!mp.isLive()) {
294         logger.info("Media package {} is not live. Not retracting.", mpId);
295         return false;
296       }
297       return retractLiveEvent(mp);
298     }
299   }
300 
301   @Override
302   public boolean updateLiveEventAcl(String mpId, AccessControlList acl) throws LiveScheduleException {
303     MediaPackage previousMp = getMediaPackageFromSearch(mpId);
304     if (previousMp != null) {
305       if (!previousMp.isLive()) {
306         logger.info("Media package {} is not live. Not updating acl.", mpId);
307         return false;
308       }
309       // Replace and distribute acl, this creates new mp
310       MediaPackage newMp = replaceAndDistributeAcl(previousMp, acl);
311       // Publish mp to engage search index
312       publishToSearch(newMp);
313       // Don't leave garbage there!
314       retractPreviousElements(previousMp, newMp);
315       logger.info("Updated live acl for media package {}", newMp);
316       return true;
317     }
318     return false;
319   }
320 
321   boolean createLiveEvent(String mpId, DublinCoreCatalog episodeDC) throws LiveScheduleException {
322     try {
323       logger.info("Creating live media package {}", mpId);
324       Snapshot snapshot = getSnapshotFromArchive(mpId);
325 
326       // generate live tracks
327       MediaPackage tmpMp = (MediaPackage) snapshot.getMediaPackage().clone();
328       setDurationForMediaPackage(tmpMp, episodeDC); // duration is used by live tracks
329       Map<String, Track> liveTracks = addLiveTracksToMediaPackage(tmpMp, episodeDC);
330 
331       // publish to search
332       MediaPackage mpForSearch = distributeAclsAndCatalogs(snapshot);
333       for (Track t : tmpMp.getTracks()) {
334         mpForSearch.add(t);
335       }
336       publishToSearch(mpForSearch);
337 
338       // add live publication to archive
339       MediaPackage updatedArchivedMp = addLivePublicationToMediaPackage(snapshot, liveTracks);
340       snapshotVersionCache.put(mpId, assetManager.takeSnapshot(updatedArchivedMp).getVersion());
341       return true;
342     } catch (Exception e) {
343       throw new LiveScheduleException(e);
344     }
345   }
346 
347   boolean updateLiveEvent(MediaPackage mpFromSearch, DublinCoreCatalog episodeDC) throws LiveScheduleException {
348     String mpId = mpFromSearch.getIdentifier().toString();
349     Snapshot snapshot = getSnapshotFromArchive(mpId);
350 
351     // If the snapshot version is in our local cache, it means that this snapshot was created by us so
352     // nothing to do. Note that this is just to save time; if the entry has already been deleted, the mp
353     // will be compared below.
354     if (snapshot.getVersion().equals(snapshotVersionCache.getIfPresent(mpId))) {
355       logger.debug("Snapshot version {} was created by us so this change is ignored.", snapshot.getVersion());
356       return false;
357     }
358 
359     // create temp mp for comparison
360     MediaPackage tmpMp = (MediaPackage) snapshot.getMediaPackage().clone();
361     // remove all elements that would not be published
362     Collection<MediaPackageElement> elements = publishElementSelector.select(tmpMp, false);
363     Arrays.stream(tmpMp.getElements()).filter(Predicate.not(elements::contains)).collect(Collectors.toList())
364             .forEach(tmpMp::remove);
365     // generate new live tracks
366     setDurationForMediaPackage(tmpMp, episodeDC); // duration is used by live tracks
367     Map<String, Track> liveTracks = addLiveTracksToMediaPackage(tmpMp, episodeDC);
368 
369     // if nothing changed, no need to do anything
370     if (isSameMediaPackage(mpFromSearch, tmpMp)) {
371       logger.debug("Live media package {} seems to be the same. Not updating.", mpFromSearch);
372       return false;
373     }
374 
375     logger.info("Updating live media package {}", mpFromSearch);
376 
377     // update mp in search
378     MediaPackage mpForSearch = distributeAclsAndCatalogs(snapshot);
379     for (Track t : tmpMp.getTracks()) {
380       mpForSearch.add(t);
381     }
382     removeLivePublicationChannel(mpForSearch); // we don't need the live publication in search
383     publishToSearch(mpForSearch);
384     retractPreviousElements(mpFromSearch, mpForSearch); // cleanup
385 
386     // update live publication in archive
387     MediaPackage updatedArchivedMp = updateLivePublication(snapshot.getMediaPackage(), liveTracks);
388     snapshotVersionCache.put(mpId, assetManager.takeSnapshot(updatedArchivedMp).getVersion());
389     return true;
390   }
391 
392   private void createOrUpdatePublicationTracks(Publication publication, Map<String, Track> generatedTracks) {
393     if (publication.getTracks().length > 0) {
394       publication.clearTracks();
395     }
396 
397     for (String publishedStreamingFormat : publishedStreamingFormats) {
398       Track track = generatedTracks.get(publishedStreamingFormat);
399       if (track != null) {
400         publication.addTrack(track);
401       }
402     }
403   }
404 
405   private MediaPackage updateLivePublication(MediaPackage mediaPackage, Map<String, Track> generatedTracks) {
406     Publication[] publications = mediaPackage.getPublications();
407     for (Publication publication : publications) {
408       if (publication.getChannel().equals(CHANNEL_ID)) {
409         createOrUpdatePublicationTracks(publication, generatedTracks);
410       }
411     }
412     return mediaPackage;
413   }
414 
415   boolean retractLiveEvent(MediaPackage mp) throws LiveScheduleException {
416     retract(mp);
417 
418     // Get latest mp from the asset manager if there to remove the publication
419     try {
420       String mpId = mp.getIdentifier().toString();
421       Snapshot snapshot = getSnapshotFromArchive(mpId);
422       MediaPackage archivedMp = snapshot.getMediaPackage();
423       removeLivePublicationChannel(archivedMp);
424       logger.debug("Removed live pub channel from archived media package {}", mp);
425       // Take a snapshot with the publication removed and put its version in our local cache
426       // so that we ignore notifications for this snapshot version.
427       snapshotVersionCache.put(mpId, assetManager.takeSnapshot(archivedMp).getVersion());
428     } catch (LiveScheduleException e) {
429       // It was not found in asset manager. This is ok.
430     }
431     return true;
432   }
433 
434   void publishToSearch(MediaPackage mp) throws LiveScheduleException {
435     try {
436       // Add media package to the search index
437       logger.info("Publishing LIVE media package {} to search index", mp);
438       Job publishJob = searchService.add(mp);
439       if (!waitForStatus(publishJob).isSuccess()) {
440         throw new LiveScheduleException("Live media package " + mp.getIdentifier() + " could not be published");
441       }
442     } catch (LiveScheduleException e) {
443       throw e;
444     } catch (Exception e) {
445       throw new LiveScheduleException(e);
446     }
447   }
448 
449   void retract(MediaPackage mp) throws LiveScheduleException {
450     Organization org = securityService.getOrganization();
451     User prevUser = org != null ? securityService.getUser() : null;
452     try {
453       securityService.setUser(SecurityUtil.createSystemUser(systemUserName, org));
454       Set<String> elementIds = new HashSet<String>();
455       String mpId = mp.getIdentifier().toString();
456       logger.info("Removing LIVE media package {} from the search index", mpId);
457 
458       for (MediaPackageElement mpe : mp.getElements()) {
459         if (!MediaPackageElement.Type.Publication.equals(mpe.getElementType())) {
460           elementIds.add(mpe.getIdentifier());
461         }
462       }
463 
464       List<String> failedJobs = new ArrayList<>();
465       // Remove media package from the search index
466       Job searchDeleteJob = searchService.delete(mpId);
467       if (!waitForStatus(searchDeleteJob).isSuccess()) {
468         failedJobs.add("Search Index");
469       }
470 
471       // Removing media from the download distribution service
472       Job distributionRetractJob =  downloadDistributionService.retract(CHANNEL_ID, mp, elementIds);
473       if (!waitForStatus(distributionRetractJob).isSuccess()) {
474         failedJobs.add("Distribution");
475       }
476 
477       if (!failedJobs.isEmpty()) {
478         throw new LiveScheduleException(
479             String.format("Removing live media package %s from %s failed", mpId, String.join(" and ", failedJobs)));
480       }
481     } catch (LiveScheduleException e) {
482       throw e;
483     } catch (Exception e) {
484       throw new LiveScheduleException(e);
485     } finally {
486       securityService.setUser(prevUser);
487     }
488   }
489 
490   /**
491    * Retrieves the media package from the search index.
492    *
493    * @param mediaPackageId
494    *          the media package id
495    * @return the media package in the search index or null if not there
496    * @throws LiveScheduleException
497    *           if found many media packages with the same id
498    */
499   MediaPackage getMediaPackageFromSearch(String mediaPackageId) throws LiveScheduleException {
500     // Issue #2504: make sure the search index is read by admin so that the media package is always found.
501     Organization org = securityService.getOrganization();
502     User prevUser = org != null ? securityService.getUser() : null;
503     securityService.setUser(SecurityUtil.createSystemUser(systemUserName, org));
504     try {
505       // Look for the media package in the search index
506       return searchService.get(mediaPackageId);
507     } catch (UnauthorizedException e) {
508       logger.warn("Unexpected unauthorized exception when querying the search index for mp {}", mediaPackageId, e);
509       return null;
510     } catch (NotFoundException e) {
511       return null;
512     } finally {
513       securityService.setUser(prevUser);
514     }
515   }
516 
517   void setDurationForMediaPackage(MediaPackage mp, DublinCoreCatalog dc) {
518     DCMIPeriod period = EncodingSchemeUtils.decodeMandatoryPeriod(dc.getFirst(DublinCore.PROPERTY_TEMPORAL));
519     long duration = period.getEnd().getTime() - period.getStart().getTime();
520     mp.setDuration(duration);
521     logger.debug("Live media package {} has start {} and duration {}", mp.getIdentifier(), mp.getDate(),
522             mp.getDuration());
523   }
524 
525   Map<String, Track> addLiveTracksToMediaPackage(MediaPackage mp, DublinCoreCatalog episodeDC)
526           throws LiveScheduleException {
527     String caName = episodeDC.getFirst(DublinCore.PROPERTY_SPATIAL);
528     HashMap<String, Track> generatedTracks = new HashMap<>();
529     String mpId = mp.getIdentifier().toString();
530     try {
531       // If capture agent registered the properties:
532       // capture.device.live.resolution.WIDTHxHEIGHT=COMPLETE_STREAMING_URL, use them!
533       try {
534         Properties caProps = captureAgentService.getAgentCapabilities(caName);
535         if (caProps != null) {
536           Enumeration<Object> en = caProps.keys();
537           while (en.hasMoreElements()) {
538             String key = (String) en.nextElement();
539             if (key.startsWith(CA_PROPERTY_RESOLUTION_URL_PREFIX)) {
540               String resolution = key.substring(CA_PROPERTY_RESOLUTION_URL_PREFIX.length());
541               String url = caProps.getProperty(key);
542               // Note: only one flavor is supported in this format (the default: presenter/delivery)
543               MediaPackageElementFlavor flavor = MediaPackageElementFlavor.parseFlavor(DEFAULT_LIVE_TARGET_FLAVORS);
544               String replacedUrl = replaceVariables(mpId, caName, url, flavor, resolution);
545               mp.add(buildStreamingTrack(replacedUrl, flavor, streamMimeType, resolution, mp.getDuration()));
546             }
547           }
548         }
549       } catch (NotFoundException e) {
550         // Capture agent not found so we can't get its properties. Assume the service configuration should
551         // be used instead. Note that we can't schedule anything on a CA that has not registered so this is
552         // unlikely to happen.
553       }
554 
555       // Capture agent did not pass any CA_PROPERTY_RESOLUTION_URL_PREFIX property when registering
556       // so use the service configuration
557       if (mp.getTracks().length == 0) {
558         if (liveStreamingUrl == null) {
559           throw new LiveScheduleException(
560                   "Cannot build live tracks because '" + LIVE_STREAMING_URL + "' configuration was not set.");
561         }
562 
563         for (MediaPackageElementFlavor flavor : liveFlavors) {
564           for (int i = 0; i < streamResolution.length; i++) {
565             String uri = replaceVariables(mpId, caName, UrlSupport.concat(liveStreamingUrl.toString(), streamName),
566                     flavor, streamResolution[i]);
567             Track track = buildStreamingTrack(uri, flavor, streamMimeType, streamResolution[i], mp.getDuration());
568             mp.add(track);
569             generatedTracks.put(flavor + ":" + streamResolution[i], track);
570           }
571         }
572       }
573     } catch (URISyntaxException e) {
574       throw new LiveScheduleException(e);
575     }
576     return generatedTracks;
577   }
578 
579   Track buildStreamingTrack(String uriString, MediaPackageElementFlavor flavor, String mimeType, String resolution,
580           long duration) throws URISyntaxException {
581 
582     URI uri = new URI(uriString);
583 
584     MediaPackageElementBuilder elementBuilder = MediaPackageElementBuilderFactory.newInstance().newElementBuilder();
585     MediaPackageElement element = elementBuilder.elementFromURI(uri, MediaPackageElement.Type.Track, flavor);
586     TrackImpl track = (TrackImpl) element;
587 
588     // Set duration and mime type
589     track.setDuration(duration);
590     track.setLive(true);
591     track.setMimeType(MimeTypes.parseMimeType(mimeType));
592 
593     VideoStreamImpl video = new VideoStreamImpl("video-" + flavor.getType() + "-" + flavor.getSubtype());
594     // Set video resolution
595     String[] dimensions = resolution.split("x");
596     video.setFrameWidth(Integer.parseInt(dimensions[0]));
597     video.setFrameHeight(Integer.parseInt(dimensions[1]));
598 
599     track.addStream(video);
600 
601     logger.debug("Creating live track element of flavor {}, resolution {}, and url {}",
602             new Object[] { flavor, resolution, uriString });
603 
604     return track;
605   }
606 
607   /**
608    * Replaces variables in the live stream name. Currently, this is only prepared to handle the following: #{id} = media
609    * package id, #{flavor} = type-subtype of flavor, #{caName} = capture agent name, #{resolution} = stream resolution
610    */
611   String replaceVariables(String mpId, String caName, String toBeReplaced, MediaPackageElementFlavor flavor,
612           String resolution) {
613 
614     // Substitution pattern: any string in the form #{name}, where 'name' has only word characters: [a-zA-Z_0-9].
615     final Pattern pat = Pattern.compile("#\\{(\\w+)\\}");
616 
617     Matcher matcher = pat.matcher(toBeReplaced);
618     StringBuffer sb = new StringBuffer();
619     while (matcher.find()) {
620       if (matcher.group(1).equals(REPLACE_ID)) {
621         matcher.appendReplacement(sb, mpId);
622       } else if (matcher.group(1).equals(REPLACE_FLAVOR)) {
623         matcher.appendReplacement(sb, flavor.getType() + "-" + flavor.getSubtype());
624       } else if (matcher.group(1).equals(REPLACE_CA_NAME)) {
625         // Taking the easy route to find the capture agent name...
626         matcher.appendReplacement(sb, caName);
627       } else if (matcher.group(1).equals(REPLACE_RESOLUTION)) {
628         // Taking the easy route to find the capture agent name...
629         matcher.appendReplacement(sb, resolution);
630       } // else will not replace
631     }
632     matcher.appendTail(sb);
633     return sb.toString();
634   }
635 
636   private JobBarrier.Result waitForStatus(Job... jobs) throws IllegalStateException, IllegalArgumentException {
637     if (serviceRegistry == null) {
638       throw new IllegalStateException("Can't wait for job status without providing a service registry first");
639     }
640     JobBarrier barrier = new JobBarrier(null, serviceRegistry, jobPollingInterval, jobs);
641     return barrier.waitForJobs();
642   }
643 
644   Snapshot getSnapshotFromArchive(String mpId) throws LiveScheduleException {
645     Optional<Snapshot> snapshot = assetManager.getLatestSnapshot(mpId);
646     if (snapshot.isEmpty()) {
647       // No snapshot?
648       throw new LiveScheduleException(String.format("Unexpected error: media package %s has not been archived.", mpId));
649     }
650     return snapshot.get();
651   }
652 
653   MediaPackage distributeAclsAndCatalogs(Snapshot snapshot) throws LiveScheduleException {
654     try {
655       MediaPackage mp = (MediaPackage) snapshot.getMediaPackage().clone();
656 
657       // Select elements
658       Collection<MediaPackageElement> elements = publishElementSelector.select(mp, false);
659       Set<String> elementIds = elements.stream().map(MediaPackageElement::getIdentifier).collect(Collectors.toSet());
660 
661       // Distribute elements
662       Job distributionJob = downloadDistributionService.distribute(CHANNEL_ID, mp, elementIds, false);
663       if (!waitForStatus(distributionJob).isSuccess()) {
664         throw new LiveScheduleException(
665                 "Element(s) for live media package " + mp.getIdentifier() + " could not be distributed");
666       }
667 
668       // Remove all elements from mp
669       for (MediaPackageElement e: mp.getElements()) {
670         mp.remove(e);
671       }
672 
673       // Re-add distributed elements
674       List<MediaPackageElement> distributedElements = (List<MediaPackageElement>) MediaPackageElementParser
675               .getArrayFromXml(distributionJob.getPayload());
676       for (MediaPackageElement distributedElement : distributedElements) {
677         mp.add(distributedElement);
678       }
679 
680       // Clean up
681       for (String id : elementIds) {
682         MediaPackageElement e = mp.getElementById(id);
683         workspace.delete(e.getURI());
684       }
685 
686       return mp;
687     } catch (LiveScheduleException e) {
688       throw e;
689     } catch (Exception e) {
690       throw new LiveScheduleException(e);
691     }
692   }
693 
694   MediaPackage replaceAndDistributeAcl(MediaPackage previousMp, AccessControlList acl) throws LiveScheduleException {
695     try {
696       // This is the mp from the search index
697       MediaPackage mp = (MediaPackage) previousMp.clone();
698 
699       // Remove previous Acl from the mp
700       Attachment[] atts = mp.getAttachments(MediaPackageElements.XACML_POLICY_EPISODE);
701       if (atts.length > 0) {
702         mp.remove(atts[0]);
703       }
704 
705       // Attach current ACL to mp, acl will be created in the ws/wfr
706       authService.setAcl(mp, AclScope.Episode, acl);
707       atts = mp.getAttachments(MediaPackageElements.XACML_POLICY_EPISODE);
708       if (atts.length > 0) {
709         String aclId = atts[0].getIdentifier();
710         // Distribute new acl
711         Job distributionJob = downloadDistributionService.distribute(CHANNEL_ID, mp, aclId, false);
712         if (!waitForStatus(distributionJob).isSuccess()) {
713           throw new LiveScheduleException(
714                   "Acl for live media package " + mp.getIdentifier() + " could not be distributed");
715         }
716 
717         MediaPackageElement e = mp.getElementById(aclId);
718         // Cleanup workspace/wfr
719         mp.remove(e);
720         workspace.delete(e.getURI());
721 
722         // Add distributed acl to mp
723         mp.add(MediaPackageElementParser.getFromXml(distributionJob.getPayload()));
724       }
725       return mp;
726     } catch (LiveScheduleException e) {
727       throw e;
728     } catch (Exception e) {
729       throw new LiveScheduleException(e);
730     }
731   }
732 
733   MediaPackage addLivePublicationToMediaPackage(Snapshot snapshot, Map<String, Track> generatedTracks)
734           throws LiveScheduleException {
735     MediaPackage mp = snapshot.getMediaPackage();
736 
737     Organization currentOrg = null;
738     try {
739       currentOrg = organizationService.getOrganization(snapshot.getOrganizationId());
740     } catch (NotFoundException e) {
741       logger.warn("Organization in snapshot not found: {}", snapshot.getOrganizationId());
742     }
743 
744     logger.debug("Adding live channel publication element to media package {}", mp);
745     String engageUrlString = null;
746     if (currentOrg != null) {
747       engageUrlString = StringUtils.trimToNull(currentOrg.getProperties().get(ENGAGE_URL_PROPERTY));
748     }
749     if (engageUrlString == null) {
750       engageUrlString = serverUrl;
751       logger.info(
752           "Using 'server.url' as a fallback for the non-existing organization level key '{}' for the publication url",
753           ENGAGE_URL_PROPERTY);
754     }
755 
756     try {
757       // Create new distribution element
758       URI engageUri = URIUtils.resolve(new URI(engageUrlString), PLAYER_PATH + mp.getIdentifier().toString());
759       Publication publicationElement = PublicationImpl.publication(UUID.randomUUID().toString(), CHANNEL_ID, engageUri,
760               MimeTypes.parseMimeType("text/html"));
761       mp.add(publicationElement);
762       createOrUpdatePublicationTracks(publicationElement, generatedTracks);
763       return mp;
764     } catch (URISyntaxException e) {
765       throw new LiveScheduleException(e);
766     }
767   }
768 
769   void removeLivePublicationChannel(MediaPackage mp) {
770     // Remove publication element
771     Publication[] publications = mp.getPublications();
772     if (publications != null) {
773       for (Publication publication : publications) {
774         if (CHANNEL_ID.equals(publication.getChannel())) {
775           mp.remove(publication);
776         }
777       }
778     }
779   }
780 
781   boolean isSameMediaPackage(MediaPackage previous, MediaPackage current) {
782 
783     Equator<Track> liveTrackEquator = new Equator<>() {
784       @Override
785       public boolean equate(Track track1, Track track2) {
786         // we can safely assume that each live track has exactly one video stream since we generated that ourselves
787         VideoStream videostream1 = (VideoStream) track1.getStreams()[0];
788         VideoStream videostream2 = (VideoStream) track2.getStreams()[0];
789 
790         return Objects.equals(track1.getURI(), track2.getURI())
791                 && Objects.equals(track1.getFlavor(), track2.getFlavor())
792                 && Objects.equals(track1.getMimeType(), track2.getMimeType())
793                 && Objects.equals(track1.getDuration(), track2.getDuration())
794                 && Objects.equals(videostream1.getFrameWidth(), videostream2.getFrameWidth())
795                 && Objects.equals(videostream1.getFrameHeight(), videostream2.getFrameHeight());
796       }
797 
798       @Override
799       public int hash(Track track) {
800         VideoStream videostream = (VideoStream) track.getStreams()[0];
801         return Objects.hash(track.getURI(), track.getFlavor(), track.getMimeType(), track.getDuration(),
802                 videostream.getFrameWidth(), videostream.getFrameHeight());
803       }
804     };
805 
806     Equator<MediaPackageElement> catalogAndAttachmentEquator = new Equator<>() {
807       @Override
808       public boolean equate(MediaPackageElement mpe1, MediaPackageElement mpe2) {
809         return Objects.equals(mpe1.getIdentifier(), mpe2.getIdentifier())
810                 && Objects.equals(mpe1.getElementType(), mpe2.getElementType())
811                 && Objects.equals(mpe1.getChecksum(), mpe2.getChecksum())
812                 && Objects.equals(mpe1.getFlavor(), mpe2.getFlavor());
813       }
814 
815       @Override
816       public int hash(MediaPackageElement mpe) {
817         return Objects.hash(mpe.getIdentifier(), mpe.getElementType(), mpe.getChecksum(), mpe.getFlavor());
818       }
819     };
820 
821     if (!CollectionUtils.isEqualCollection(Arrays.asList(previous.getTracks()), Arrays.asList(current.getTracks()),
822             liveTrackEquator)) {
823       return false;
824     } else if (!CollectionUtils.isEqualCollection(Arrays.asList(previous.getCatalogs()),
825             Arrays.asList(current.getCatalogs()), catalogAndAttachmentEquator)) {
826       return false;
827     } else {
828       return CollectionUtils.isEqualCollection(Arrays.asList(previous.getAttachments()),
829               Arrays.asList(current.getAttachments()), catalogAndAttachmentEquator);
830     }
831   }
832 
833   void retractPreviousElements(MediaPackage previousMp, MediaPackage newMp) throws LiveScheduleException {
834     try {
835       // Now can retract elements from previous publish. Before creating a retraction
836       // job, check if the element url is still used by the new media package.
837       Set<String> elementIds = new HashSet<String>();
838       for (MediaPackageElement element : previousMp.getElements()) {
839         // We don't retract tracks because they are just live links
840         if (!Track.TYPE.equals(element.getElementType())) {
841           boolean canBeDeleted = true;
842           for (MediaPackageElement newElement : newMp.getElements()) {
843             if (element.getURI().equals(newElement.getURI())) {
844               logger.debug(
845                   "Not retracting element {} with URI {} from download distribution because it is "
846                       + "still used by updated live media package",
847                   element.getIdentifier(), element.getURI());
848               canBeDeleted = false;
849               break;
850             }
851           }
852           if (canBeDeleted) {
853             elementIds.add(element.getIdentifier());
854           }
855         }
856       }
857       if (elementIds.size() > 0) {
858         Job job = downloadDistributionService.retract(CHANNEL_ID, previousMp, elementIds);
859         // Wait for retraction to finish
860         if (!waitForStatus(job).isSuccess()) {
861           logger.warn("One of the download retract jobs did not complete successfully");
862         } else {
863           logger.debug("Retraction of previously published elements complete");
864         }
865       }
866     } catch (DistributionException e) {
867       throw new LiveScheduleException(e);
868     }
869   }
870 
871   @Reference
872   public void setDublinCoreService(DublinCoreCatalogService service) {
873     this.dublinCoreService = service;
874   }
875 
876   @Reference
877   public void setSearchService(SearchService service) {
878     this.searchService = service;
879   }
880 
881   @Reference
882   public void setSeriesService(SeriesService service) {
883     this.seriesService = service;
884   }
885 
886   @Reference
887   public void setServiceRegistry(ServiceRegistry service) {
888     this.serviceRegistry = service;
889   }
890 
891   @Reference
892   public void setCaptureAgentService(CaptureAgentStateService service) {
893     this.captureAgentService = service;
894   }
895 
896   @Reference(
897       name = "DownloadDistributionService",
898       target = "(distribution.channel=download)"
899   )
900   public void setDownloadDistributionService(DownloadDistributionService service) {
901     this.downloadDistributionService = service;
902     logger.info("Distribution service with type '{}' set.", downloadDistributionService.getDistributionType());
903   }
904 
905   @Reference
906   public void setWorkspace(Workspace ws) {
907     this.workspace = ws;
908   }
909 
910   @Reference
911   public void setAssetManager(AssetManager assetManager) {
912     this.assetManager = assetManager;
913   }
914 
915   @Reference
916   public void setAuthorizationService(AuthorizationService service) {
917     this.authService = service;
918   }
919 
920   @Reference
921   public void setOrganizationService(OrganizationDirectoryService service) {
922     this.organizationService = service;
923   }
924 
925   @Reference
926   public void setSecurityService(SecurityService service) {
927     this.securityService = service;
928   }
929   // === Set by OSGI - end
930 
931   // === Used by unit tests - begin
932   void setJobPollingInterval(long jobPollingInterval) {
933     this.jobPollingInterval = jobPollingInterval;
934   }
935 
936   Cache<String, Version> getSnapshotVersionCache() {
937     return this.snapshotVersionCache;
938   }
939   // === Used by unit tests - end
940 }