1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21 package org.opencastproject.liveschedule.impl;
22
23 import org.opencastproject.assetmanager.api.AssetManager;
24 import org.opencastproject.assetmanager.api.Snapshot;
25 import org.opencastproject.assetmanager.api.Version;
26 import org.opencastproject.capture.admin.api.CaptureAgentStateService;
27 import org.opencastproject.distribution.api.DistributionException;
28 import org.opencastproject.distribution.api.DownloadDistributionService;
29 import org.opencastproject.job.api.Job;
30 import org.opencastproject.job.api.JobBarrier;
31 import org.opencastproject.liveschedule.api.LiveScheduleException;
32 import org.opencastproject.liveschedule.api.LiveScheduleService;
33 import org.opencastproject.mediapackage.Attachment;
34 import org.opencastproject.mediapackage.MediaPackage;
35 import org.opencastproject.mediapackage.MediaPackageElement;
36 import org.opencastproject.mediapackage.MediaPackageElementBuilder;
37 import org.opencastproject.mediapackage.MediaPackageElementBuilderFactory;
38 import org.opencastproject.mediapackage.MediaPackageElementFlavor;
39 import org.opencastproject.mediapackage.MediaPackageElementParser;
40 import org.opencastproject.mediapackage.MediaPackageElements;
41 import org.opencastproject.mediapackage.Publication;
42 import org.opencastproject.mediapackage.PublicationImpl;
43 import org.opencastproject.mediapackage.Track;
44 import org.opencastproject.mediapackage.VideoStream;
45 import org.opencastproject.mediapackage.selector.SimpleElementSelector;
46 import org.opencastproject.mediapackage.track.TrackImpl;
47 import org.opencastproject.mediapackage.track.VideoStreamImpl;
48 import org.opencastproject.metadata.dublincore.DCMIPeriod;
49 import org.opencastproject.metadata.dublincore.DublinCore;
50 import org.opencastproject.metadata.dublincore.DublinCoreCatalog;
51 import org.opencastproject.metadata.dublincore.DublinCoreCatalogService;
52 import org.opencastproject.metadata.dublincore.EncodingSchemeUtils;
53 import org.opencastproject.search.api.SearchService;
54 import org.opencastproject.security.api.AccessControlList;
55 import org.opencastproject.security.api.AclScope;
56 import org.opencastproject.security.api.AuthorizationService;
57 import org.opencastproject.security.api.Organization;
58 import org.opencastproject.security.api.OrganizationDirectoryService;
59 import org.opencastproject.security.api.SecurityService;
60 import org.opencastproject.security.api.UnauthorizedException;
61 import org.opencastproject.security.api.User;
62 import org.opencastproject.security.util.SecurityUtil;
63 import org.opencastproject.series.api.SeriesService;
64 import org.opencastproject.serviceregistry.api.ServiceRegistry;
65 import org.opencastproject.util.MimeTypes;
66 import org.opencastproject.util.NotFoundException;
67 import org.opencastproject.util.UrlSupport;
68 import org.opencastproject.workspace.api.Workspace;
69
70 import com.google.common.cache.Cache;
71 import com.google.common.cache.CacheBuilder;
72
73 import org.apache.commons.collections4.CollectionUtils;
74 import org.apache.commons.collections4.Equator;
75 import org.apache.commons.lang3.StringUtils;
76 import org.apache.http.client.utils.URIUtils;
77 import org.osgi.framework.BundleContext;
78 import org.osgi.service.component.ComponentContext;
79 import org.osgi.service.component.annotations.Activate;
80 import org.osgi.service.component.annotations.Component;
81 import org.osgi.service.component.annotations.Reference;
82 import org.slf4j.Logger;
83 import org.slf4j.LoggerFactory;
84
85 import java.net.URI;
86 import java.net.URISyntaxException;
87 import java.util.ArrayList;
88 import java.util.Arrays;
89 import java.util.Collection;
90 import java.util.Dictionary;
91 import java.util.Enumeration;
92 import java.util.HashMap;
93 import java.util.HashSet;
94 import java.util.List;
95 import java.util.Map;
96 import java.util.Objects;
97 import java.util.Optional;
98 import java.util.Properties;
99 import java.util.Set;
100 import java.util.UUID;
101 import java.util.concurrent.TimeUnit;
102 import java.util.function.Predicate;
103 import java.util.regex.Matcher;
104 import java.util.regex.Pattern;
105 import java.util.stream.Collectors;
106
107 @Component(
108 immediate = true,
109 service = LiveScheduleService.class,
110 property = {
111 "service.description=Live Schedule Service"
112 }
113 )
114 public class LiveScheduleServiceImpl implements LiveScheduleService {
115
116 static final String SERVER_URL_PROPERTY = "org.opencastproject.server.url";
117
118 static final String ENGAGE_URL_PROPERTY = "org.opencastproject.engage.ui.url";
119
120 static final String PLAYER_PATH = "/play/";
121
122
123 private static final String DEFAULT_STREAM_MIME_TYPE = "video/mp4";
124 private static final String DEFAULT_STREAM_RESOLUTION = "1920x1080";
125 private static final String DEFAULT_STREAM_NAME = "live-stream";
126 private static final String DEFAULT_LIVE_TARGET_FLAVORS = "presenter/delivery";
127 static final String DEFAULT_LIVE_DISTRIBUTION_SERVICE = "download";
128
129
130
131
132
133
134
135 public static final String CA_PROPERTY_RESOLUTION_URL_PREFIX = "capture.device.live.resolution.";
136
137
138
139 public static final String REPLACE_ID = "id";
140 public static final String REPLACE_FLAVOR = "flavor";
141 public static final String REPLACE_CA_NAME = "caName";
142 public static final String REPLACE_RESOLUTION = "resolution";
143
144 public static final String LIVE_STREAMING_URL = "live.streamingUrl";
145 public static final String LIVE_STREAM_NAME = "live.streamName";
146 public static final String LIVE_STREAM_MIME_TYPE = "live.mimeType";
147 public static final String LIVE_STREAM_RESOLUTION = "live.resolution";
148 public static final String LIVE_TARGET_FLAVORS = "live.targetFlavors";
149 public static final String LIVE_DISTRIBUTION_SERVICE = "live.distributionService";
150 public static final String LIVE_PUBLISH_STREAMING = "live.publishStreaming";
151
152 private static final MediaPackageElementFlavor[] publishFlavors = { MediaPackageElements.EPISODE,
153 MediaPackageElements.SERIES, MediaPackageElements.XACML_POLICY_EPISODE,
154 MediaPackageElements.XACML_POLICY_SERIES };
155
156
157 private static final Logger logger = LoggerFactory.getLogger(LiveScheduleServiceImpl.class);
158
159 private String liveStreamingUrl;
160 private String streamName;
161 private String streamMimeType;
162 private String[] streamResolution;
163 private MediaPackageElementFlavor[] liveFlavors;
164 private String serverUrl;
165 private Cache<String, Version> snapshotVersionCache
166 = CacheBuilder.newBuilder().expireAfterWrite(5, TimeUnit.MINUTES).build();
167
168 private List<String> publishedStreamingFormats = null;
169 private String systemUserName;
170
171
172 private DownloadDistributionService downloadDistributionService;
173 private SearchService searchService;
174 private SeriesService seriesService;
175 private DublinCoreCatalogService dublinCoreService;
176 private CaptureAgentStateService captureAgentService;
177 private ServiceRegistry serviceRegistry;
178 private Workspace workspace;
179 private AssetManager assetManager;
180 private AuthorizationService authService;
181 private OrganizationDirectoryService organizationService;
182 private SecurityService securityService;
183
184 private long jobPollingInterval = JobBarrier.DEFAULT_POLLING_INTERVAL;
185
186 private SimpleElementSelector publishElementSelector;
187
188
189
190
191
192
193
194 @Activate
195 protected void activate(ComponentContext context) {
196 BundleContext bundleContext = context.getBundleContext();
197
198 serverUrl = StringUtils.trimToNull(bundleContext.getProperty(SERVER_URL_PROPERTY));
199 if (serverUrl == null) {
200 logger.warn("Server url was not set in '{}'", SERVER_URL_PROPERTY);
201 } else {
202 logger.info("Server url is {}", serverUrl);
203 }
204 systemUserName = bundleContext.getProperty(SecurityUtil.PROPERTY_KEY_SYS_USER);
205
206 @SuppressWarnings("rawtypes")
207 Dictionary properties = context.getProperties();
208 if (!StringUtils.isBlank((String) properties.get(LIVE_STREAMING_URL))) {
209 liveStreamingUrl = StringUtils.trimToEmpty((String) properties.get(LIVE_STREAMING_URL));
210 logger.info("Live streaming server url is {}", liveStreamingUrl);
211 } else {
212 logger.info("Live streaming url not set in '{}'. Streaming urls must be provided by capture agent properties.",
213 LIVE_STREAMING_URL);
214 }
215
216 if (!StringUtils.isBlank((String) properties.get(LIVE_STREAM_NAME))) {
217 streamName = StringUtils.trimToEmpty((String) properties.get(LIVE_STREAM_NAME));
218 } else {
219 streamName = DEFAULT_STREAM_NAME;
220 }
221
222 if (!StringUtils.isBlank((String) properties.get(LIVE_STREAM_MIME_TYPE))) {
223 streamMimeType = StringUtils.trimToEmpty((String) properties.get(LIVE_STREAM_MIME_TYPE));
224 } else {
225 streamMimeType = DEFAULT_STREAM_MIME_TYPE;
226 }
227
228 String resolution = null;
229 if (!StringUtils.isBlank((String) properties.get(LIVE_STREAM_RESOLUTION))) {
230 resolution = StringUtils.trimToEmpty((String) properties.get(LIVE_STREAM_RESOLUTION));
231 } else {
232 resolution = DEFAULT_STREAM_RESOLUTION;
233 }
234 streamResolution = resolution.split(",");
235
236 String flavors = null;
237 if (!StringUtils.isBlank((String) properties.get(LIVE_TARGET_FLAVORS))) {
238 flavors = StringUtils.trimToEmpty((String) properties.get(LIVE_TARGET_FLAVORS));
239 } else {
240 flavors = DEFAULT_LIVE_TARGET_FLAVORS;
241 }
242 String[] flavorArray = StringUtils.split(flavors, ",");
243 liveFlavors = new MediaPackageElementFlavor[flavorArray.length];
244 int i = 0;
245 for (String f : flavorArray) {
246 liveFlavors[i++] = MediaPackageElementFlavor.parseFlavor(f);
247 }
248
249 publishedStreamingFormats = Arrays.asList(Optional.ofNullable(StringUtils.split(
250 (String)properties.get(LIVE_PUBLISH_STREAMING), ",")).orElse(new String[0]));
251
252 publishElementSelector = new SimpleElementSelector();
253 for (MediaPackageElementFlavor flavor : publishFlavors) {
254 publishElementSelector.addFlavor(flavor);
255 }
256
257 logger.info(
258 "Configured live stream name: {}, mime type: {}, resolution: {}, target flavors: {}",
259 streamName, streamMimeType, resolution, flavors);
260 }
261
262 @Override
263 public boolean createOrUpdateLiveEvent(String mpId, DublinCoreCatalog episodeDC) throws LiveScheduleException {
264 MediaPackage mp = getMediaPackageFromSearch(mpId);
265 if (mp == null) {
266
267
268 DCMIPeriod period = EncodingSchemeUtils.decodeMandatoryPeriod(episodeDC.getFirst(DublinCore.PROPERTY_TEMPORAL));
269 if (period.getEnd().getTime() <= System.currentTimeMillis()) {
270 logger.info("Live media package {} not created in search index because event is already past (end date: {})",
271 mpId, period.getEnd());
272 return false;
273 }
274 return createLiveEvent(mpId, episodeDC);
275 } else {
276
277
278 if (!mp.isLive()) {
279 logger.info("Media package {} is in search index but not live so not updating it.", mpId);
280 return false;
281 }
282 return updateLiveEvent(mp, episodeDC);
283 }
284 }
285
286 @Override
287 public boolean deleteLiveEvent(String mpId) throws LiveScheduleException {
288 MediaPackage mp = getMediaPackageFromSearch(mpId);
289 if (mp == null) {
290 logger.debug("Live media package {} not found in search index", mpId);
291 return false;
292 } else {
293 if (!mp.isLive()) {
294 logger.info("Media package {} is not live. Not retracting.", mpId);
295 return false;
296 }
297 return retractLiveEvent(mp);
298 }
299 }
300
301 @Override
302 public boolean updateLiveEventAcl(String mpId, AccessControlList acl) throws LiveScheduleException {
303 MediaPackage previousMp = getMediaPackageFromSearch(mpId);
304 if (previousMp != null) {
305 if (!previousMp.isLive()) {
306 logger.info("Media package {} is not live. Not updating acl.", mpId);
307 return false;
308 }
309
310 MediaPackage newMp = replaceAndDistributeAcl(previousMp, acl);
311
312 publishToSearch(newMp);
313
314 retractPreviousElements(previousMp, newMp);
315 logger.info("Updated live acl for media package {}", newMp);
316 return true;
317 }
318 return false;
319 }
320
321 boolean createLiveEvent(String mpId, DublinCoreCatalog episodeDC) throws LiveScheduleException {
322 try {
323 logger.info("Creating live media package {}", mpId);
324 Snapshot snapshot = getSnapshotFromArchive(mpId);
325
326
327 MediaPackage tmpMp = (MediaPackage) snapshot.getMediaPackage().clone();
328 setDurationForMediaPackage(tmpMp, episodeDC);
329 Map<String, Track> liveTracks = addLiveTracksToMediaPackage(tmpMp, episodeDC);
330
331
332 MediaPackage mpForSearch = distributeAclsAndCatalogs(snapshot);
333 for (Track t : tmpMp.getTracks()) {
334 mpForSearch.add(t);
335 }
336 publishToSearch(mpForSearch);
337
338
339 MediaPackage updatedArchivedMp = addLivePublicationToMediaPackage(snapshot, liveTracks);
340 snapshotVersionCache.put(mpId, assetManager.takeSnapshot(updatedArchivedMp).getVersion());
341 return true;
342 } catch (Exception e) {
343 throw new LiveScheduleException(e);
344 }
345 }
346
347 boolean updateLiveEvent(MediaPackage mpFromSearch, DublinCoreCatalog episodeDC) throws LiveScheduleException {
348 String mpId = mpFromSearch.getIdentifier().toString();
349 Snapshot snapshot = getSnapshotFromArchive(mpId);
350
351
352
353
354 if (snapshot.getVersion().equals(snapshotVersionCache.getIfPresent(mpId))) {
355 logger.debug("Snapshot version {} was created by us so this change is ignored.", snapshot.getVersion());
356 return false;
357 }
358
359
360 MediaPackage tmpMp = (MediaPackage) snapshot.getMediaPackage().clone();
361
362 Collection<MediaPackageElement> elements = publishElementSelector.select(tmpMp, false);
363 Arrays.stream(tmpMp.getElements()).filter(Predicate.not(elements::contains)).collect(Collectors.toList())
364 .forEach(tmpMp::remove);
365
366 setDurationForMediaPackage(tmpMp, episodeDC);
367 Map<String, Track> liveTracks = addLiveTracksToMediaPackage(tmpMp, episodeDC);
368
369
370 if (isSameMediaPackage(mpFromSearch, tmpMp)) {
371 logger.debug("Live media package {} seems to be the same. Not updating.", mpFromSearch);
372 return false;
373 }
374
375 logger.info("Updating live media package {}", mpFromSearch);
376
377
378 MediaPackage mpForSearch = distributeAclsAndCatalogs(snapshot);
379 for (Track t : tmpMp.getTracks()) {
380 mpForSearch.add(t);
381 }
382 removeLivePublicationChannel(mpForSearch);
383 publishToSearch(mpForSearch);
384 retractPreviousElements(mpFromSearch, mpForSearch);
385
386
387 MediaPackage updatedArchivedMp = updateLivePublication(snapshot.getMediaPackage(), liveTracks);
388 snapshotVersionCache.put(mpId, assetManager.takeSnapshot(updatedArchivedMp).getVersion());
389 return true;
390 }
391
392 private void createOrUpdatePublicationTracks(Publication publication, Map<String, Track> generatedTracks) {
393 if (publication.getTracks().length > 0) {
394 publication.clearTracks();
395 }
396
397 for (String publishedStreamingFormat : publishedStreamingFormats) {
398 Track track = generatedTracks.get(publishedStreamingFormat);
399 if (track != null) {
400 publication.addTrack(track);
401 }
402 }
403 }
404
405 private MediaPackage updateLivePublication(MediaPackage mediaPackage, Map<String, Track> generatedTracks) {
406 Publication[] publications = mediaPackage.getPublications();
407 for (Publication publication : publications) {
408 if (publication.getChannel().equals(CHANNEL_ID)) {
409 createOrUpdatePublicationTracks(publication, generatedTracks);
410 }
411 }
412 return mediaPackage;
413 }
414
415 boolean retractLiveEvent(MediaPackage mp) throws LiveScheduleException {
416 retract(mp);
417
418
419 try {
420 String mpId = mp.getIdentifier().toString();
421 Snapshot snapshot = getSnapshotFromArchive(mpId);
422 MediaPackage archivedMp = snapshot.getMediaPackage();
423 removeLivePublicationChannel(archivedMp);
424 logger.debug("Removed live pub channel from archived media package {}", mp);
425
426
427 snapshotVersionCache.put(mpId, assetManager.takeSnapshot(archivedMp).getVersion());
428 } catch (LiveScheduleException e) {
429
430 }
431 return true;
432 }
433
434 void publishToSearch(MediaPackage mp) throws LiveScheduleException {
435 try {
436
437 logger.info("Publishing LIVE media package {} to search index", mp);
438 Job publishJob = searchService.add(mp);
439 if (!waitForStatus(publishJob).isSuccess()) {
440 throw new LiveScheduleException("Live media package " + mp.getIdentifier() + " could not be published");
441 }
442 } catch (LiveScheduleException e) {
443 throw e;
444 } catch (Exception e) {
445 throw new LiveScheduleException(e);
446 }
447 }
448
449 void retract(MediaPackage mp) throws LiveScheduleException {
450 Organization org = securityService.getOrganization();
451 User prevUser = org != null ? securityService.getUser() : null;
452 try {
453 securityService.setUser(SecurityUtil.createSystemUser(systemUserName, org));
454 Set<String> elementIds = new HashSet<String>();
455 String mpId = mp.getIdentifier().toString();
456 logger.info("Removing LIVE media package {} from the search index", mpId);
457
458 for (MediaPackageElement mpe : mp.getElements()) {
459 if (!MediaPackageElement.Type.Publication.equals(mpe.getElementType())) {
460 elementIds.add(mpe.getIdentifier());
461 }
462 }
463
464 List<String> failedJobs = new ArrayList<>();
465
466 Job searchDeleteJob = searchService.delete(mpId);
467 if (!waitForStatus(searchDeleteJob).isSuccess()) {
468 failedJobs.add("Search Index");
469 }
470
471
472 Job distributionRetractJob = downloadDistributionService.retract(CHANNEL_ID, mp, elementIds);
473 if (!waitForStatus(distributionRetractJob).isSuccess()) {
474 failedJobs.add("Distribution");
475 }
476
477 if (!failedJobs.isEmpty()) {
478 throw new LiveScheduleException(
479 String.format("Removing live media package %s from %s failed", mpId, String.join(" and ", failedJobs)));
480 }
481 } catch (LiveScheduleException e) {
482 throw e;
483 } catch (Exception e) {
484 throw new LiveScheduleException(e);
485 } finally {
486 securityService.setUser(prevUser);
487 }
488 }
489
490
491
492
493
494
495
496
497
498
499 MediaPackage getMediaPackageFromSearch(String mediaPackageId) throws LiveScheduleException {
500
501 Organization org = securityService.getOrganization();
502 User prevUser = org != null ? securityService.getUser() : null;
503 securityService.setUser(SecurityUtil.createSystemUser(systemUserName, org));
504 try {
505
506 return searchService.get(mediaPackageId);
507 } catch (UnauthorizedException e) {
508 logger.warn("Unexpected unauthorized exception when querying the search index for mp {}", mediaPackageId, e);
509 return null;
510 } catch (NotFoundException e) {
511 return null;
512 } finally {
513 securityService.setUser(prevUser);
514 }
515 }
516
517 void setDurationForMediaPackage(MediaPackage mp, DublinCoreCatalog dc) {
518 DCMIPeriod period = EncodingSchemeUtils.decodeMandatoryPeriod(dc.getFirst(DublinCore.PROPERTY_TEMPORAL));
519 long duration = period.getEnd().getTime() - period.getStart().getTime();
520 mp.setDuration(duration);
521 logger.debug("Live media package {} has start {} and duration {}", mp.getIdentifier(), mp.getDate(),
522 mp.getDuration());
523 }
524
525 Map<String, Track> addLiveTracksToMediaPackage(MediaPackage mp, DublinCoreCatalog episodeDC)
526 throws LiveScheduleException {
527 String caName = episodeDC.getFirst(DublinCore.PROPERTY_SPATIAL);
528 HashMap<String, Track> generatedTracks = new HashMap<>();
529 String mpId = mp.getIdentifier().toString();
530 try {
531
532
533 try {
534 Properties caProps = captureAgentService.getAgentCapabilities(caName);
535 if (caProps != null) {
536 Enumeration<Object> en = caProps.keys();
537 while (en.hasMoreElements()) {
538 String key = (String) en.nextElement();
539 if (key.startsWith(CA_PROPERTY_RESOLUTION_URL_PREFIX)) {
540 String resolution = key.substring(CA_PROPERTY_RESOLUTION_URL_PREFIX.length());
541 String url = caProps.getProperty(key);
542
543 MediaPackageElementFlavor flavor = MediaPackageElementFlavor.parseFlavor(DEFAULT_LIVE_TARGET_FLAVORS);
544 String replacedUrl = replaceVariables(mpId, caName, url, flavor, resolution);
545 mp.add(buildStreamingTrack(replacedUrl, flavor, streamMimeType, resolution, mp.getDuration()));
546 }
547 }
548 }
549 } catch (NotFoundException e) {
550
551
552
553 }
554
555
556
557 if (mp.getTracks().length == 0) {
558 if (liveStreamingUrl == null) {
559 throw new LiveScheduleException(
560 "Cannot build live tracks because '" + LIVE_STREAMING_URL + "' configuration was not set.");
561 }
562
563 for (MediaPackageElementFlavor flavor : liveFlavors) {
564 for (int i = 0; i < streamResolution.length; i++) {
565 String uri = replaceVariables(mpId, caName, UrlSupport.concat(liveStreamingUrl.toString(), streamName),
566 flavor, streamResolution[i]);
567 Track track = buildStreamingTrack(uri, flavor, streamMimeType, streamResolution[i], mp.getDuration());
568 mp.add(track);
569 generatedTracks.put(flavor + ":" + streamResolution[i], track);
570 }
571 }
572 }
573 } catch (URISyntaxException e) {
574 throw new LiveScheduleException(e);
575 }
576 return generatedTracks;
577 }
578
579 Track buildStreamingTrack(String uriString, MediaPackageElementFlavor flavor, String mimeType, String resolution,
580 long duration) throws URISyntaxException {
581
582 URI uri = new URI(uriString);
583
584 MediaPackageElementBuilder elementBuilder = MediaPackageElementBuilderFactory.newInstance().newElementBuilder();
585 MediaPackageElement element = elementBuilder.elementFromURI(uri, MediaPackageElement.Type.Track, flavor);
586 TrackImpl track = (TrackImpl) element;
587
588
589 track.setDuration(duration);
590 track.setLive(true);
591 track.setMimeType(MimeTypes.parseMimeType(mimeType));
592
593 VideoStreamImpl video = new VideoStreamImpl("video-" + flavor.getType() + "-" + flavor.getSubtype());
594
595 String[] dimensions = resolution.split("x");
596 video.setFrameWidth(Integer.parseInt(dimensions[0]));
597 video.setFrameHeight(Integer.parseInt(dimensions[1]));
598
599 track.addStream(video);
600
601 logger.debug("Creating live track element of flavor {}, resolution {}, and url {}",
602 new Object[] { flavor, resolution, uriString });
603
604 return track;
605 }
606
607
608
609
610
611 String replaceVariables(String mpId, String caName, String toBeReplaced, MediaPackageElementFlavor flavor,
612 String resolution) {
613
614
615 final Pattern pat = Pattern.compile("#\\{(\\w+)\\}");
616
617 Matcher matcher = pat.matcher(toBeReplaced);
618 StringBuffer sb = new StringBuffer();
619 while (matcher.find()) {
620 if (matcher.group(1).equals(REPLACE_ID)) {
621 matcher.appendReplacement(sb, mpId);
622 } else if (matcher.group(1).equals(REPLACE_FLAVOR)) {
623 matcher.appendReplacement(sb, flavor.getType() + "-" + flavor.getSubtype());
624 } else if (matcher.group(1).equals(REPLACE_CA_NAME)) {
625
626 matcher.appendReplacement(sb, caName);
627 } else if (matcher.group(1).equals(REPLACE_RESOLUTION)) {
628
629 matcher.appendReplacement(sb, resolution);
630 }
631 }
632 matcher.appendTail(sb);
633 return sb.toString();
634 }
635
636 private JobBarrier.Result waitForStatus(Job... jobs) throws IllegalStateException, IllegalArgumentException {
637 if (serviceRegistry == null) {
638 throw new IllegalStateException("Can't wait for job status without providing a service registry first");
639 }
640 JobBarrier barrier = new JobBarrier(null, serviceRegistry, jobPollingInterval, jobs);
641 return barrier.waitForJobs();
642 }
643
644 Snapshot getSnapshotFromArchive(String mpId) throws LiveScheduleException {
645 Optional<Snapshot> snapshot = assetManager.getLatestSnapshot(mpId);
646 if (snapshot.isEmpty()) {
647
648 throw new LiveScheduleException(String.format("Unexpected error: media package %s has not been archived.", mpId));
649 }
650 return snapshot.get();
651 }
652
653 MediaPackage distributeAclsAndCatalogs(Snapshot snapshot) throws LiveScheduleException {
654 try {
655 MediaPackage mp = (MediaPackage) snapshot.getMediaPackage().clone();
656
657
658 Collection<MediaPackageElement> elements = publishElementSelector.select(mp, false);
659 Set<String> elementIds = elements.stream().map(MediaPackageElement::getIdentifier).collect(Collectors.toSet());
660
661
662 Job distributionJob = downloadDistributionService.distribute(CHANNEL_ID, mp, elementIds, false);
663 if (!waitForStatus(distributionJob).isSuccess()) {
664 throw new LiveScheduleException(
665 "Element(s) for live media package " + mp.getIdentifier() + " could not be distributed");
666 }
667
668
669 for (MediaPackageElement e: mp.getElements()) {
670 mp.remove(e);
671 }
672
673
674 List<MediaPackageElement> distributedElements = (List<MediaPackageElement>) MediaPackageElementParser
675 .getArrayFromXml(distributionJob.getPayload());
676 for (MediaPackageElement distributedElement : distributedElements) {
677 mp.add(distributedElement);
678 }
679
680
681 for (String id : elementIds) {
682 MediaPackageElement e = mp.getElementById(id);
683 workspace.delete(e.getURI());
684 }
685
686 return mp;
687 } catch (LiveScheduleException e) {
688 throw e;
689 } catch (Exception e) {
690 throw new LiveScheduleException(e);
691 }
692 }
693
694 MediaPackage replaceAndDistributeAcl(MediaPackage previousMp, AccessControlList acl) throws LiveScheduleException {
695 try {
696
697 MediaPackage mp = (MediaPackage) previousMp.clone();
698
699
700 Attachment[] atts = mp.getAttachments(MediaPackageElements.XACML_POLICY_EPISODE);
701 if (atts.length > 0) {
702 mp.remove(atts[0]);
703 }
704
705
706 authService.setAcl(mp, AclScope.Episode, acl);
707 atts = mp.getAttachments(MediaPackageElements.XACML_POLICY_EPISODE);
708 if (atts.length > 0) {
709 String aclId = atts[0].getIdentifier();
710
711 Job distributionJob = downloadDistributionService.distribute(CHANNEL_ID, mp, aclId, false);
712 if (!waitForStatus(distributionJob).isSuccess()) {
713 throw new LiveScheduleException(
714 "Acl for live media package " + mp.getIdentifier() + " could not be distributed");
715 }
716
717 MediaPackageElement e = mp.getElementById(aclId);
718
719 mp.remove(e);
720 workspace.delete(e.getURI());
721
722
723 mp.add(MediaPackageElementParser.getFromXml(distributionJob.getPayload()));
724 }
725 return mp;
726 } catch (LiveScheduleException e) {
727 throw e;
728 } catch (Exception e) {
729 throw new LiveScheduleException(e);
730 }
731 }
732
733 MediaPackage addLivePublicationToMediaPackage(Snapshot snapshot, Map<String, Track> generatedTracks)
734 throws LiveScheduleException {
735 MediaPackage mp = snapshot.getMediaPackage();
736
737 Organization currentOrg = null;
738 try {
739 currentOrg = organizationService.getOrganization(snapshot.getOrganizationId());
740 } catch (NotFoundException e) {
741 logger.warn("Organization in snapshot not found: {}", snapshot.getOrganizationId());
742 }
743
744 logger.debug("Adding live channel publication element to media package {}", mp);
745 String engageUrlString = null;
746 if (currentOrg != null) {
747 engageUrlString = StringUtils.trimToNull(currentOrg.getProperties().get(ENGAGE_URL_PROPERTY));
748 }
749 if (engageUrlString == null) {
750 engageUrlString = serverUrl;
751 logger.info(
752 "Using 'server.url' as a fallback for the non-existing organization level key '{}' for the publication url",
753 ENGAGE_URL_PROPERTY);
754 }
755
756 try {
757
758 URI engageUri = URIUtils.resolve(new URI(engageUrlString), PLAYER_PATH + mp.getIdentifier().toString());
759 Publication publicationElement = PublicationImpl.publication(UUID.randomUUID().toString(), CHANNEL_ID, engageUri,
760 MimeTypes.parseMimeType("text/html"));
761 mp.add(publicationElement);
762 createOrUpdatePublicationTracks(publicationElement, generatedTracks);
763 return mp;
764 } catch (URISyntaxException e) {
765 throw new LiveScheduleException(e);
766 }
767 }
768
769 void removeLivePublicationChannel(MediaPackage mp) {
770
771 Publication[] publications = mp.getPublications();
772 if (publications != null) {
773 for (Publication publication : publications) {
774 if (CHANNEL_ID.equals(publication.getChannel())) {
775 mp.remove(publication);
776 }
777 }
778 }
779 }
780
781 boolean isSameMediaPackage(MediaPackage previous, MediaPackage current) {
782
783 Equator<Track> liveTrackEquator = new Equator<>() {
784 @Override
785 public boolean equate(Track track1, Track track2) {
786
787 VideoStream videostream1 = (VideoStream) track1.getStreams()[0];
788 VideoStream videostream2 = (VideoStream) track2.getStreams()[0];
789
790 return Objects.equals(track1.getURI(), track2.getURI())
791 && Objects.equals(track1.getFlavor(), track2.getFlavor())
792 && Objects.equals(track1.getMimeType(), track2.getMimeType())
793 && Objects.equals(track1.getDuration(), track2.getDuration())
794 && Objects.equals(videostream1.getFrameWidth(), videostream2.getFrameWidth())
795 && Objects.equals(videostream1.getFrameHeight(), videostream2.getFrameHeight());
796 }
797
798 @Override
799 public int hash(Track track) {
800 VideoStream videostream = (VideoStream) track.getStreams()[0];
801 return Objects.hash(track.getURI(), track.getFlavor(), track.getMimeType(), track.getDuration(),
802 videostream.getFrameWidth(), videostream.getFrameHeight());
803 }
804 };
805
806 Equator<MediaPackageElement> catalogAndAttachmentEquator = new Equator<>() {
807 @Override
808 public boolean equate(MediaPackageElement mpe1, MediaPackageElement mpe2) {
809 return Objects.equals(mpe1.getIdentifier(), mpe2.getIdentifier())
810 && Objects.equals(mpe1.getElementType(), mpe2.getElementType())
811 && Objects.equals(mpe1.getChecksum(), mpe2.getChecksum())
812 && Objects.equals(mpe1.getFlavor(), mpe2.getFlavor());
813 }
814
815 @Override
816 public int hash(MediaPackageElement mpe) {
817 return Objects.hash(mpe.getIdentifier(), mpe.getElementType(), mpe.getChecksum(), mpe.getFlavor());
818 }
819 };
820
821 if (!CollectionUtils.isEqualCollection(Arrays.asList(previous.getTracks()), Arrays.asList(current.getTracks()),
822 liveTrackEquator)) {
823 return false;
824 } else if (!CollectionUtils.isEqualCollection(Arrays.asList(previous.getCatalogs()),
825 Arrays.asList(current.getCatalogs()), catalogAndAttachmentEquator)) {
826 return false;
827 } else {
828 return CollectionUtils.isEqualCollection(Arrays.asList(previous.getAttachments()),
829 Arrays.asList(current.getAttachments()), catalogAndAttachmentEquator);
830 }
831 }
832
833 void retractPreviousElements(MediaPackage previousMp, MediaPackage newMp) throws LiveScheduleException {
834 try {
835
836
837 Set<String> elementIds = new HashSet<String>();
838 for (MediaPackageElement element : previousMp.getElements()) {
839
840 if (!Track.TYPE.equals(element.getElementType())) {
841 boolean canBeDeleted = true;
842 for (MediaPackageElement newElement : newMp.getElements()) {
843 if (element.getURI().equals(newElement.getURI())) {
844 logger.debug(
845 "Not retracting element {} with URI {} from download distribution because it is "
846 + "still used by updated live media package",
847 element.getIdentifier(), element.getURI());
848 canBeDeleted = false;
849 break;
850 }
851 }
852 if (canBeDeleted) {
853 elementIds.add(element.getIdentifier());
854 }
855 }
856 }
857 if (elementIds.size() > 0) {
858 Job job = downloadDistributionService.retract(CHANNEL_ID, previousMp, elementIds);
859
860 if (!waitForStatus(job).isSuccess()) {
861 logger.warn("One of the download retract jobs did not complete successfully");
862 } else {
863 logger.debug("Retraction of previously published elements complete");
864 }
865 }
866 } catch (DistributionException e) {
867 throw new LiveScheduleException(e);
868 }
869 }
870
871 @Reference
872 public void setDublinCoreService(DublinCoreCatalogService service) {
873 this.dublinCoreService = service;
874 }
875
876 @Reference
877 public void setSearchService(SearchService service) {
878 this.searchService = service;
879 }
880
881 @Reference
882 public void setSeriesService(SeriesService service) {
883 this.seriesService = service;
884 }
885
886 @Reference
887 public void setServiceRegistry(ServiceRegistry service) {
888 this.serviceRegistry = service;
889 }
890
891 @Reference
892 public void setCaptureAgentService(CaptureAgentStateService service) {
893 this.captureAgentService = service;
894 }
895
896 @Reference(
897 name = "DownloadDistributionService",
898 target = "(distribution.channel=download)"
899 )
900 public void setDownloadDistributionService(DownloadDistributionService service) {
901 this.downloadDistributionService = service;
902 logger.info("Distribution service with type '{}' set.", downloadDistributionService.getDistributionType());
903 }
904
905 @Reference
906 public void setWorkspace(Workspace ws) {
907 this.workspace = ws;
908 }
909
910 @Reference
911 public void setAssetManager(AssetManager assetManager) {
912 this.assetManager = assetManager;
913 }
914
915 @Reference
916 public void setAuthorizationService(AuthorizationService service) {
917 this.authService = service;
918 }
919
920 @Reference
921 public void setOrganizationService(OrganizationDirectoryService service) {
922 this.organizationService = service;
923 }
924
925 @Reference
926 public void setSecurityService(SecurityService service) {
927 this.securityService = service;
928 }
929
930
931
932 void setJobPollingInterval(long jobPollingInterval) {
933 this.jobPollingInterval = jobPollingInterval;
934 }
935
936 Cache<String, Version> getSnapshotVersionCache() {
937 return this.snapshotVersionCache;
938 }
939
940 }