View Javadoc
1   /*
2    * Licensed to The Apereo Foundation under one or more contributor license
3    * agreements. See the NOTICE file distributed with this work for additional
4    * information regarding copyright ownership.
5    *
6    *
7    * The Apereo Foundation licenses this file to you under the Educational
8    * Community License, Version 2.0 (the "License"); you may not use this file
9    * except in compliance with the License. You may obtain a copy of the License
10   * at:
11   *
12   *   http://opensource.org/licenses/ecl2.txt
13   *
14   * Unless required by applicable law or agreed to in writing, software
15   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
16   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the
17   * License for the specific language governing permissions and limitations under
18   * the License.
19   *
20   */
21  package org.opencastproject.liveschedule.impl;
22  
23  import org.opencastproject.assetmanager.api.AssetManager;
24  import org.opencastproject.assetmanager.api.Snapshot;
25  import org.opencastproject.assetmanager.api.Version;
26  import org.opencastproject.assetmanager.api.query.AQueryBuilder;
27  import org.opencastproject.assetmanager.api.query.ARecord;
28  import org.opencastproject.assetmanager.api.query.AResult;
29  import org.opencastproject.capture.admin.api.CaptureAgentStateService;
30  import org.opencastproject.distribution.api.DistributionException;
31  import org.opencastproject.distribution.api.DownloadDistributionService;
32  import org.opencastproject.job.api.Job;
33  import org.opencastproject.job.api.JobBarrier;
34  import org.opencastproject.liveschedule.api.LiveScheduleException;
35  import org.opencastproject.liveschedule.api.LiveScheduleService;
36  import org.opencastproject.mediapackage.Attachment;
37  import org.opencastproject.mediapackage.MediaPackage;
38  import org.opencastproject.mediapackage.MediaPackageElement;
39  import org.opencastproject.mediapackage.MediaPackageElementBuilder;
40  import org.opencastproject.mediapackage.MediaPackageElementBuilderFactory;
41  import org.opencastproject.mediapackage.MediaPackageElementFlavor;
42  import org.opencastproject.mediapackage.MediaPackageElementParser;
43  import org.opencastproject.mediapackage.MediaPackageElements;
44  import org.opencastproject.mediapackage.Publication;
45  import org.opencastproject.mediapackage.PublicationImpl;
46  import org.opencastproject.mediapackage.Track;
47  import org.opencastproject.mediapackage.VideoStream;
48  import org.opencastproject.mediapackage.selector.SimpleElementSelector;
49  import org.opencastproject.mediapackage.track.TrackImpl;
50  import org.opencastproject.mediapackage.track.VideoStreamImpl;
51  import org.opencastproject.metadata.dublincore.DCMIPeriod;
52  import org.opencastproject.metadata.dublincore.DublinCore;
53  import org.opencastproject.metadata.dublincore.DublinCoreCatalog;
54  import org.opencastproject.metadata.dublincore.DublinCoreCatalogService;
55  import org.opencastproject.metadata.dublincore.EncodingSchemeUtils;
56  import org.opencastproject.search.api.SearchService;
57  import org.opencastproject.security.api.AccessControlList;
58  import org.opencastproject.security.api.AclScope;
59  import org.opencastproject.security.api.AuthorizationService;
60  import org.opencastproject.security.api.Organization;
61  import org.opencastproject.security.api.OrganizationDirectoryService;
62  import org.opencastproject.security.api.SecurityService;
63  import org.opencastproject.security.api.UnauthorizedException;
64  import org.opencastproject.security.api.User;
65  import org.opencastproject.security.util.SecurityUtil;
66  import org.opencastproject.series.api.SeriesService;
67  import org.opencastproject.serviceregistry.api.ServiceRegistry;
68  import org.opencastproject.util.MimeTypes;
69  import org.opencastproject.util.NotFoundException;
70  import org.opencastproject.util.UrlSupport;
71  import org.opencastproject.workspace.api.Workspace;
72  
73  import com.google.common.cache.Cache;
74  import com.google.common.cache.CacheBuilder;
75  
76  import org.apache.commons.collections4.CollectionUtils;
77  import org.apache.commons.collections4.Equator;
78  import org.apache.commons.lang3.StringUtils;
79  import org.apache.http.client.utils.URIUtils;
80  import org.osgi.framework.BundleContext;
81  import org.osgi.service.component.ComponentContext;
82  import org.osgi.service.component.annotations.Activate;
83  import org.osgi.service.component.annotations.Component;
84  import org.osgi.service.component.annotations.Reference;
85  import org.slf4j.Logger;
86  import org.slf4j.LoggerFactory;
87  
88  import java.net.URI;
89  import java.net.URISyntaxException;
90  import java.util.ArrayList;
91  import java.util.Arrays;
92  import java.util.Collection;
93  import java.util.Dictionary;
94  import java.util.Enumeration;
95  import java.util.HashMap;
96  import java.util.HashSet;
97  import java.util.List;
98  import java.util.Map;
99  import java.util.Objects;
100 import java.util.Optional;
101 import java.util.Properties;
102 import java.util.Set;
103 import java.util.UUID;
104 import java.util.concurrent.TimeUnit;
105 import java.util.function.Predicate;
106 import java.util.regex.Matcher;
107 import java.util.regex.Pattern;
108 import java.util.stream.Collectors;
109 
110 @Component(
111     immediate = true,
112     service = LiveScheduleService.class,
113     property = {
114         "service.description=Live Schedule Service"
115     }
116 )
117 public class LiveScheduleServiceImpl implements LiveScheduleService {
118   /** The server url property **/
119   static final String SERVER_URL_PROPERTY = "org.opencastproject.server.url";
120   /** The engage base url property **/
121   static final String ENGAGE_URL_PROPERTY = "org.opencastproject.engage.ui.url";
122   /** The default path to the player **/
123   static final String PLAYER_PATH = "/play/";
124 
125   /** Default values for configuration options */
126   private static final String DEFAULT_STREAM_MIME_TYPE = "video/mp4";
127   private static final String DEFAULT_STREAM_RESOLUTION = "1920x1080";
128   private static final String DEFAULT_STREAM_NAME = "live-stream";
129   private static final String DEFAULT_LIVE_TARGET_FLAVORS = "presenter/delivery";
130   static final String DEFAULT_LIVE_DISTRIBUTION_SERVICE = "download";
131 
132   // Deactivating checkstyle to preserve the long URL
133   // CHECKSTYLE:OFF
134   // If the capture agent registered this property, we expect to get a resolution and
135   // a url in the following format:
136   // capture.device.live.resolution.WIDTHxHEIGHT=COMPLETE_STREAMING_URL e.g.
137   // capture.device.live.resolution.960x270=rtmp://cp398121.live.edgefcs.net/live/dev-epiphan005-2-presenter-delivery.stream-960x270_1_200@355694
138   public static final String CA_PROPERTY_RESOLUTION_URL_PREFIX = "capture.device.live.resolution.";
139   // CHECKSTYLE:ON
140 
141   /** Variables that can be replaced in stream name */
142   public static final String REPLACE_ID = "id";
143   public static final String REPLACE_FLAVOR = "flavor";
144   public static final String REPLACE_CA_NAME = "caName";
145   public static final String REPLACE_RESOLUTION = "resolution";
146 
147   public static final String LIVE_STREAMING_URL = "live.streamingUrl";
148   public static final String LIVE_STREAM_NAME = "live.streamName";
149   public static final String LIVE_STREAM_MIME_TYPE = "live.mimeType";
150   public static final String LIVE_STREAM_RESOLUTION = "live.resolution";
151   public static final String LIVE_TARGET_FLAVORS = "live.targetFlavors";
152   public static final String LIVE_DISTRIBUTION_SERVICE = "live.distributionService";
153   public static final String LIVE_PUBLISH_STREAMING = "live.publishStreaming";
154 
155   private static final MediaPackageElementFlavor[] publishFlavors = { MediaPackageElements.EPISODE,
156       MediaPackageElements.SERIES, MediaPackageElements.XACML_POLICY_EPISODE,
157       MediaPackageElements.XACML_POLICY_SERIES }; // make configurable later
158 
159   /** The logger */
160   private static final Logger logger = LoggerFactory.getLogger(LiveScheduleServiceImpl.class);
161 
162   private String liveStreamingUrl;
163   private String streamName;
164   private String streamMimeType;
165   private String[] streamResolution;
166   private MediaPackageElementFlavor[] liveFlavors;
167   private String serverUrl;
168   private Cache<String, Version> snapshotVersionCache
169       = CacheBuilder.newBuilder().expireAfterWrite(5, TimeUnit.MINUTES).build();
170   /** Which streaming formats should be published automatically */
171   private List<String> publishedStreamingFormats = null;
172   private String systemUserName;
173 
174   /** Services */
175   private DownloadDistributionService downloadDistributionService; // to distribute episode and series catalogs
176   private SearchService searchService; // to publish/retract live media package
177   private SeriesService seriesService; // to get series metadata
178   private DublinCoreCatalogService dublinCoreService; // to serialize dc catalogs
179   private CaptureAgentStateService captureAgentService; // to get agent capabilities
180   private ServiceRegistry serviceRegistry; // to create publish/retract jobs
181   private Workspace workspace; // to save dc catalogs before distributing
182   private AssetManager assetManager; // to get current media package
183   private AuthorizationService authService;
184   private OrganizationDirectoryService organizationService;
185   private SecurityService securityService;
186 
187   private long jobPollingInterval = JobBarrier.DEFAULT_POLLING_INTERVAL;
188 
189   private SimpleElementSelector publishElementSelector;
190 
191   /**
192    * OSGi callback on component activation.
193    *
194    * @param context
195    *          the component context
196    */
197   @Activate
198   protected void activate(ComponentContext context) {
199     BundleContext bundleContext = context.getBundleContext();
200 
201     serverUrl = StringUtils.trimToNull(bundleContext.getProperty(SERVER_URL_PROPERTY));
202     if (serverUrl == null) {
203       logger.warn("Server url was not set in '{}'", SERVER_URL_PROPERTY);
204     } else {
205       logger.info("Server url is {}", serverUrl);
206     }
207     systemUserName = bundleContext.getProperty(SecurityUtil.PROPERTY_KEY_SYS_USER);
208 
209     @SuppressWarnings("rawtypes")
210     Dictionary properties = context.getProperties();
211     if (!StringUtils.isBlank((String) properties.get(LIVE_STREAMING_URL))) {
212       liveStreamingUrl = StringUtils.trimToEmpty((String) properties.get(LIVE_STREAMING_URL));
213       logger.info("Live streaming server url is {}", liveStreamingUrl);
214     } else {
215       logger.info("Live streaming url not set in '{}'. Streaming urls must be provided by capture agent properties.",
216               LIVE_STREAMING_URL);
217     }
218 
219     if (!StringUtils.isBlank((String) properties.get(LIVE_STREAM_NAME))) {
220       streamName = StringUtils.trimToEmpty((String) properties.get(LIVE_STREAM_NAME));
221     } else {
222       streamName = DEFAULT_STREAM_NAME;
223     }
224 
225     if (!StringUtils.isBlank((String) properties.get(LIVE_STREAM_MIME_TYPE))) {
226       streamMimeType = StringUtils.trimToEmpty((String) properties.get(LIVE_STREAM_MIME_TYPE));
227     } else {
228       streamMimeType = DEFAULT_STREAM_MIME_TYPE;
229     }
230 
231     String resolution = null;
232     if (!StringUtils.isBlank((String) properties.get(LIVE_STREAM_RESOLUTION))) {
233       resolution = StringUtils.trimToEmpty((String) properties.get(LIVE_STREAM_RESOLUTION));
234     } else {
235       resolution = DEFAULT_STREAM_RESOLUTION;
236     }
237     streamResolution = resolution.split(",");
238 
239     String flavors = null;
240     if (!StringUtils.isBlank((String) properties.get(LIVE_TARGET_FLAVORS))) {
241       flavors = StringUtils.trimToEmpty((String) properties.get(LIVE_TARGET_FLAVORS));
242     } else {
243       flavors = DEFAULT_LIVE_TARGET_FLAVORS;
244     }
245     String[] flavorArray = StringUtils.split(flavors, ",");
246     liveFlavors = new MediaPackageElementFlavor[flavorArray.length];
247     int i = 0;
248     for (String f : flavorArray) {
249       liveFlavors[i++] = MediaPackageElementFlavor.parseFlavor(f);
250     }
251 
252     publishedStreamingFormats = Arrays.asList(Optional.ofNullable(StringUtils.split(
253             (String)properties.get(LIVE_PUBLISH_STREAMING), ",")).orElse(new String[0]));
254 
255     publishElementSelector = new SimpleElementSelector();
256     for (MediaPackageElementFlavor flavor : publishFlavors) {
257       publishElementSelector.addFlavor(flavor);
258     }
259 
260     logger.info(
261             "Configured live stream name: {}, mime type: {}, resolution: {}, target flavors: {}",
262             streamName, streamMimeType, resolution, flavors);
263   }
264 
265   @Override
266   public boolean createOrUpdateLiveEvent(String mpId, DublinCoreCatalog episodeDC) throws LiveScheduleException {
267     MediaPackage mp = getMediaPackageFromSearch(mpId);
268     if (mp == null) {
269       // Check if capture not over. We have to check because we may get a notification for past events if
270       // the admin ui index is rebuilt
271       DCMIPeriod period = EncodingSchemeUtils.decodeMandatoryPeriod(episodeDC.getFirst(DublinCore.PROPERTY_TEMPORAL));
272       if (period.getEnd().getTime() <= System.currentTimeMillis()) {
273         logger.info("Live media package {} not created in search index because event is already past (end date: {})",
274                 mpId, period.getEnd());
275         return false;
276       }
277       return createLiveEvent(mpId, episodeDC);
278     } else {
279       // Check if the media package found in the search index is live. We have to check because we may get a
280       // notification for past events if the admin ui index is rebuilt
281       if (!mp.isLive()) {
282         logger.info("Media package {} is in search index but not live so not updating it.", mpId);
283         return false;
284       }
285       return updateLiveEvent(mp, episodeDC);
286     }
287   }
288 
289   @Override
290   public boolean deleteLiveEvent(String mpId) throws LiveScheduleException {
291     MediaPackage mp = getMediaPackageFromSearch(mpId);
292     if (mp == null) {
293       logger.debug("Live media package {} not found in search index", mpId);
294       return false;
295     } else {
296       if (!mp.isLive()) {
297         logger.info("Media package {} is not live. Not retracting.", mpId);
298         return false;
299       }
300       return retractLiveEvent(mp);
301     }
302   }
303 
304   @Override
305   public boolean updateLiveEventAcl(String mpId, AccessControlList acl) throws LiveScheduleException {
306     MediaPackage previousMp = getMediaPackageFromSearch(mpId);
307     if (previousMp != null) {
308       if (!previousMp.isLive()) {
309         logger.info("Media package {} is not live. Not updating acl.", mpId);
310         return false;
311       }
312       // Replace and distribute acl, this creates new mp
313       MediaPackage newMp = replaceAndDistributeAcl(previousMp, acl);
314       // Publish mp to engage search index
315       publishToSearch(newMp);
316       // Don't leave garbage there!
317       retractPreviousElements(previousMp, newMp);
318       logger.info("Updated live acl for media package {}", newMp);
319       return true;
320     }
321     return false;
322   }
323 
324   boolean createLiveEvent(String mpId, DublinCoreCatalog episodeDC) throws LiveScheduleException {
325     try {
326       logger.info("Creating live media package {}", mpId);
327       Snapshot snapshot = getSnapshotFromArchive(mpId);
328 
329       // generate live tracks
330       MediaPackage tmpMp = (MediaPackage) snapshot.getMediaPackage().clone();
331       setDurationForMediaPackage(tmpMp, episodeDC); // duration is used by live tracks
332       Map<String, Track> liveTracks = addLiveTracksToMediaPackage(tmpMp, episodeDC);
333 
334       // publish to search
335       MediaPackage mpForSearch = distributeAclsAndCatalogs(snapshot);
336       for (Track t : tmpMp.getTracks()) {
337         mpForSearch.add(t);
338       }
339       publishToSearch(mpForSearch);
340 
341       // add live publication to archive
342       MediaPackage updatedArchivedMp = addLivePublicationToMediaPackage(snapshot, liveTracks);
343       snapshotVersionCache.put(mpId, assetManager.takeSnapshot(updatedArchivedMp).getVersion());
344       return true;
345     } catch (Exception e) {
346       throw new LiveScheduleException(e);
347     }
348   }
349 
350   boolean updateLiveEvent(MediaPackage mpFromSearch, DublinCoreCatalog episodeDC) throws LiveScheduleException {
351     String mpId = mpFromSearch.getIdentifier().toString();
352     Snapshot snapshot = getSnapshotFromArchive(mpId);
353 
354     // If the snapshot version is in our local cache, it means that this snapshot was created by us so
355     // nothing to do. Note that this is just to save time; if the entry has already been deleted, the mp
356     // will be compared below.
357     if (snapshot.getVersion().equals(snapshotVersionCache.getIfPresent(mpId))) {
358       logger.debug("Snapshot version {} was created by us so this change is ignored.", snapshot.getVersion());
359       return false;
360     }
361 
362     // create temp mp for comparison
363     MediaPackage tmpMp = (MediaPackage) snapshot.getMediaPackage().clone();
364     // remove all elements that would not be published
365     Collection<MediaPackageElement> elements = publishElementSelector.select(tmpMp, false);
366     Arrays.stream(tmpMp.getElements()).filter(Predicate.not(elements::contains)).collect(Collectors.toList())
367             .forEach(tmpMp::remove);
368     // generate new live tracks
369     setDurationForMediaPackage(tmpMp, episodeDC); // duration is used by live tracks
370     Map<String, Track> liveTracks = addLiveTracksToMediaPackage(tmpMp, episodeDC);
371 
372     // if nothing changed, no need to do anything
373     if (isSameMediaPackage(mpFromSearch, tmpMp)) {
374       logger.debug("Live media package {} seems to be the same. Not updating.", mpFromSearch);
375       return false;
376     }
377 
378     logger.info("Updating live media package {}", mpFromSearch);
379 
380     // update mp in search
381     MediaPackage mpForSearch = distributeAclsAndCatalogs(snapshot);
382     for (Track t : tmpMp.getTracks()) {
383       mpForSearch.add(t);
384     }
385     removeLivePublicationChannel(mpForSearch); // we don't need the live publication in search
386     publishToSearch(mpForSearch);
387     retractPreviousElements(mpFromSearch, mpForSearch); // cleanup
388 
389     // update live publication in archive
390     MediaPackage updatedArchivedMp = updateLivePublication(snapshot.getMediaPackage(), liveTracks);
391     snapshotVersionCache.put(mpId, assetManager.takeSnapshot(updatedArchivedMp).getVersion());
392     return true;
393   }
394 
395   private void createOrUpdatePublicationTracks(Publication publication, Map<String, Track> generatedTracks) {
396     if (publication.getTracks().length > 0) {
397       publication.clearTracks();
398     }
399 
400     for (String publishedStreamingFormat : publishedStreamingFormats) {
401       Track track = generatedTracks.get(publishedStreamingFormat);
402       if (track != null) {
403         publication.addTrack(track);
404       }
405     }
406   }
407 
408   private MediaPackage updateLivePublication(MediaPackage mediaPackage, Map<String, Track> generatedTracks) {
409     Publication[] publications = mediaPackage.getPublications();
410     for (Publication publication : publications) {
411       if (publication.getChannel().equals(CHANNEL_ID)) {
412         createOrUpdatePublicationTracks(publication, generatedTracks);
413       }
414     }
415     return mediaPackage;
416   }
417 
418   boolean retractLiveEvent(MediaPackage mp) throws LiveScheduleException {
419     retract(mp);
420 
421     // Get latest mp from the asset manager if there to remove the publication
422     try {
423       String mpId = mp.getIdentifier().toString();
424       Snapshot snapshot = getSnapshotFromArchive(mpId);
425       MediaPackage archivedMp = snapshot.getMediaPackage();
426       removeLivePublicationChannel(archivedMp);
427       logger.debug("Removed live pub channel from archived media package {}", mp);
428       // Take a snapshot with the publication removed and put its version in our local cache
429       // so that we ignore notifications for this snapshot version.
430       snapshotVersionCache.put(mpId, assetManager.takeSnapshot(archivedMp).getVersion());
431     } catch (LiveScheduleException e) {
432       // It was not found in asset manager. This is ok.
433     }
434     return true;
435   }
436 
437   void publishToSearch(MediaPackage mp) throws LiveScheduleException {
438     try {
439       // Add media package to the search index
440       logger.info("Publishing LIVE media package {} to search index", mp);
441       Job publishJob = searchService.add(mp);
442       if (!waitForStatus(publishJob).isSuccess()) {
443         throw new LiveScheduleException("Live media package " + mp.getIdentifier() + " could not be published");
444       }
445     } catch (LiveScheduleException e) {
446       throw e;
447     } catch (Exception e) {
448       throw new LiveScheduleException(e);
449     }
450   }
451 
452   void retract(MediaPackage mp) throws LiveScheduleException {
453     Organization org = securityService.getOrganization();
454     User prevUser = org != null ? securityService.getUser() : null;
455     try {
456       securityService.setUser(SecurityUtil.createSystemUser(systemUserName, org));
457       Set<String> elementIds = new HashSet<String>();
458       String mpId = mp.getIdentifier().toString();
459       logger.info("Removing LIVE media package {} from the search index", mpId);
460 
461       for (MediaPackageElement mpe : mp.getElements()) {
462         if (!MediaPackageElement.Type.Publication.equals(mpe.getElementType())) {
463           elementIds.add(mpe.getIdentifier());
464         }
465       }
466 
467       List<String> failedJobs = new ArrayList<>();
468       // Remove media package from the search index
469       Job searchDeleteJob = searchService.delete(mpId);
470       if (!waitForStatus(searchDeleteJob).isSuccess()) {
471         failedJobs.add("Search Index");
472       }
473 
474       // Removing media from the download distribution service
475       Job distributionRetractJob =  downloadDistributionService.retract(CHANNEL_ID, mp, elementIds);
476       if (!waitForStatus(distributionRetractJob).isSuccess()) {
477         failedJobs.add("Distribution");
478       }
479 
480       if (!failedJobs.isEmpty()) {
481         throw new LiveScheduleException(
482             String.format("Removing live media package %s from %s failed", mpId, String.join(" and ", failedJobs)));
483       }
484     } catch (LiveScheduleException e) {
485       throw e;
486     } catch (Exception e) {
487       throw new LiveScheduleException(e);
488     } finally {
489       securityService.setUser(prevUser);
490     }
491   }
492 
493   /**
494    * Retrieves the media package from the search index.
495    *
496    * @param mediaPackageId
497    *          the media package id
498    * @return the media package in the search index or null if not there
499    * @throws LiveScheduleException
500    *           if found many media packages with the same id
501    */
502   MediaPackage getMediaPackageFromSearch(String mediaPackageId) throws LiveScheduleException {
503     // Issue #2504: make sure the search index is read by admin so that the media package is always found.
504     Organization org = securityService.getOrganization();
505     User prevUser = org != null ? securityService.getUser() : null;
506     securityService.setUser(SecurityUtil.createSystemUser(systemUserName, org));
507     try {
508       // Look for the media package in the search index
509       return searchService.get(mediaPackageId);
510     } catch (UnauthorizedException e) {
511       logger.warn("Unexpected unauthorized exception when querying the search index for mp {}", mediaPackageId, e);
512       return null;
513     } catch (NotFoundException e) {
514       return null;
515     } finally {
516       securityService.setUser(prevUser);
517     }
518   }
519 
520   void setDurationForMediaPackage(MediaPackage mp, DublinCoreCatalog dc) {
521     DCMIPeriod period = EncodingSchemeUtils.decodeMandatoryPeriod(dc.getFirst(DublinCore.PROPERTY_TEMPORAL));
522     long duration = period.getEnd().getTime() - period.getStart().getTime();
523     mp.setDuration(duration);
524     logger.debug("Live media package {} has start {} and duration {}", mp.getIdentifier(), mp.getDate(),
525             mp.getDuration());
526   }
527 
528   Map<String, Track> addLiveTracksToMediaPackage(MediaPackage mp, DublinCoreCatalog episodeDC)
529           throws LiveScheduleException {
530     String caName = episodeDC.getFirst(DublinCore.PROPERTY_SPATIAL);
531     HashMap<String, Track> generatedTracks = new HashMap<>();
532     String mpId = mp.getIdentifier().toString();
533     try {
534       // If capture agent registered the properties:
535       // capture.device.live.resolution.WIDTHxHEIGHT=COMPLETE_STREAMING_URL, use them!
536       try {
537         Properties caProps = captureAgentService.getAgentCapabilities(caName);
538         if (caProps != null) {
539           Enumeration<Object> en = caProps.keys();
540           while (en.hasMoreElements()) {
541             String key = (String) en.nextElement();
542             if (key.startsWith(CA_PROPERTY_RESOLUTION_URL_PREFIX)) {
543               String resolution = key.substring(CA_PROPERTY_RESOLUTION_URL_PREFIX.length());
544               String url = caProps.getProperty(key);
545               // Note: only one flavor is supported in this format (the default: presenter/delivery)
546               MediaPackageElementFlavor flavor = MediaPackageElementFlavor.parseFlavor(DEFAULT_LIVE_TARGET_FLAVORS);
547               String replacedUrl = replaceVariables(mpId, caName, url, flavor, resolution);
548               mp.add(buildStreamingTrack(replacedUrl, flavor, streamMimeType, resolution, mp.getDuration()));
549             }
550           }
551         }
552       } catch (NotFoundException e) {
553         // Capture agent not found so we can't get its properties. Assume the service configuration should
554         // be used instead. Note that we can't schedule anything on a CA that has not registered so this is
555         // unlikely to happen.
556       }
557 
558       // Capture agent did not pass any CA_PROPERTY_RESOLUTION_URL_PREFIX property when registering
559       // so use the service configuration
560       if (mp.getTracks().length == 0) {
561         if (liveStreamingUrl == null) {
562           throw new LiveScheduleException(
563                   "Cannot build live tracks because '" + LIVE_STREAMING_URL + "' configuration was not set.");
564         }
565 
566         for (MediaPackageElementFlavor flavor : liveFlavors) {
567           for (int i = 0; i < streamResolution.length; i++) {
568             String uri = replaceVariables(mpId, caName, UrlSupport.concat(liveStreamingUrl.toString(), streamName),
569                     flavor, streamResolution[i]);
570             Track track = buildStreamingTrack(uri, flavor, streamMimeType, streamResolution[i], mp.getDuration());
571             mp.add(track);
572             generatedTracks.put(flavor + ":" + streamResolution[i], track);
573           }
574         }
575       }
576     } catch (URISyntaxException e) {
577       throw new LiveScheduleException(e);
578     }
579     return generatedTracks;
580   }
581 
582   Track buildStreamingTrack(String uriString, MediaPackageElementFlavor flavor, String mimeType, String resolution,
583           long duration) throws URISyntaxException {
584 
585     URI uri = new URI(uriString);
586 
587     MediaPackageElementBuilder elementBuilder = MediaPackageElementBuilderFactory.newInstance().newElementBuilder();
588     MediaPackageElement element = elementBuilder.elementFromURI(uri, MediaPackageElement.Type.Track, flavor);
589     TrackImpl track = (TrackImpl) element;
590 
591     // Set duration and mime type
592     track.setDuration(duration);
593     track.setLive(true);
594     track.setMimeType(MimeTypes.parseMimeType(mimeType));
595 
596     VideoStreamImpl video = new VideoStreamImpl("video-" + flavor.getType() + "-" + flavor.getSubtype());
597     // Set video resolution
598     String[] dimensions = resolution.split("x");
599     video.setFrameWidth(Integer.parseInt(dimensions[0]));
600     video.setFrameHeight(Integer.parseInt(dimensions[1]));
601 
602     track.addStream(video);
603 
604     logger.debug("Creating live track element of flavor {}, resolution {}, and url {}",
605             new Object[] { flavor, resolution, uriString });
606 
607     return track;
608   }
609 
610   /**
611    * Replaces variables in the live stream name. Currently, this is only prepared to handle the following: #{id} = media
612    * package id, #{flavor} = type-subtype of flavor, #{caName} = capture agent name, #{resolution} = stream resolution
613    */
614   String replaceVariables(String mpId, String caName, String toBeReplaced, MediaPackageElementFlavor flavor,
615           String resolution) {
616 
617     // Substitution pattern: any string in the form #{name}, where 'name' has only word characters: [a-zA-Z_0-9].
618     final Pattern pat = Pattern.compile("#\\{(\\w+)\\}");
619 
620     Matcher matcher = pat.matcher(toBeReplaced);
621     StringBuffer sb = new StringBuffer();
622     while (matcher.find()) {
623       if (matcher.group(1).equals(REPLACE_ID)) {
624         matcher.appendReplacement(sb, mpId);
625       } else if (matcher.group(1).equals(REPLACE_FLAVOR)) {
626         matcher.appendReplacement(sb, flavor.getType() + "-" + flavor.getSubtype());
627       } else if (matcher.group(1).equals(REPLACE_CA_NAME)) {
628         // Taking the easy route to find the capture agent name...
629         matcher.appendReplacement(sb, caName);
630       } else if (matcher.group(1).equals(REPLACE_RESOLUTION)) {
631         // Taking the easy route to find the capture agent name...
632         matcher.appendReplacement(sb, resolution);
633       } // else will not replace
634     }
635     matcher.appendTail(sb);
636     return sb.toString();
637   }
638 
639   private JobBarrier.Result waitForStatus(Job... jobs) throws IllegalStateException, IllegalArgumentException {
640     if (serviceRegistry == null) {
641       throw new IllegalStateException("Can't wait for job status without providing a service registry first");
642     }
643     JobBarrier barrier = new JobBarrier(null, serviceRegistry, jobPollingInterval, jobs);
644     return barrier.waitForJobs();
645   }
646 
647   Snapshot getSnapshotFromArchive(String mpId) throws LiveScheduleException {
648     AQueryBuilder query = assetManager.createQuery();
649     AResult result = query.select(query.snapshot()).where(query.mediaPackageId(mpId).and(query.version().isLatest()))
650             .run();
651     if (result.getSize() == 0) {
652       // Media package not archived?.
653       throw new LiveScheduleException(String.format("Unexpected error: media package %s has not been archived.", mpId));
654     }
655     Optional<ARecord> record = result.getRecords().stream().findFirst();
656     if (record.isEmpty()) {
657       // No snapshot?
658       throw new LiveScheduleException(String.format("Unexpected error: media package %s has not been archived.", mpId));
659     }
660     return record.get().getSnapshot().get();
661   }
662 
663   MediaPackage distributeAclsAndCatalogs(Snapshot snapshot) throws LiveScheduleException {
664     try {
665       MediaPackage mp = (MediaPackage) snapshot.getMediaPackage().clone();
666 
667       // Select elements
668       Collection<MediaPackageElement> elements = publishElementSelector.select(mp, false);
669       Set<String> elementIds = elements.stream().map(MediaPackageElement::getIdentifier).collect(Collectors.toSet());
670 
671       // Distribute elements
672       Job distributionJob = downloadDistributionService.distribute(CHANNEL_ID, mp, elementIds, false);
673       if (!waitForStatus(distributionJob).isSuccess()) {
674         throw new LiveScheduleException(
675                 "Element(s) for live media package " + mp.getIdentifier() + " could not be distributed");
676       }
677 
678       // Remove all elements from mp
679       for (MediaPackageElement e: mp.getElements()) {
680         mp.remove(e);
681       }
682 
683       // Re-add distributed elements
684       List<MediaPackageElement> distributedElements = (List<MediaPackageElement>) MediaPackageElementParser
685               .getArrayFromXml(distributionJob.getPayload());
686       for (MediaPackageElement distributedElement : distributedElements) {
687         mp.add(distributedElement);
688       }
689 
690       // Clean up
691       for (String id : elementIds) {
692         MediaPackageElement e = mp.getElementById(id);
693         workspace.delete(e.getURI());
694       }
695 
696       return mp;
697     } catch (LiveScheduleException e) {
698       throw e;
699     } catch (Exception e) {
700       throw new LiveScheduleException(e);
701     }
702   }
703 
704   MediaPackage replaceAndDistributeAcl(MediaPackage previousMp, AccessControlList acl) throws LiveScheduleException {
705     try {
706       // This is the mp from the search index
707       MediaPackage mp = (MediaPackage) previousMp.clone();
708 
709       // Remove previous Acl from the mp
710       Attachment[] atts = mp.getAttachments(MediaPackageElements.XACML_POLICY_EPISODE);
711       if (atts.length > 0) {
712         mp.remove(atts[0]);
713       }
714 
715       // Attach current ACL to mp, acl will be created in the ws/wfr
716       authService.setAcl(mp, AclScope.Episode, acl);
717       atts = mp.getAttachments(MediaPackageElements.XACML_POLICY_EPISODE);
718       if (atts.length > 0) {
719         String aclId = atts[0].getIdentifier();
720         // Distribute new acl
721         Job distributionJob = downloadDistributionService.distribute(CHANNEL_ID, mp, aclId, false);
722         if (!waitForStatus(distributionJob).isSuccess()) {
723           throw new LiveScheduleException(
724                   "Acl for live media package " + mp.getIdentifier() + " could not be distributed");
725         }
726 
727         MediaPackageElement e = mp.getElementById(aclId);
728         // Cleanup workspace/wfr
729         mp.remove(e);
730         workspace.delete(e.getURI());
731 
732         // Add distributed acl to mp
733         mp.add(MediaPackageElementParser.getFromXml(distributionJob.getPayload()));
734       }
735       return mp;
736     } catch (LiveScheduleException e) {
737       throw e;
738     } catch (Exception e) {
739       throw new LiveScheduleException(e);
740     }
741   }
742 
743   MediaPackage addLivePublicationToMediaPackage(Snapshot snapshot, Map<String, Track> generatedTracks)
744           throws LiveScheduleException {
745     MediaPackage mp = snapshot.getMediaPackage();
746 
747     Organization currentOrg = null;
748     try {
749       currentOrg = organizationService.getOrganization(snapshot.getOrganizationId());
750     } catch (NotFoundException e) {
751       logger.warn("Organization in snapshot not found: {}", snapshot.getOrganizationId());
752     }
753 
754     logger.debug("Adding live channel publication element to media package {}", mp);
755     String engageUrlString = null;
756     if (currentOrg != null) {
757       engageUrlString = StringUtils.trimToNull(currentOrg.getProperties().get(ENGAGE_URL_PROPERTY));
758     }
759     if (engageUrlString == null) {
760       engageUrlString = serverUrl;
761       logger.info(
762           "Using 'server.url' as a fallback for the non-existing organization level key '{}' for the publication url",
763           ENGAGE_URL_PROPERTY);
764     }
765 
766     try {
767       // Create new distribution element
768       URI engageUri = URIUtils.resolve(new URI(engageUrlString), PLAYER_PATH + mp.getIdentifier().toString());
769       Publication publicationElement = PublicationImpl.publication(UUID.randomUUID().toString(), CHANNEL_ID, engageUri,
770               MimeTypes.parseMimeType("text/html"));
771       mp.add(publicationElement);
772       createOrUpdatePublicationTracks(publicationElement, generatedTracks);
773       return mp;
774     } catch (URISyntaxException e) {
775       throw new LiveScheduleException(e);
776     }
777   }
778 
779   void removeLivePublicationChannel(MediaPackage mp) {
780     // Remove publication element
781     Publication[] publications = mp.getPublications();
782     if (publications != null) {
783       for (Publication publication : publications) {
784         if (CHANNEL_ID.equals(publication.getChannel())) {
785           mp.remove(publication);
786         }
787       }
788     }
789   }
790 
791   boolean isSameMediaPackage(MediaPackage previous, MediaPackage current) {
792 
793     Equator<Track> liveTrackEquator = new Equator<>() {
794       @Override
795       public boolean equate(Track track1, Track track2) {
796         // we can safely assume that each live track has exactly one video stream since we generated that ourselves
797         VideoStream videostream1 = (VideoStream) track1.getStreams()[0];
798         VideoStream videostream2 = (VideoStream) track2.getStreams()[0];
799 
800         return Objects.equals(track1.getURI(), track2.getURI())
801                 && Objects.equals(track1.getFlavor(), track2.getFlavor())
802                 && Objects.equals(track1.getMimeType(), track2.getMimeType())
803                 && Objects.equals(track1.getDuration(), track2.getDuration())
804                 && Objects.equals(videostream1.getFrameWidth(), videostream2.getFrameWidth())
805                 && Objects.equals(videostream1.getFrameHeight(), videostream2.getFrameHeight());
806       }
807 
808       @Override
809       public int hash(Track track) {
810         VideoStream videostream = (VideoStream) track.getStreams()[0];
811         return Objects.hash(track.getURI(), track.getFlavor(), track.getMimeType(), track.getDuration(),
812                 videostream.getFrameWidth(), videostream.getFrameHeight());
813       }
814     };
815 
816     Equator<MediaPackageElement> catalogAndAttachmentEquator = new Equator<>() {
817       @Override
818       public boolean equate(MediaPackageElement mpe1, MediaPackageElement mpe2) {
819         return Objects.equals(mpe1.getIdentifier(), mpe2.getIdentifier())
820                 && Objects.equals(mpe1.getElementType(), mpe2.getElementType())
821                 && Objects.equals(mpe1.getChecksum(), mpe2.getChecksum())
822                 && Objects.equals(mpe1.getFlavor(), mpe2.getFlavor());
823       }
824 
825       @Override
826       public int hash(MediaPackageElement mpe) {
827         return Objects.hash(mpe.getIdentifier(), mpe.getElementType(), mpe.getChecksum(), mpe.getFlavor());
828       }
829     };
830 
831     if (!CollectionUtils.isEqualCollection(Arrays.asList(previous.getTracks()), Arrays.asList(current.getTracks()),
832             liveTrackEquator)) {
833       return false;
834     } else if (!CollectionUtils.isEqualCollection(Arrays.asList(previous.getCatalogs()),
835             Arrays.asList(current.getCatalogs()), catalogAndAttachmentEquator)) {
836       return false;
837     } else {
838       return CollectionUtils.isEqualCollection(Arrays.asList(previous.getAttachments()),
839               Arrays.asList(current.getAttachments()), catalogAndAttachmentEquator);
840     }
841   }
842 
843   void retractPreviousElements(MediaPackage previousMp, MediaPackage newMp) throws LiveScheduleException {
844     try {
845       // Now can retract elements from previous publish. Before creating a retraction
846       // job, check if the element url is still used by the new media package.
847       Set<String> elementIds = new HashSet<String>();
848       for (MediaPackageElement element : previousMp.getElements()) {
849         // We don't retract tracks because they are just live links
850         if (!Track.TYPE.equals(element.getElementType())) {
851           boolean canBeDeleted = true;
852           for (MediaPackageElement newElement : newMp.getElements()) {
853             if (element.getURI().equals(newElement.getURI())) {
854               logger.debug(
855                   "Not retracting element {} with URI {} from download distribution because it is "
856                       + "still used by updated live media package",
857                   element.getIdentifier(), element.getURI());
858               canBeDeleted = false;
859               break;
860             }
861           }
862           if (canBeDeleted) {
863             elementIds.add(element.getIdentifier());
864           }
865         }
866       }
867       if (elementIds.size() > 0) {
868         Job job = downloadDistributionService.retract(CHANNEL_ID, previousMp, elementIds);
869         // Wait for retraction to finish
870         if (!waitForStatus(job).isSuccess()) {
871           logger.warn("One of the download retract jobs did not complete successfully");
872         } else {
873           logger.debug("Retraction of previously published elements complete");
874         }
875       }
876     } catch (DistributionException e) {
877       throw new LiveScheduleException(e);
878     }
879   }
880 
  /**
   * Sets the Dublin Core catalog service; bound through the {@code @Reference} annotation.
   *
   * @param service
   *          the Dublin Core catalog service
   */
  @Reference
  public void setDublinCoreService(DublinCoreCatalogService service) {
    this.dublinCoreService = service;
  }
885 
  /**
   * Sets the search service; bound through the {@code @Reference} annotation.
   *
   * @param service
   *          the search service
   */
  @Reference
  public void setSearchService(SearchService service) {
    this.searchService = service;
  }
890 
  /**
   * Sets the series service; bound through the {@code @Reference} annotation.
   *
   * @param service
   *          the series service
   */
  @Reference
  public void setSeriesService(SeriesService service) {
    this.seriesService = service;
  }
895 
  /**
   * Sets the service registry, which {@code waitForStatus} requires in order to wait for jobs; bound through the
   * {@code @Reference} annotation.
   *
   * @param service
   *          the service registry
   */
  @Reference
  public void setServiceRegistry(ServiceRegistry service) {
    this.serviceRegistry = service;
  }
900 
  /**
   * Sets the capture agent state service; bound through the {@code @Reference} annotation.
   *
   * @param service
   *          the capture agent state service
   */
  @Reference
  public void setCaptureAgentService(CaptureAgentStateService service) {
    this.captureAgentService = service;
  }
905 
906   @Reference(
907       name = "DownloadDistributionService",
908       target = "(distribution.channel=download)"
909   )
910   public void setDownloadDistributionService(DownloadDistributionService service) {
911     this.downloadDistributionService = service;
912     logger.info("Distribution service with type '{}' set.", downloadDistributionService.getDistributionType());
913   }
914 
  /**
   * Sets the workspace; bound through the {@code @Reference} annotation.
   *
   * @param ws
   *          the workspace
   */
  @Reference
  public void setWorkspace(Workspace ws) {
    this.workspace = ws;
  }
919 
  /**
   * Sets the asset manager that is queried for snapshots; bound through the {@code @Reference} annotation.
   *
   * @param assetManager
   *          the asset manager
   */
  @Reference
  public void setAssetManager(AssetManager assetManager) {
    this.assetManager = assetManager;
  }
924 
  /**
   * Sets the authorization service used to attach ACLs to media packages; bound through the {@code @Reference}
   * annotation.
   *
   * @param service
   *          the authorization service
   */
  @Reference
  public void setAuthorizationService(AuthorizationService service) {
    this.authService = service;
  }
929 
  /**
   * Sets the organization directory service used to look up organizations by id; bound through the
   * {@code @Reference} annotation.
   *
   * @param service
   *          the organization directory service
   */
  @Reference
  public void setOrganizationService(OrganizationDirectoryService service) {
    this.organizationService = service;
  }
934 
  /**
   * Sets the security service; bound through the {@code @Reference} annotation.
   *
   * @param service
   *          the security service
   */
  @Reference
  public void setSecurityService(SecurityService service) {
    this.securityService = service;
  }
939   // === Set by OSGI - end
940 
941   // === Used by unit tests - begin
  /**
   * Overrides the polling interval handed to the {@link JobBarrier} when waiting for jobs. Intended for unit tests.
   *
   * @param jobPollingInterval
   *          the polling interval to use
   */
  void setJobPollingInterval(long jobPollingInterval) {
    this.jobPollingInterval = jobPollingInterval;
  }
945 
  /**
   * Exposes the snapshot version cache. Intended for unit tests.
   *
   * @return the cache instance held by this service (not a copy)
   */
  Cache<String, Version> getSnapshotVersionCache() {
    return this.snapshotVersionCache;
  }
949   // === Used by unit tests - end
950 }