1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21 package org.opencastproject.liveschedule.impl;
22
23 import org.opencastproject.assetmanager.api.AssetManager;
24 import org.opencastproject.assetmanager.api.Snapshot;
25 import org.opencastproject.assetmanager.api.Version;
26 import org.opencastproject.assetmanager.api.query.AQueryBuilder;
27 import org.opencastproject.assetmanager.api.query.ARecord;
28 import org.opencastproject.assetmanager.api.query.AResult;
29 import org.opencastproject.capture.admin.api.CaptureAgentStateService;
30 import org.opencastproject.distribution.api.DistributionException;
31 import org.opencastproject.distribution.api.DownloadDistributionService;
32 import org.opencastproject.job.api.Job;
33 import org.opencastproject.job.api.JobBarrier;
34 import org.opencastproject.liveschedule.api.LiveScheduleException;
35 import org.opencastproject.liveschedule.api.LiveScheduleService;
36 import org.opencastproject.mediapackage.Attachment;
37 import org.opencastproject.mediapackage.MediaPackage;
38 import org.opencastproject.mediapackage.MediaPackageElement;
39 import org.opencastproject.mediapackage.MediaPackageElementBuilder;
40 import org.opencastproject.mediapackage.MediaPackageElementBuilderFactory;
41 import org.opencastproject.mediapackage.MediaPackageElementFlavor;
42 import org.opencastproject.mediapackage.MediaPackageElementParser;
43 import org.opencastproject.mediapackage.MediaPackageElements;
44 import org.opencastproject.mediapackage.Publication;
45 import org.opencastproject.mediapackage.PublicationImpl;
46 import org.opencastproject.mediapackage.Track;
47 import org.opencastproject.mediapackage.VideoStream;
48 import org.opencastproject.mediapackage.selector.SimpleElementSelector;
49 import org.opencastproject.mediapackage.track.TrackImpl;
50 import org.opencastproject.mediapackage.track.VideoStreamImpl;
51 import org.opencastproject.metadata.dublincore.DCMIPeriod;
52 import org.opencastproject.metadata.dublincore.DublinCore;
53 import org.opencastproject.metadata.dublincore.DublinCoreCatalog;
54 import org.opencastproject.metadata.dublincore.DublinCoreCatalogService;
55 import org.opencastproject.metadata.dublincore.EncodingSchemeUtils;
56 import org.opencastproject.search.api.SearchService;
57 import org.opencastproject.security.api.AccessControlList;
58 import org.opencastproject.security.api.AclScope;
59 import org.opencastproject.security.api.AuthorizationService;
60 import org.opencastproject.security.api.Organization;
61 import org.opencastproject.security.api.OrganizationDirectoryService;
62 import org.opencastproject.security.api.SecurityService;
63 import org.opencastproject.security.api.UnauthorizedException;
64 import org.opencastproject.security.api.User;
65 import org.opencastproject.security.util.SecurityUtil;
66 import org.opencastproject.series.api.SeriesService;
67 import org.opencastproject.serviceregistry.api.ServiceRegistry;
68 import org.opencastproject.util.MimeTypes;
69 import org.opencastproject.util.NotFoundException;
70 import org.opencastproject.util.UrlSupport;
71 import org.opencastproject.workspace.api.Workspace;
72
73 import com.google.common.cache.Cache;
74 import com.google.common.cache.CacheBuilder;
75
76 import org.apache.commons.collections4.CollectionUtils;
77 import org.apache.commons.collections4.Equator;
78 import org.apache.commons.lang3.StringUtils;
79 import org.apache.http.client.utils.URIUtils;
80 import org.osgi.framework.BundleContext;
81 import org.osgi.service.component.ComponentContext;
82 import org.osgi.service.component.annotations.Activate;
83 import org.osgi.service.component.annotations.Component;
84 import org.osgi.service.component.annotations.Reference;
85 import org.slf4j.Logger;
86 import org.slf4j.LoggerFactory;
87
88 import java.net.URI;
89 import java.net.URISyntaxException;
90 import java.util.ArrayList;
91 import java.util.Arrays;
92 import java.util.Collection;
93 import java.util.Dictionary;
94 import java.util.Enumeration;
95 import java.util.HashMap;
96 import java.util.HashSet;
97 import java.util.List;
98 import java.util.Map;
99 import java.util.Objects;
100 import java.util.Optional;
101 import java.util.Properties;
102 import java.util.Set;
103 import java.util.UUID;
104 import java.util.concurrent.TimeUnit;
105 import java.util.function.Predicate;
106 import java.util.regex.Matcher;
107 import java.util.regex.Pattern;
108 import java.util.stream.Collectors;
109
110 @Component(
111 immediate = true,
112 service = LiveScheduleService.class,
113 property = {
114 "service.description=Live Schedule Service"
115 }
116 )
117 public class LiveScheduleServiceImpl implements LiveScheduleService {
118
/** Bundle context property holding the server url; read in {@code activate}. */
static final String SERVER_URL_PROPERTY = "org.opencastproject.server.url";

/** Organization property holding the engage url used to build the live publication url. */
static final String ENGAGE_URL_PROPERTY = "org.opencastproject.engage.ui.url";

/** Path resolved against the engage url; the media package id is appended to it. */
static final String PLAYER_PATH = "/play/";

/** Defaults applied in {@code activate} when the matching configuration key is blank. */
private static final String DEFAULT_STREAM_MIME_TYPE = "video/mp4";
private static final String DEFAULT_STREAM_RESOLUTION = "1920x1080";
private static final String DEFAULT_STREAM_NAME = "live-stream";
private static final String DEFAULT_LIVE_TARGET_FLAVORS = "presenter/delivery";
// NOTE(review): not referenced anywhere in this class -- presumably used by tests or other classes; confirm.
static final String DEFAULT_LIVE_DISTRIBUTION_SERVICE = "download";

/**
 * Prefix of capture agent capability keys that carry a streaming url. The remainder of the key is used as the
 * resolution and the property value as the (templated) stream url.
 */
public static final String CA_PROPERTY_RESOLUTION_URL_PREFIX = "capture.device.live.resolution.";

/** Names of the <code>#{name}</code> variables that {@code replaceVariables} substitutes in stream urls. */
public static final String REPLACE_ID = "id";
public static final String REPLACE_FLAVOR = "flavor";
public static final String REPLACE_CA_NAME = "caName";
public static final String REPLACE_RESOLUTION = "resolution";

/** Component configuration keys read in {@code activate}. */
public static final String LIVE_STREAMING_URL = "live.streamingUrl";
public static final String LIVE_STREAM_NAME = "live.streamName";
public static final String LIVE_STREAM_MIME_TYPE = "live.mimeType";
public static final String LIVE_STREAM_RESOLUTION = "live.resolution";
public static final String LIVE_TARGET_FLAVORS = "live.targetFlavors";
// NOTE(review): this key is declared but never read in this class -- confirm whether it is still needed.
public static final String LIVE_DISTRIBUTION_SERVICE = "live.distributionService";
public static final String LIVE_PUBLISH_STREAMING = "live.publishStreaming";

/** Flavors of the elements (catalogs and ACL attachments) that are selected for distribution and publishing. */
private static final MediaPackageElementFlavor[] publishFlavors = { MediaPackageElements.EPISODE,
    MediaPackageElements.SERIES, MediaPackageElements.XACML_POLICY_EPISODE,
    MediaPackageElements.XACML_POLICY_SERIES };
158
159
/** The logger. */
private static final Logger logger = LoggerFactory.getLogger(LiveScheduleServiceImpl.class);

/** Configured streaming server url; stays {@code null} when {@code live.streamingUrl} is blank. */
private String liveStreamingUrl;
/** Stream name appended to the streaming url when building track urls from the service configuration. */
private String streamName;
/** Mime type set on every generated live track. */
private String streamMimeType;
/** Resolutions (expected as WIDTHxHEIGHT) for which tracks are generated from the service configuration. */
private String[] streamResolution;
/** Flavors for which tracks are generated from the service configuration. */
private MediaPackageElementFlavor[] liveFlavors;
/** Server url from the bundle context; fallback base for the publication url. */
private String serverUrl;
/**
 * Versions of the snapshots taken by this service, keyed by media package id. Entries expire 5 minutes after
 * being written; {@code updateLiveEvent} uses them to ignore changes this service made itself.
 */
private Cache<String, Version> snapshotVersionCache
    = CacheBuilder.newBuilder().expireAfterWrite(5, TimeUnit.MINUTES).build();

/** Keys (flavor:resolution) of the generated tracks that are attached to the live publication element. */
private List<String> publishedStreamingFormats = null;
/** Name of the system user that is temporarily set when talking to search and distribution. */
private String systemUserName;


/** Services injected through the OSGi reference setters of this class. */
private DownloadDistributionService downloadDistributionService;
private SearchService searchService;
// NOTE(review): seriesService and dublinCoreService are injected but not used in this class -- confirm.
private SeriesService seriesService;
private DublinCoreCatalogService dublinCoreService;
private CaptureAgentStateService captureAgentService;
private ServiceRegistry serviceRegistry;
private Workspace workspace;
private AssetManager assetManager;
private AuthorizationService authService;
private OrganizationDirectoryService organizationService;
private SecurityService securityService;

/** Polling interval handed to the {@link JobBarrier} in {@code waitForStatus}. */
private long jobPollingInterval = JobBarrier.DEFAULT_POLLING_INTERVAL;

/** Selector configured in {@code activate} with all {@code publishFlavors}. */
private SimpleElementSelector publishElementSelector;
190
191
192
193
194
195
196
197 @Activate
198 protected void activate(ComponentContext context) {
199 BundleContext bundleContext = context.getBundleContext();
200
201 serverUrl = StringUtils.trimToNull(bundleContext.getProperty(SERVER_URL_PROPERTY));
202 if (serverUrl == null) {
203 logger.warn("Server url was not set in '{}'", SERVER_URL_PROPERTY);
204 } else {
205 logger.info("Server url is {}", serverUrl);
206 }
207 systemUserName = bundleContext.getProperty(SecurityUtil.PROPERTY_KEY_SYS_USER);
208
209 @SuppressWarnings("rawtypes")
210 Dictionary properties = context.getProperties();
211 if (!StringUtils.isBlank((String) properties.get(LIVE_STREAMING_URL))) {
212 liveStreamingUrl = StringUtils.trimToEmpty((String) properties.get(LIVE_STREAMING_URL));
213 logger.info("Live streaming server url is {}", liveStreamingUrl);
214 } else {
215 logger.info("Live streaming url not set in '{}'. Streaming urls must be provided by capture agent properties.",
216 LIVE_STREAMING_URL);
217 }
218
219 if (!StringUtils.isBlank((String) properties.get(LIVE_STREAM_NAME))) {
220 streamName = StringUtils.trimToEmpty((String) properties.get(LIVE_STREAM_NAME));
221 } else {
222 streamName = DEFAULT_STREAM_NAME;
223 }
224
225 if (!StringUtils.isBlank((String) properties.get(LIVE_STREAM_MIME_TYPE))) {
226 streamMimeType = StringUtils.trimToEmpty((String) properties.get(LIVE_STREAM_MIME_TYPE));
227 } else {
228 streamMimeType = DEFAULT_STREAM_MIME_TYPE;
229 }
230
231 String resolution = null;
232 if (!StringUtils.isBlank((String) properties.get(LIVE_STREAM_RESOLUTION))) {
233 resolution = StringUtils.trimToEmpty((String) properties.get(LIVE_STREAM_RESOLUTION));
234 } else {
235 resolution = DEFAULT_STREAM_RESOLUTION;
236 }
237 streamResolution = resolution.split(",");
238
239 String flavors = null;
240 if (!StringUtils.isBlank((String) properties.get(LIVE_TARGET_FLAVORS))) {
241 flavors = StringUtils.trimToEmpty((String) properties.get(LIVE_TARGET_FLAVORS));
242 } else {
243 flavors = DEFAULT_LIVE_TARGET_FLAVORS;
244 }
245 String[] flavorArray = StringUtils.split(flavors, ",");
246 liveFlavors = new MediaPackageElementFlavor[flavorArray.length];
247 int i = 0;
248 for (String f : flavorArray) {
249 liveFlavors[i++] = MediaPackageElementFlavor.parseFlavor(f);
250 }
251
252 publishedStreamingFormats = Arrays.asList(Optional.ofNullable(StringUtils.split(
253 (String)properties.get(LIVE_PUBLISH_STREAMING), ",")).orElse(new String[0]));
254
255 publishElementSelector = new SimpleElementSelector();
256 for (MediaPackageElementFlavor flavor : publishFlavors) {
257 publishElementSelector.addFlavor(flavor);
258 }
259
260 logger.info(
261 "Configured live stream name: {}, mime type: {}, resolution: {}, target flavors: {}",
262 streamName, streamMimeType, resolution, flavors);
263 }
264
265 @Override
266 public boolean createOrUpdateLiveEvent(String mpId, DublinCoreCatalog episodeDC) throws LiveScheduleException {
267 MediaPackage mp = getMediaPackageFromSearch(mpId);
268 if (mp == null) {
269
270
271 DCMIPeriod period = EncodingSchemeUtils.decodeMandatoryPeriod(episodeDC.getFirst(DublinCore.PROPERTY_TEMPORAL));
272 if (period.getEnd().getTime() <= System.currentTimeMillis()) {
273 logger.info("Live media package {} not created in search index because event is already past (end date: {})",
274 mpId, period.getEnd());
275 return false;
276 }
277 return createLiveEvent(mpId, episodeDC);
278 } else {
279
280
281 if (!mp.isLive()) {
282 logger.info("Media package {} is in search index but not live so not updating it.", mpId);
283 return false;
284 }
285 return updateLiveEvent(mp, episodeDC);
286 }
287 }
288
289 @Override
290 public boolean deleteLiveEvent(String mpId) throws LiveScheduleException {
291 MediaPackage mp = getMediaPackageFromSearch(mpId);
292 if (mp == null) {
293 logger.debug("Live media package {} not found in search index", mpId);
294 return false;
295 } else {
296 if (!mp.isLive()) {
297 logger.info("Media package {} is not live. Not retracting.", mpId);
298 return false;
299 }
300 return retractLiveEvent(mp);
301 }
302 }
303
304 @Override
305 public boolean updateLiveEventAcl(String mpId, AccessControlList acl) throws LiveScheduleException {
306 MediaPackage previousMp = getMediaPackageFromSearch(mpId);
307 if (previousMp != null) {
308 if (!previousMp.isLive()) {
309 logger.info("Media package {} is not live. Not updating acl.", mpId);
310 return false;
311 }
312
313 MediaPackage newMp = replaceAndDistributeAcl(previousMp, acl);
314
315 publishToSearch(newMp);
316
317 retractPreviousElements(previousMp, newMp);
318 logger.info("Updated live acl for media package {}", newMp);
319 return true;
320 }
321 return false;
322 }
323
324 boolean createLiveEvent(String mpId, DublinCoreCatalog episodeDC) throws LiveScheduleException {
325 try {
326 logger.info("Creating live media package {}", mpId);
327 Snapshot snapshot = getSnapshotFromArchive(mpId);
328
329
330 MediaPackage tmpMp = (MediaPackage) snapshot.getMediaPackage().clone();
331 setDurationForMediaPackage(tmpMp, episodeDC);
332 Map<String, Track> liveTracks = addLiveTracksToMediaPackage(tmpMp, episodeDC);
333
334
335 MediaPackage mpForSearch = distributeAclsAndCatalogs(snapshot);
336 for (Track t : tmpMp.getTracks()) {
337 mpForSearch.add(t);
338 }
339 publishToSearch(mpForSearch);
340
341
342 MediaPackage updatedArchivedMp = addLivePublicationToMediaPackage(snapshot, liveTracks);
343 snapshotVersionCache.put(mpId, assetManager.takeSnapshot(updatedArchivedMp).getVersion());
344 return true;
345 } catch (Exception e) {
346 throw new LiveScheduleException(e);
347 }
348 }
349
350 boolean updateLiveEvent(MediaPackage mpFromSearch, DublinCoreCatalog episodeDC) throws LiveScheduleException {
351 String mpId = mpFromSearch.getIdentifier().toString();
352 Snapshot snapshot = getSnapshotFromArchive(mpId);
353
354
355
356
357 if (snapshot.getVersion().equals(snapshotVersionCache.getIfPresent(mpId))) {
358 logger.debug("Snapshot version {} was created by us so this change is ignored.", snapshot.getVersion());
359 return false;
360 }
361
362
363 MediaPackage tmpMp = (MediaPackage) snapshot.getMediaPackage().clone();
364
365 Collection<MediaPackageElement> elements = publishElementSelector.select(tmpMp, false);
366 Arrays.stream(tmpMp.getElements()).filter(Predicate.not(elements::contains)).collect(Collectors.toList())
367 .forEach(tmpMp::remove);
368
369 setDurationForMediaPackage(tmpMp, episodeDC);
370 Map<String, Track> liveTracks = addLiveTracksToMediaPackage(tmpMp, episodeDC);
371
372
373 if (isSameMediaPackage(mpFromSearch, tmpMp)) {
374 logger.debug("Live media package {} seems to be the same. Not updating.", mpFromSearch);
375 return false;
376 }
377
378 logger.info("Updating live media package {}", mpFromSearch);
379
380
381 MediaPackage mpForSearch = distributeAclsAndCatalogs(snapshot);
382 for (Track t : tmpMp.getTracks()) {
383 mpForSearch.add(t);
384 }
385 removeLivePublicationChannel(mpForSearch);
386 publishToSearch(mpForSearch);
387 retractPreviousElements(mpFromSearch, mpForSearch);
388
389
390 MediaPackage updatedArchivedMp = updateLivePublication(snapshot.getMediaPackage(), liveTracks);
391 snapshotVersionCache.put(mpId, assetManager.takeSnapshot(updatedArchivedMp).getVersion());
392 return true;
393 }
394
395 private void createOrUpdatePublicationTracks(Publication publication, Map<String, Track> generatedTracks) {
396 if (publication.getTracks().length > 0) {
397 publication.clearTracks();
398 }
399
400 for (String publishedStreamingFormat : publishedStreamingFormats) {
401 Track track = generatedTracks.get(publishedStreamingFormat);
402 if (track != null) {
403 publication.addTrack(track);
404 }
405 }
406 }
407
408 private MediaPackage updateLivePublication(MediaPackage mediaPackage, Map<String, Track> generatedTracks) {
409 Publication[] publications = mediaPackage.getPublications();
410 for (Publication publication : publications) {
411 if (publication.getChannel().equals(CHANNEL_ID)) {
412 createOrUpdatePublicationTracks(publication, generatedTracks);
413 }
414 }
415 return mediaPackage;
416 }
417
418 boolean retractLiveEvent(MediaPackage mp) throws LiveScheduleException {
419 retract(mp);
420
421
422 try {
423 String mpId = mp.getIdentifier().toString();
424 Snapshot snapshot = getSnapshotFromArchive(mpId);
425 MediaPackage archivedMp = snapshot.getMediaPackage();
426 removeLivePublicationChannel(archivedMp);
427 logger.debug("Removed live pub channel from archived media package {}", mp);
428
429
430 snapshotVersionCache.put(mpId, assetManager.takeSnapshot(archivedMp).getVersion());
431 } catch (LiveScheduleException e) {
432
433 }
434 return true;
435 }
436
437 void publishToSearch(MediaPackage mp) throws LiveScheduleException {
438 try {
439
440 logger.info("Publishing LIVE media package {} to search index", mp);
441 Job publishJob = searchService.add(mp);
442 if (!waitForStatus(publishJob).isSuccess()) {
443 throw new LiveScheduleException("Live media package " + mp.getIdentifier() + " could not be published");
444 }
445 } catch (LiveScheduleException e) {
446 throw e;
447 } catch (Exception e) {
448 throw new LiveScheduleException(e);
449 }
450 }
451
452 void retract(MediaPackage mp) throws LiveScheduleException {
453 Organization org = securityService.getOrganization();
454 User prevUser = org != null ? securityService.getUser() : null;
455 try {
456 securityService.setUser(SecurityUtil.createSystemUser(systemUserName, org));
457 Set<String> elementIds = new HashSet<String>();
458 String mpId = mp.getIdentifier().toString();
459 logger.info("Removing LIVE media package {} from the search index", mpId);
460
461 for (MediaPackageElement mpe : mp.getElements()) {
462 if (!MediaPackageElement.Type.Publication.equals(mpe.getElementType())) {
463 elementIds.add(mpe.getIdentifier());
464 }
465 }
466
467 List<String> failedJobs = new ArrayList<>();
468
469 Job searchDeleteJob = searchService.delete(mpId);
470 if (!waitForStatus(searchDeleteJob).isSuccess()) {
471 failedJobs.add("Search Index");
472 }
473
474
475 Job distributionRetractJob = downloadDistributionService.retract(CHANNEL_ID, mp, elementIds);
476 if (!waitForStatus(distributionRetractJob).isSuccess()) {
477 failedJobs.add("Distribution");
478 }
479
480 if (!failedJobs.isEmpty()) {
481 throw new LiveScheduleException(
482 String.format("Removing live media package %s from %s failed", mpId, String.join(" and ", failedJobs)));
483 }
484 } catch (LiveScheduleException e) {
485 throw e;
486 } catch (Exception e) {
487 throw new LiveScheduleException(e);
488 } finally {
489 securityService.setUser(prevUser);
490 }
491 }
492
493
494
495
496
497
498
499
500
501
502 MediaPackage getMediaPackageFromSearch(String mediaPackageId) throws LiveScheduleException {
503
504 Organization org = securityService.getOrganization();
505 User prevUser = org != null ? securityService.getUser() : null;
506 securityService.setUser(SecurityUtil.createSystemUser(systemUserName, org));
507 try {
508
509 return searchService.get(mediaPackageId);
510 } catch (UnauthorizedException e) {
511 logger.warn("Unexpected unauthorized exception when querying the search index for mp {}", mediaPackageId, e);
512 return null;
513 } catch (NotFoundException e) {
514 return null;
515 } finally {
516 securityService.setUser(prevUser);
517 }
518 }
519
520 void setDurationForMediaPackage(MediaPackage mp, DublinCoreCatalog dc) {
521 DCMIPeriod period = EncodingSchemeUtils.decodeMandatoryPeriod(dc.getFirst(DublinCore.PROPERTY_TEMPORAL));
522 long duration = period.getEnd().getTime() - period.getStart().getTime();
523 mp.setDuration(duration);
524 logger.debug("Live media package {} has start {} and duration {}", mp.getIdentifier(), mp.getDate(),
525 mp.getDuration());
526 }
527
528 Map<String, Track> addLiveTracksToMediaPackage(MediaPackage mp, DublinCoreCatalog episodeDC)
529 throws LiveScheduleException {
530 String caName = episodeDC.getFirst(DublinCore.PROPERTY_SPATIAL);
531 HashMap<String, Track> generatedTracks = new HashMap<>();
532 String mpId = mp.getIdentifier().toString();
533 try {
534
535
536 try {
537 Properties caProps = captureAgentService.getAgentCapabilities(caName);
538 if (caProps != null) {
539 Enumeration<Object> en = caProps.keys();
540 while (en.hasMoreElements()) {
541 String key = (String) en.nextElement();
542 if (key.startsWith(CA_PROPERTY_RESOLUTION_URL_PREFIX)) {
543 String resolution = key.substring(CA_PROPERTY_RESOLUTION_URL_PREFIX.length());
544 String url = caProps.getProperty(key);
545
546 MediaPackageElementFlavor flavor = MediaPackageElementFlavor.parseFlavor(DEFAULT_LIVE_TARGET_FLAVORS);
547 String replacedUrl = replaceVariables(mpId, caName, url, flavor, resolution);
548 mp.add(buildStreamingTrack(replacedUrl, flavor, streamMimeType, resolution, mp.getDuration()));
549 }
550 }
551 }
552 } catch (NotFoundException e) {
553
554
555
556 }
557
558
559
560 if (mp.getTracks().length == 0) {
561 if (liveStreamingUrl == null) {
562 throw new LiveScheduleException(
563 "Cannot build live tracks because '" + LIVE_STREAMING_URL + "' configuration was not set.");
564 }
565
566 for (MediaPackageElementFlavor flavor : liveFlavors) {
567 for (int i = 0; i < streamResolution.length; i++) {
568 String uri = replaceVariables(mpId, caName, UrlSupport.concat(liveStreamingUrl.toString(), streamName),
569 flavor, streamResolution[i]);
570 Track track = buildStreamingTrack(uri, flavor, streamMimeType, streamResolution[i], mp.getDuration());
571 mp.add(track);
572 generatedTracks.put(flavor + ":" + streamResolution[i], track);
573 }
574 }
575 }
576 } catch (URISyntaxException e) {
577 throw new LiveScheduleException(e);
578 }
579 return generatedTracks;
580 }
581
582 Track buildStreamingTrack(String uriString, MediaPackageElementFlavor flavor, String mimeType, String resolution,
583 long duration) throws URISyntaxException {
584
585 URI uri = new URI(uriString);
586
587 MediaPackageElementBuilder elementBuilder = MediaPackageElementBuilderFactory.newInstance().newElementBuilder();
588 MediaPackageElement element = elementBuilder.elementFromURI(uri, MediaPackageElement.Type.Track, flavor);
589 TrackImpl track = (TrackImpl) element;
590
591
592 track.setDuration(duration);
593 track.setLive(true);
594 track.setMimeType(MimeTypes.parseMimeType(mimeType));
595
596 VideoStreamImpl video = new VideoStreamImpl("video-" + flavor.getType() + "-" + flavor.getSubtype());
597
598 String[] dimensions = resolution.split("x");
599 video.setFrameWidth(Integer.parseInt(dimensions[0]));
600 video.setFrameHeight(Integer.parseInt(dimensions[1]));
601
602 track.addStream(video);
603
604 logger.debug("Creating live track element of flavor {}, resolution {}, and url {}",
605 new Object[] { flavor, resolution, uriString });
606
607 return track;
608 }
609
610
611
612
613
614 String replaceVariables(String mpId, String caName, String toBeReplaced, MediaPackageElementFlavor flavor,
615 String resolution) {
616
617
618 final Pattern pat = Pattern.compile("#\\{(\\w+)\\}");
619
620 Matcher matcher = pat.matcher(toBeReplaced);
621 StringBuffer sb = new StringBuffer();
622 while (matcher.find()) {
623 if (matcher.group(1).equals(REPLACE_ID)) {
624 matcher.appendReplacement(sb, mpId);
625 } else if (matcher.group(1).equals(REPLACE_FLAVOR)) {
626 matcher.appendReplacement(sb, flavor.getType() + "-" + flavor.getSubtype());
627 } else if (matcher.group(1).equals(REPLACE_CA_NAME)) {
628
629 matcher.appendReplacement(sb, caName);
630 } else if (matcher.group(1).equals(REPLACE_RESOLUTION)) {
631
632 matcher.appendReplacement(sb, resolution);
633 }
634 }
635 matcher.appendTail(sb);
636 return sb.toString();
637 }
638
639 private JobBarrier.Result waitForStatus(Job... jobs) throws IllegalStateException, IllegalArgumentException {
640 if (serviceRegistry == null) {
641 throw new IllegalStateException("Can't wait for job status without providing a service registry first");
642 }
643 JobBarrier barrier = new JobBarrier(null, serviceRegistry, jobPollingInterval, jobs);
644 return barrier.waitForJobs();
645 }
646
647 Snapshot getSnapshotFromArchive(String mpId) throws LiveScheduleException {
648 AQueryBuilder query = assetManager.createQuery();
649 AResult result = query.select(query.snapshot()).where(query.mediaPackageId(mpId).and(query.version().isLatest()))
650 .run();
651 if (result.getSize() == 0) {
652
653 throw new LiveScheduleException(String.format("Unexpected error: media package %s has not been archived.", mpId));
654 }
655 Optional<ARecord> record = result.getRecords().stream().findFirst();
656 if (record.isEmpty()) {
657
658 throw new LiveScheduleException(String.format("Unexpected error: media package %s has not been archived.", mpId));
659 }
660 return record.get().getSnapshot().get();
661 }
662
663 MediaPackage distributeAclsAndCatalogs(Snapshot snapshot) throws LiveScheduleException {
664 try {
665 MediaPackage mp = (MediaPackage) snapshot.getMediaPackage().clone();
666
667
668 Collection<MediaPackageElement> elements = publishElementSelector.select(mp, false);
669 Set<String> elementIds = elements.stream().map(MediaPackageElement::getIdentifier).collect(Collectors.toSet());
670
671
672 Job distributionJob = downloadDistributionService.distribute(CHANNEL_ID, mp, elementIds, false);
673 if (!waitForStatus(distributionJob).isSuccess()) {
674 throw new LiveScheduleException(
675 "Element(s) for live media package " + mp.getIdentifier() + " could not be distributed");
676 }
677
678
679 for (MediaPackageElement e: mp.getElements()) {
680 mp.remove(e);
681 }
682
683
684 List<MediaPackageElement> distributedElements = (List<MediaPackageElement>) MediaPackageElementParser
685 .getArrayFromXml(distributionJob.getPayload());
686 for (MediaPackageElement distributedElement : distributedElements) {
687 mp.add(distributedElement);
688 }
689
690
691 for (String id : elementIds) {
692 MediaPackageElement e = mp.getElementById(id);
693 workspace.delete(e.getURI());
694 }
695
696 return mp;
697 } catch (LiveScheduleException e) {
698 throw e;
699 } catch (Exception e) {
700 throw new LiveScheduleException(e);
701 }
702 }
703
704 MediaPackage replaceAndDistributeAcl(MediaPackage previousMp, AccessControlList acl) throws LiveScheduleException {
705 try {
706
707 MediaPackage mp = (MediaPackage) previousMp.clone();
708
709
710 Attachment[] atts = mp.getAttachments(MediaPackageElements.XACML_POLICY_EPISODE);
711 if (atts.length > 0) {
712 mp.remove(atts[0]);
713 }
714
715
716 authService.setAcl(mp, AclScope.Episode, acl);
717 atts = mp.getAttachments(MediaPackageElements.XACML_POLICY_EPISODE);
718 if (atts.length > 0) {
719 String aclId = atts[0].getIdentifier();
720
721 Job distributionJob = downloadDistributionService.distribute(CHANNEL_ID, mp, aclId, false);
722 if (!waitForStatus(distributionJob).isSuccess()) {
723 throw new LiveScheduleException(
724 "Acl for live media package " + mp.getIdentifier() + " could not be distributed");
725 }
726
727 MediaPackageElement e = mp.getElementById(aclId);
728
729 mp.remove(e);
730 workspace.delete(e.getURI());
731
732
733 mp.add(MediaPackageElementParser.getFromXml(distributionJob.getPayload()));
734 }
735 return mp;
736 } catch (LiveScheduleException e) {
737 throw e;
738 } catch (Exception e) {
739 throw new LiveScheduleException(e);
740 }
741 }
742
743 MediaPackage addLivePublicationToMediaPackage(Snapshot snapshot, Map<String, Track> generatedTracks)
744 throws LiveScheduleException {
745 MediaPackage mp = snapshot.getMediaPackage();
746
747 Organization currentOrg = null;
748 try {
749 currentOrg = organizationService.getOrganization(snapshot.getOrganizationId());
750 } catch (NotFoundException e) {
751 logger.warn("Organization in snapshot not found: {}", snapshot.getOrganizationId());
752 }
753
754 logger.debug("Adding live channel publication element to media package {}", mp);
755 String engageUrlString = null;
756 if (currentOrg != null) {
757 engageUrlString = StringUtils.trimToNull(currentOrg.getProperties().get(ENGAGE_URL_PROPERTY));
758 }
759 if (engageUrlString == null) {
760 engageUrlString = serverUrl;
761 logger.info(
762 "Using 'server.url' as a fallback for the non-existing organization level key '{}' for the publication url",
763 ENGAGE_URL_PROPERTY);
764 }
765
766 try {
767
768 URI engageUri = URIUtils.resolve(new URI(engageUrlString), PLAYER_PATH + mp.getIdentifier().toString());
769 Publication publicationElement = PublicationImpl.publication(UUID.randomUUID().toString(), CHANNEL_ID, engageUri,
770 MimeTypes.parseMimeType("text/html"));
771 mp.add(publicationElement);
772 createOrUpdatePublicationTracks(publicationElement, generatedTracks);
773 return mp;
774 } catch (URISyntaxException e) {
775 throw new LiveScheduleException(e);
776 }
777 }
778
779 void removeLivePublicationChannel(MediaPackage mp) {
780
781 Publication[] publications = mp.getPublications();
782 if (publications != null) {
783 for (Publication publication : publications) {
784 if (CHANNEL_ID.equals(publication.getChannel())) {
785 mp.remove(publication);
786 }
787 }
788 }
789 }
790
791 boolean isSameMediaPackage(MediaPackage previous, MediaPackage current) {
792
793 Equator<Track> liveTrackEquator = new Equator<>() {
794 @Override
795 public boolean equate(Track track1, Track track2) {
796
797 VideoStream videostream1 = (VideoStream) track1.getStreams()[0];
798 VideoStream videostream2 = (VideoStream) track2.getStreams()[0];
799
800 return Objects.equals(track1.getURI(), track2.getURI())
801 && Objects.equals(track1.getFlavor(), track2.getFlavor())
802 && Objects.equals(track1.getMimeType(), track2.getMimeType())
803 && Objects.equals(track1.getDuration(), track2.getDuration())
804 && Objects.equals(videostream1.getFrameWidth(), videostream2.getFrameWidth())
805 && Objects.equals(videostream1.getFrameHeight(), videostream2.getFrameHeight());
806 }
807
808 @Override
809 public int hash(Track track) {
810 VideoStream videostream = (VideoStream) track.getStreams()[0];
811 return Objects.hash(track.getURI(), track.getFlavor(), track.getMimeType(), track.getDuration(),
812 videostream.getFrameWidth(), videostream.getFrameHeight());
813 }
814 };
815
816 Equator<MediaPackageElement> catalogAndAttachmentEquator = new Equator<>() {
817 @Override
818 public boolean equate(MediaPackageElement mpe1, MediaPackageElement mpe2) {
819 return Objects.equals(mpe1.getIdentifier(), mpe2.getIdentifier())
820 && Objects.equals(mpe1.getElementType(), mpe2.getElementType())
821 && Objects.equals(mpe1.getChecksum(), mpe2.getChecksum())
822 && Objects.equals(mpe1.getFlavor(), mpe2.getFlavor());
823 }
824
825 @Override
826 public int hash(MediaPackageElement mpe) {
827 return Objects.hash(mpe.getIdentifier(), mpe.getElementType(), mpe.getChecksum(), mpe.getFlavor());
828 }
829 };
830
831 if (!CollectionUtils.isEqualCollection(Arrays.asList(previous.getTracks()), Arrays.asList(current.getTracks()),
832 liveTrackEquator)) {
833 return false;
834 } else if (!CollectionUtils.isEqualCollection(Arrays.asList(previous.getCatalogs()),
835 Arrays.asList(current.getCatalogs()), catalogAndAttachmentEquator)) {
836 return false;
837 } else {
838 return CollectionUtils.isEqualCollection(Arrays.asList(previous.getAttachments()),
839 Arrays.asList(current.getAttachments()), catalogAndAttachmentEquator);
840 }
841 }
842
843 void retractPreviousElements(MediaPackage previousMp, MediaPackage newMp) throws LiveScheduleException {
844 try {
845
846
847 Set<String> elementIds = new HashSet<String>();
848 for (MediaPackageElement element : previousMp.getElements()) {
849
850 if (!Track.TYPE.equals(element.getElementType())) {
851 boolean canBeDeleted = true;
852 for (MediaPackageElement newElement : newMp.getElements()) {
853 if (element.getURI().equals(newElement.getURI())) {
854 logger.debug(
855 "Not retracting element {} with URI {} from download distribution because it is "
856 + "still used by updated live media package",
857 element.getIdentifier(), element.getURI());
858 canBeDeleted = false;
859 break;
860 }
861 }
862 if (canBeDeleted) {
863 elementIds.add(element.getIdentifier());
864 }
865 }
866 }
867 if (elementIds.size() > 0) {
868 Job job = downloadDistributionService.retract(CHANNEL_ID, previousMp, elementIds);
869
870 if (!waitForStatus(job).isSuccess()) {
871 logger.warn("One of the download retract jobs did not complete successfully");
872 } else {
873 logger.debug("Retraction of previously published elements complete");
874 }
875 }
876 } catch (DistributionException e) {
877 throw new LiveScheduleException(e);
878 }
879 }
880
/**
 * OSGi reference setter for the dublin core catalog service.
 *
 * @param service
 *          the dublin core catalog service
 */
@Reference
public void setDublinCoreService(DublinCoreCatalogService service) {
  this.dublinCoreService = service;
}

/**
 * OSGi reference setter for the search service used to publish and look up live media packages.
 *
 * @param service
 *          the search service
 */
@Reference
public void setSearchService(SearchService service) {
  this.searchService = service;
}

/**
 * OSGi reference setter for the series service.
 *
 * @param service
 *          the series service
 */
@Reference
public void setSeriesService(SeriesService service) {
  this.seriesService = service;
}

/**
 * OSGi reference setter for the service registry; required by {@code waitForStatus}.
 *
 * @param service
 *          the service registry
 */
@Reference
public void setServiceRegistry(ServiceRegistry service) {
  this.serviceRegistry = service;
}

/**
 * OSGi reference setter for the capture agent state service that provides the agent capabilities.
 *
 * @param service
 *          the capture agent state service
 */
@Reference
public void setCaptureAgentService(CaptureAgentStateService service) {
  this.captureAgentService = service;
}

/**
 * OSGi reference setter for the distribution service; bound to the service registered with
 * {@code distribution.channel=download}. Logs the distribution type of the bound service.
 *
 * @param service
 *          the download distribution service
 */
@Reference(
    name = "DownloadDistributionService",
    target = "(distribution.channel=download)"
)
public void setDownloadDistributionService(DownloadDistributionService service) {
  this.downloadDistributionService = service;
  logger.info("Distribution service with type '{}' set.", downloadDistributionService.getDistributionType());
}

/**
 * OSGi reference setter for the workspace.
 *
 * @param ws
 *          the workspace
 */
@Reference
public void setWorkspace(Workspace ws) {
  this.workspace = ws;
}

/**
 * OSGi reference setter for the asset manager.
 *
 * @param assetManager
 *          the asset manager
 */
@Reference
public void setAssetManager(AssetManager assetManager) {
  this.assetManager = assetManager;
}

/**
 * OSGi reference setter for the authorization service used to set the episode ACL.
 *
 * @param service
 *          the authorization service
 */
@Reference
public void setAuthorizationService(AuthorizationService service) {
  this.authService = service;
}

/**
 * OSGi reference setter for the organization directory service.
 *
 * @param service
 *          the organization directory service
 */
@Reference
public void setOrganizationService(OrganizationDirectoryService service) {
  this.organizationService = service;
}

/**
 * OSGi reference setter for the security service.
 *
 * @param service
 *          the security service
 */
@Reference
public void setSecurityService(SecurityService service) {
  this.securityService = service;
}
939
940
941
/**
 * Overrides the polling interval that {@code waitForStatus} hands to the job barrier.
 * Package-private; presumably meant for tests -- confirm.
 *
 * @param jobPollingInterval
 *          the polling interval
 */
void setJobPollingInterval(long jobPollingInterval) {
  this.jobPollingInterval = jobPollingInterval;
}

/**
 * Returns the cache holding the versions of the snapshots taken by this service. The cache itself is
 * returned, not a copy.
 *
 * @return the snapshot version cache
 */
Cache<String, Version> getSnapshotVersionCache() {
  return this.snapshotVersionCache;
}
949
950 }