1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 package org.opencastproject.workflow.handler.distribution;
23
24 import static org.apache.commons.lang3.StringUtils.isBlank;
25 import static org.opencastproject.systems.OpencastConstants.SERVER_URL_PROPERTY;
26 import static org.opencastproject.util.data.Option.option;
27 import static org.opencastproject.util.data.functions.Strings.toBool;
28 import static org.opencastproject.util.data.functions.Strings.trimToNone;
29 import static org.opencastproject.workflow.handler.distribution.EngagePublicationChannel.CHANNEL_ID;
30
31 import org.opencastproject.distribution.api.DistributionException;
32 import org.opencastproject.distribution.api.DownloadDistributionService;
33 import org.opencastproject.distribution.api.StreamingDistributionService;
34 import org.opencastproject.job.api.Job;
35 import org.opencastproject.job.api.JobContext;
36 import org.opencastproject.mediapackage.Attachment;
37 import org.opencastproject.mediapackage.Catalog;
38 import org.opencastproject.mediapackage.MediaPackage;
39 import org.opencastproject.mediapackage.MediaPackageElement;
40 import org.opencastproject.mediapackage.MediaPackageElementFlavor;
41 import org.opencastproject.mediapackage.MediaPackageElementParser;
42 import org.opencastproject.mediapackage.MediaPackageElements;
43 import org.opencastproject.mediapackage.MediaPackageException;
44 import org.opencastproject.mediapackage.MediaPackageReference;
45 import org.opencastproject.mediapackage.MediaPackageReferenceImpl;
46 import org.opencastproject.mediapackage.Publication;
47 import org.opencastproject.mediapackage.PublicationImpl;
48 import org.opencastproject.mediapackage.Track;
49 import org.opencastproject.mediapackage.selector.SimpleElementSelector;
50 import org.opencastproject.mediapackage.track.TrackImpl;
51 import org.opencastproject.metadata.dublincore.DublinCore;
52 import org.opencastproject.metadata.dublincore.DublinCoreValue;
53 import org.opencastproject.metadata.dublincore.DublinCoreXmlFormat;
54 import org.opencastproject.search.api.SearchException;
55 import org.opencastproject.search.api.SearchService;
56 import org.opencastproject.security.api.Organization;
57 import org.opencastproject.security.api.OrganizationDirectoryService;
58 import org.opencastproject.security.api.UnauthorizedException;
59 import org.opencastproject.serviceregistry.api.ServiceRegistry;
60 import org.opencastproject.serviceregistry.api.ServiceRegistryException;
61 import org.opencastproject.util.MimeTypes;
62 import org.opencastproject.util.NotFoundException;
63 import org.opencastproject.util.UrlSupport;
64 import org.opencastproject.workflow.api.AbstractWorkflowOperationHandler;
65 import org.opencastproject.workflow.api.WorkflowInstance;
66 import org.opencastproject.workflow.api.WorkflowOperationException;
67 import org.opencastproject.workflow.api.WorkflowOperationHandler;
68 import org.opencastproject.workflow.api.WorkflowOperationInstance;
69 import org.opencastproject.workflow.api.WorkflowOperationResult;
70 import org.opencastproject.workflow.api.WorkflowOperationResult.Action;
71 import org.opencastproject.workspace.api.Workspace;
72
73 import org.apache.commons.lang3.StringUtils;
74 import org.apache.http.client.utils.URIUtils;
75 import org.osgi.framework.BundleContext;
76 import org.osgi.service.component.ComponentContext;
77 import org.osgi.service.component.annotations.Activate;
78 import org.osgi.service.component.annotations.Component;
79 import org.osgi.service.component.annotations.Reference;
80 import org.slf4j.Logger;
81 import org.slf4j.LoggerFactory;
82
83 import java.net.MalformedURLException;
84 import java.net.URI;
85 import java.net.URL;
86 import java.util.ArrayList;
87 import java.util.Arrays;
88 import java.util.Collection;
89 import java.util.HashMap;
90 import java.util.HashSet;
91 import java.util.List;
92 import java.util.Map;
93 import java.util.Optional;
94 import java.util.Set;
95 import java.util.UUID;
96 import java.util.stream.Collectors;
97
98
99
100
101
102 @Component(
103 immediate = true,
104 service = WorkflowOperationHandler.class,
105 property = {
106 "service.description=Engage Publication Workflow Handler",
107 "workflow.operation=publish-engage"
108 }
109 )
110 public class PublishEngageWorkflowOperationHandler extends AbstractWorkflowOperationHandler {
111
112
113 private static final Logger logger = LoggerFactory.getLogger(PublishEngageWorkflowOperationHandler.class);
114
115
116 static final String ENGAGE_URL_PROPERTY = "org.opencastproject.engage.ui.url";
117 static final String STREAMING_PUBLISH_PROPERTY = "org.opencastproject.publish.streaming.formats";
118
119
120 static final String DOWNLOAD_SOURCE_FLAVORS = "download-source-flavors";
121 static final String DOWNLOAD_TARGET_SUBFLAVOR = "download-target-subflavor";
122 static final String DOWNLOAD_SOURCE_TAGS = "download-source-tags";
123 static final String DOWNLOAD_TARGET_TAGS = "download-target-tags";
124 static final String STREAMING_SOURCE_TAGS = "streaming-source-tags";
125 static final String STREAMING_TARGET_TAGS = "streaming-target-tags";
126 static final String STREAMING_SOURCE_FLAVORS = "streaming-source-flavors";
127 static final String STREAMING_TARGET_SUBFLAVOR = "streaming-target-subflavor";
128 static final String CHECK_AVAILABILITY = "check-availability";
129 static final String STRATEGY = "strategy";
130 static final String MERGE_FORCE_FLAVORS = "merge-force-flavors";
131 static final String ADD_FORCE_FLAVORS = "add-force-flavors";
132
133 private static final String MERGE_FORCE_FLAVORS_DEFAULT = "dublincore/*,security/*";
134 private static final String ADD_FORCE_FLAVORS_DEFAULT = "";
135
136
137 static final String PLAYER_PATH = "/play/";
138
139
140 static final String PUBLISH_STRATEGY_MERGE = "merge";
141
142
143 static final String PUBLISH_STRATEGY_DEFAULT = "default";
144
145
146 private StreamingDistributionService streamingDistributionService = null;
147
148
149 private DownloadDistributionService downloadDistributionService = null;
150
151
152 private SearchService searchService = null;
153
154 private Workspace workspace;
155
156
157 private URL serverUrl;
158
159 private OrganizationDirectoryService organizationDirectoryService = null;
160
161
162 private List<String> publishedStreamingFormats = null;
163
164
165
166
167
168
169
170 @Reference(target = "(distribution.channel=streaming)")
171 protected void setStreamingDistributionService(StreamingDistributionService streamingDistributionService) {
172 this.streamingDistributionService = streamingDistributionService;
173 }
174
175
176
177
178
179
180
181 @Reference(target = "(distribution.channel=download)")
182 protected void setDownloadDistributionService(DownloadDistributionService downloadDistributionService) {
183 this.downloadDistributionService = downloadDistributionService;
184 }
185
186
187
188
189
190
191
192
193 @Reference
194 protected void setSearchService(SearchService searchService) {
195 this.searchService = searchService;
196 }
197
198 @Reference
199 public void setOrganizationDirectoryService(OrganizationDirectoryService organizationDirectoryService) {
200 this.organizationDirectoryService = organizationDirectoryService;
201 }
202
203 @Reference
204 @Override
205 public void setServiceRegistry(ServiceRegistry serviceRegistry) {
206 super.setServiceRegistry(serviceRegistry);
207 }
208
209 @Reference
210 public void setWorkspace(Workspace workspace) {
211 this.workspace = workspace;
212 }
213
214
215
216 private static final Set<TrackImpl.StreamingProtocol> STREAMING_FORMATS = new HashSet<>(Arrays.asList(
217 TrackImpl.StreamingProtocol.RTMP,
218 TrackImpl.StreamingProtocol.RTMPE,
219 TrackImpl.StreamingProtocol.HLS,
220 TrackImpl.StreamingProtocol.DASH,
221 TrackImpl.StreamingProtocol.HDS,
222 TrackImpl.StreamingProtocol.SMOOTH));
223
224 @Override
225 @Activate
226 protected void activate(ComponentContext cc) {
227 super.activate(cc);
228 BundleContext bundleContext = cc.getBundleContext();
229
230
231 serverUrl = UrlSupport.url(bundleContext.getProperty(SERVER_URL_PROPERTY));
232 publishedStreamingFormats = Arrays.asList(Optional.ofNullable(StringUtils.split(
233 bundleContext.getProperty(STREAMING_PUBLISH_PROPERTY), ",")).orElse(new String[0]));
234 }
235
236 @Override
237 public WorkflowOperationResult start(final WorkflowInstance workflowInstance, JobContext context)
238 throws WorkflowOperationException {
239 logger.debug("Running engage publication workflow operation");
240
241 MediaPackage mediaPackage = workflowInstance.getMediaPackage();
242 WorkflowOperationInstance op = workflowInstance.getCurrentOperation();
243
244
245 String downloadSourceTags = StringUtils.trimToEmpty(op.getConfiguration(DOWNLOAD_SOURCE_TAGS));
246 String downloadTargetTags = StringUtils.trimToEmpty(op.getConfiguration(DOWNLOAD_TARGET_TAGS));
247 String downloadSourceFlavors = StringUtils.trimToEmpty(op.getConfiguration(DOWNLOAD_SOURCE_FLAVORS));
248 String downloadTargetSubflavor = StringUtils.trimToNull(op.getConfiguration(DOWNLOAD_TARGET_SUBFLAVOR));
249 String streamingSourceTags = StringUtils.trimToEmpty(op.getConfiguration(STREAMING_SOURCE_TAGS));
250 String streamingTargetTags = StringUtils.trimToEmpty(op.getConfiguration(STREAMING_TARGET_TAGS));
251 String streamingSourceFlavors = StringUtils.trimToEmpty(op.getConfiguration(STREAMING_SOURCE_FLAVORS));
252 String streamingTargetSubflavor = StringUtils.trimToNull(op.getConfiguration(STREAMING_TARGET_SUBFLAVOR));
253 String republishStrategy = StringUtils.trimToEmpty(
254 StringUtils.defaultString(op.getConfiguration(STRATEGY), PUBLISH_STRATEGY_DEFAULT));
255 String mergeForceFlavorsStr = StringUtils.trimToEmpty(
256 StringUtils.defaultString(op.getConfiguration(MERGE_FORCE_FLAVORS), MERGE_FORCE_FLAVORS_DEFAULT));
257 String addForceFlavorsStr = StringUtils.trimToEmpty(
258 StringUtils.defaultString(op.getConfiguration(ADD_FORCE_FLAVORS), ADD_FORCE_FLAVORS_DEFAULT));
259
260
261 boolean checkAvailability = option(op.getConfiguration(CHECK_AVAILABILITY)).bind(trimToNone).map(toBool)
262 .getOrElse(true);
263
264
265
266 MediaPackage distributedMp = null;
267 try {
268 distributedMp = searchService.get(mediaPackage.getIdentifier().toString());
269 } catch (NotFoundException e) {
270 logger.debug("No published mediapackage found for {}", mediaPackage.getIdentifier().toString());
271 } catch (UnauthorizedException e) {
272 throw new WorkflowOperationException("Unauthorized for " + mediaPackage.getIdentifier().toString(), e);
273 }
274 if (PUBLISH_STRATEGY_MERGE.equals(republishStrategy) && distributedMp == null) {
275 logger.info("Skipping republish for {} since it is not currently published",
276 mediaPackage.getIdentifier().toString());
277 return createResult(mediaPackage, Action.SKIP);
278 }
279
280 String[] sourceDownloadTags = StringUtils.split(downloadSourceTags, ",");
281 String[] targetDownloadTags = StringUtils.split(downloadTargetTags, ",");
282 String[] sourceDownloadFlavors = StringUtils.split(downloadSourceFlavors, ",");
283 String[] sourceStreamingTags = StringUtils.split(streamingSourceTags, ",");
284 String[] targetStreamingTags = StringUtils.split(streamingTargetTags, ",");
285 String[] sourceStreamingFlavors = StringUtils.split(streamingSourceFlavors, ",");
286
287 if (sourceDownloadTags.length == 0 && sourceDownloadFlavors.length == 0 && sourceStreamingTags.length == 0
288 && sourceStreamingFlavors.length == 0) {
289 logger.warn("No tags or flavors have been specified, so nothing will be published to the "
290 + "engage publication channel");
291 return createResult(mediaPackage, Action.CONTINUE);
292 }
293
294
295 List<MediaPackageElementFlavor> mergeForceFlavors = Arrays.stream(StringUtils.split(mergeForceFlavorsStr, ", "))
296 .map(MediaPackageElementFlavor::parseFlavor).collect(Collectors.toList());
297 List<MediaPackageElementFlavor> addForceFlavors = Arrays.stream(StringUtils.split(addForceFlavorsStr, ", "))
298 .map(MediaPackageElementFlavor::parseFlavor).collect(Collectors.toList());
299
300
301 MediaPackageElementFlavor downloadSubflavor = null;
302 if (downloadTargetSubflavor != null) {
303 try {
304 downloadSubflavor = MediaPackageElementFlavor.parseFlavor(downloadTargetSubflavor);
305 } catch (IllegalArgumentException e) {
306 throw new WorkflowOperationException(e);
307 }
308 }
309
310
311 MediaPackageElementFlavor streamingSubflavor = null;
312 if (streamingTargetSubflavor != null) {
313 try {
314 streamingSubflavor = MediaPackageElementFlavor.parseFlavor(streamingTargetSubflavor);
315 } catch (IllegalArgumentException e) {
316 throw new WorkflowOperationException(e);
317 }
318 }
319
320
321 SimpleElementSelector downloadElementSelector = new SimpleElementSelector();
322 for (String flavor : sourceDownloadFlavors) {
323 downloadElementSelector.addFlavor(MediaPackageElementFlavor.parseFlavor(flavor));
324 }
325 for (String tag : sourceDownloadTags) {
326 downloadElementSelector.addTag(tag);
327 }
328
329
330 SimpleElementSelector streamingElementSelector = new SimpleElementSelector();
331 for (String flavor : sourceStreamingFlavors) {
332 streamingElementSelector.addFlavor(MediaPackageElementFlavor.parseFlavor(flavor));
333 }
334 for (String tag : sourceStreamingTags) {
335 streamingElementSelector.addTag(tag);
336 }
337
338
339 Collection<MediaPackageElement> downloadElements = downloadElementSelector.select(mediaPackage, false);
340 Collection<MediaPackageElement> streamingElements = streamingElementSelector.select(mediaPackage, false);
341
342 try {
343 Set<String> downloadElementIds = new HashSet<String>();
344 Set<String> streamingElementIds = new HashSet<String>();
345
346
347 for (MediaPackageElement elem : downloadElements) {
348 downloadElementIds.add(elem.getIdentifier());
349 }
350 for (MediaPackageElement elem : streamingElements) {
351 streamingElementIds.add(elem.getIdentifier());
352 }
353
354 removePublicationElement(mediaPackage);
355 if (republishStrategy.equals(PUBLISH_STRATEGY_DEFAULT)) {
356 retractFromEngage(distributedMp);
357 }
358
359 List<Job> jobs = new ArrayList<Job>();
360
361 try {
362 if (downloadElementIds.size() > 0) {
363 Job job = downloadDistributionService.distribute(
364 CHANNEL_ID, mediaPackage, downloadElementIds, checkAvailability);
365 if (job != null) {
366 jobs.add(job);
367 }
368 }
369
370 if (streamingDistributionService != null && streamingDistributionService.publishToStreaming()) {
371 for (String elementId : streamingElementIds) {
372 Job job = streamingDistributionService.distribute(CHANNEL_ID, mediaPackage, elementId);
373 if (job != null) {
374 jobs.add(job);
375 }
376 }
377 }
378 } catch (DistributionException e) {
379 throw new WorkflowOperationException(e);
380 }
381
382 if (jobs.size() < 1) {
383 logger.info("No mediapackage element was found for distribution to engage");
384 return createResult(mediaPackage, Action.CONTINUE);
385 }
386
387
388 if (!waitForStatus(jobs.toArray(new Job[jobs.size()])).isSuccess()) {
389 throw new WorkflowOperationException("One of the distribution jobs did not complete successfully");
390 }
391
392 logger.debug("Distribute of mediapackage {} completed", mediaPackage);
393
394 String engageUrlString = null;
395 try {
396 MediaPackage mediaPackageForSearch = getMediaPackageForSearchIndex(mediaPackage, jobs, downloadSubflavor,
397 targetDownloadTags, downloadElementIds, streamingSubflavor, streamingElementIds, targetStreamingTags);
398
399
400 removePublicationElement(mediaPackage);
401 if (republishStrategy.equals(PUBLISH_STRATEGY_MERGE)) {
402 mediaPackageForSearch = mergePackages(mediaPackageForSearch, distributedMp, mergeForceFlavors,
403 addForceFlavors);
404 }
405
406 if (StringUtils.isBlank(mediaPackageForSearch.getTitle())) {
407 var dcUri = Arrays.stream(mediaPackageForSearch.getCatalogs(MediaPackageElements.EPISODE))
408 .findFirst()
409 .map(MediaPackageElement::getURI);
410 if (dcUri.isPresent()) {
411 try (var in = workspace.read(dcUri.get())) {
412 DublinCoreXmlFormat.read(in)
413 .get(DublinCore.PROPERTY_TITLE)
414 .stream()
415 .findFirst()
416 .map(DublinCoreValue::getValue)
417 .ifPresent(mediaPackageForSearch::setTitle);
418 }
419 }
420 }
421
422
423 if (isBlank(mediaPackageForSearch.getTitle())) {
424 throw new WorkflowOperationException("Media package does not meet publication criteria: Missing title");
425 }
426 if (!mediaPackageForSearch.hasTracks()) {
427 throw new WorkflowOperationException("Media package does not meet publication criteria: No tracks selected");
428 }
429
430
431 MediaPackageElement[] mediaPackageElements = mediaPackageForSearch.getElements();
432
433 logger.info("Publishing media package {} to search index", mediaPackageForSearch);
434
435 URL engageBaseUrl;
436 Organization organization = organizationDirectoryService.getOrganization(workflowInstance.getOrganizationId());
437 engageUrlString = StringUtils.trimToNull(organization.getProperties().get(ENGAGE_URL_PROPERTY));
438 if (engageUrlString != null) {
439 engageBaseUrl = new URL(engageUrlString);
440 } else {
441 engageBaseUrl = serverUrl;
442 logger.info(
443 "Using 'server.url' as a fallback for the non-existing organization level key '{}' "
444 + "for the publication url",
445 ENGAGE_URL_PROPERTY);
446 }
447
448
449 URI engageUri = this.createEngageUri(engageBaseUrl.toURI(), mediaPackage);
450
451
452 Publication publicationElement = PublicationImpl.publication(UUID.randomUUID().toString(), CHANNEL_ID,
453 engageUri, MimeTypes.parseMimeType("text/html"));
454
455
456 for (MediaPackageElement element : mediaPackageElements) {
457 element.setIdentifier(null);
458 PublicationImpl.addElementToPublication(publicationElement, element);
459 }
460
461 mediaPackage.add(publicationElement);
462
463
464 if (streamingDistributionService != null
465 && streamingDistributionService.publishToStreaming()
466 && !publishedStreamingFormats.isEmpty()) {
467 for (Track track : mediaPackageForSearch.getTracks()) {
468 String mimeType = track.getMimeType().toString();
469 if (isStreamingFormat(track) && (publishedStreamingFormats.contains(mimeType)
470 || publishedStreamingFormats.contains("*"))) {
471 publicationElement.addTrack(track);
472 }
473 }
474 for (Attachment attachment : mediaPackageForSearch.getAttachments()) {
475 publicationElement.addAttachment(attachment);
476 }
477 for (Catalog catalog : mediaPackageForSearch.getCatalogs()) {
478 publicationElement.addCatalog(catalog);
479 }
480 }
481
482
483 Job publishJob = null;
484 try {
485 publishJob = searchService.add(mediaPackageForSearch);
486 if (!waitForStatus(publishJob).isSuccess()) {
487 throw new WorkflowOperationException("Mediapackage " + mediaPackageForSearch.getIdentifier()
488 + " could not be published");
489 }
490 } catch (SearchException e) {
491 throw new WorkflowOperationException("Error publishing media package", e);
492 } catch (MediaPackageException e) {
493 throw new WorkflowOperationException("Error parsing media package", e);
494 }
495
496 logger.debug("Publishing of mediapackage {} completed", mediaPackage);
497 return createResult(mediaPackage, Action.CONTINUE);
498 } catch (MalformedURLException e) {
499 logger.error("{} is malformed: {}", ENGAGE_URL_PROPERTY, engageUrlString);
500 throw new WorkflowOperationException(e);
501 } catch (Throwable t) {
502 if (t instanceof WorkflowOperationException) {
503 throw (WorkflowOperationException) t;
504 } else {
505 throw new WorkflowOperationException(t);
506 }
507 }
508 } catch (Exception e) {
509 if (e instanceof WorkflowOperationException) {
510 throw (WorkflowOperationException) e;
511 } else {
512 throw new WorkflowOperationException(e);
513 }
514 }
515 }
516
517
518
519
520
521
522
523
524 URI createEngageUri(URI engageUri, MediaPackage mp) {
525 return URIUtils.resolve(engageUri, PLAYER_PATH + mp.getIdentifier().toString());
526 }
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549 protected MediaPackage getMediaPackageForSearchIndex(MediaPackage current, List<Job> jobs,
550 MediaPackageElementFlavor downloadSubflavor, String[] downloadTargetTags, Set<String> downloadElementIds,
551 MediaPackageElementFlavor streamingSubflavor, Set<String> streamingElementIds, String[] streamingTargetTags)
552 throws MediaPackageException, NotFoundException, ServiceRegistryException, WorkflowOperationException {
553 MediaPackage mp = (MediaPackage) current.clone();
554
555
556 List<String> elementsToPublish = new ArrayList<String>();
557 Map<String, String> distributedElementIds = new HashMap<String, String>();
558
559 for (Job entry : jobs) {
560 Job job = serviceRegistry.getJob(entry.getId());
561
562
563 if (job.getPayload() == null) {
564 continue;
565 }
566
567 List<? extends MediaPackageElement> distributedElements = null;
568 try {
569 distributedElements = MediaPackageElementParser.getArrayFromXml(job.getPayload());
570 } catch (MediaPackageException e) {
571 throw new WorkflowOperationException(e);
572 }
573
574
575
576 if (distributedElements == null || distributedElements.size() < 1) {
577 continue;
578 }
579
580 for (MediaPackageElement distributedElement : distributedElements) {
581
582 String sourceElementId = distributedElement.getIdentifier();
583 if (sourceElementId != null) {
584 MediaPackageElement sourceElement = mp.getElementById(sourceElementId);
585
586
587 distributedElement.setIdentifier(null);
588 if (sourceElement != null) {
589
590 if (downloadElementIds.contains(sourceElementId)) {
591 if (downloadSubflavor != null) {
592 MediaPackageElementFlavor flavor = sourceElement.getFlavor();
593 if (flavor != null) {
594 MediaPackageElementFlavor newFlavor = new MediaPackageElementFlavor(flavor.getType(),
595 downloadSubflavor.getSubtype());
596 distributedElement.setFlavor(newFlavor);
597 }
598 }
599 }
600
601 else if (streamingElementIds.contains(sourceElementId)) {
602 if (streamingSubflavor != null && streamingElementIds.contains(sourceElementId)) {
603 MediaPackageElementFlavor flavor = sourceElement.getFlavor();
604 if (flavor != null) {
605 MediaPackageElementFlavor newFlavor = new MediaPackageElementFlavor(flavor.getType(),
606 streamingSubflavor.getSubtype());
607 distributedElement.setFlavor(newFlavor);
608 }
609 }
610 }
611
612 MediaPackageReference ref = sourceElement.getReference();
613 if (ref != null && mp.getElementByReference(ref) != null) {
614 MediaPackageReference newReference = (MediaPackageReference) ref.clone();
615 distributedElement.setReference(newReference);
616 }
617 }
618 }
619
620 if (isStreamingFormat(distributedElement)) {
621 applyTags(distributedElement, streamingTargetTags);
622 } else {
623 applyTags(distributedElement, downloadTargetTags);
624 }
625
626
627 mp.add(distributedElement);
628 elementsToPublish.add(distributedElement.getIdentifier());
629 distributedElementIds.put(sourceElementId, distributedElement.getIdentifier());
630 }
631 }
632
633
634 List<MediaPackageElement> removals = new ArrayList<MediaPackageElement>();
635 for (MediaPackageElement element : mp.getElements()) {
636 if (!elementsToPublish.contains(element.getIdentifier())) {
637 removals.add(element);
638 }
639 }
640
641
642 for (MediaPackageElement element : mp.getElements()) {
643
644 if (removals.contains(element)) {
645 continue;
646 }
647
648
649 MediaPackageReference reference = element.getReference();
650 if (reference == null) {
651 continue;
652 }
653
654
655 String distributedElementId = distributedElementIds.get(reference.getIdentifier());
656 if (distributedElementId == null) {
657 continue;
658 }
659
660 MediaPackageReference translatedReference
661 = new MediaPackageReferenceImpl(mp.getElementById(distributedElementId));
662 if (reference.getProperties() != null) {
663 translatedReference.getProperties().putAll(reference.getProperties());
664 }
665
666
667 element.setReference(translatedReference);
668
669 }
670
671
672 for (MediaPackageElement element : removals) {
673 mp.remove(element);
674 }
675 return mp;
676 }
677
678
679
680
681
682
683 private boolean isStreamingFormat(MediaPackageElement element) {
684 return element instanceof TrackImpl
685 && STREAMING_FORMATS.contains(((TrackImpl) element).getTransport());
686 }
687
688
689
690
691
692
693 private void applyTags(MediaPackageElement element, String[] tags) {
694 for (String tag : tags) {
695 element.addTag(tag);
696 }
697 }
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714 protected MediaPackage mergePackages(MediaPackage updatedMp, MediaPackage publishedMp,
715 List<MediaPackageElementFlavor> mergeForceFlavors, List<MediaPackageElementFlavor> addForceFlavors) {
716 if (publishedMp == null) {
717 return updatedMp;
718 }
719
720 MediaPackage mergedMediaPackage = (MediaPackage) updatedMp.clone();
721 for (MediaPackageElement element : publishedMp.elements()) {
722 String type = element.getElementType().toString().toLowerCase();
723 boolean elementHasFlavorThatAlreadyExists = updatedMp.getElementsByFlavor(element.getFlavor()).length > 0;
724 boolean elementHasForceMergeFlavor = mergeForceFlavors.stream().anyMatch((f) -> element.getFlavor().matches(f));
725 boolean elementHasForceAddFlavor = addForceFlavors.stream().anyMatch((f) -> element.getFlavor().matches(f));
726
727 if (elementHasForceAddFlavor) {
728 logger.info("Adding {} '{}' into the updated mediapackage", type, element.getIdentifier());
729 mergedMediaPackage.add((MediaPackageElement) element.clone());
730 continue;
731 }
732 if (!elementHasFlavorThatAlreadyExists) {
733 if (elementHasForceMergeFlavor) {
734 logger.info("Forcing removal of {} {} due to the absence of a new element with flavor {}",
735 type, element.getIdentifier(), element.getFlavor().toString());
736 continue;
737 }
738 logger.info("Merging {} '{}' into the updated mediapackage", type, element.getIdentifier());
739 mergedMediaPackage.add((MediaPackageElement) element.clone());
740 } else {
741 logger.info("Overwriting existing {} '{}' with '{}' in the updated mediapackage",
742 type, element.getIdentifier(), updatedMp.getElementsByFlavor(element.getFlavor())[0].getIdentifier());
743
744 }
745 }
746
747 return mergedMediaPackage;
748 }
749
750 private void removePublicationElement(MediaPackage mediaPackage) {
751 for (Publication publicationElement : mediaPackage.getPublications()) {
752 if (CHANNEL_ID.equals(publicationElement.getChannel())) {
753 mediaPackage.remove(publicationElement);
754 }
755 }
756 }
757
758
759
760
761
762
763
764
765 private void retractFromEngage(MediaPackage distributedMediaPackage) throws WorkflowOperationException {
766 List<Job> jobs = new ArrayList<>();
767 Set<String> elementIds = new HashSet<>();
768 try {
769 if (distributedMediaPackage != null) {
770
771 for (MediaPackageElement element : distributedMediaPackage.getElements()) {
772 elementIds.add(element.getIdentifier());
773 }
774
775 if (elementIds.size() > 0) {
776 Job retractDownloadDistributionJob
777 = downloadDistributionService.retract(CHANNEL_ID, distributedMediaPackage, elementIds);
778 if (retractDownloadDistributionJob != null) {
779 jobs.add(retractDownloadDistributionJob);
780 }
781 }
782
783 if (streamingDistributionService != null && streamingDistributionService.publishToStreaming()) {
784 for (MediaPackageElement element : distributedMediaPackage.getElements()) {
785 Job retractStreamingJob
786 = streamingDistributionService.retract(CHANNEL_ID, distributedMediaPackage, element.getIdentifier());
787 if (retractStreamingJob != null) {
788 jobs.add(retractStreamingJob);
789 }
790 }
791 }
792
793 Job deleteSearchJob = null;
794 logger.info("Retracting already published Elements for Mediapackage: {}",
795 distributedMediaPackage.getIdentifier().toString());
796 deleteSearchJob = searchService.delete(distributedMediaPackage.getIdentifier().toString());
797 if (deleteSearchJob != null) {
798 jobs.add(deleteSearchJob);
799 }
800 }
801
802 if (!waitForStatus(jobs.toArray(new Job[jobs.size()])).isSuccess()) {
803 throw new WorkflowOperationException("One of the retraction jobs did not complete successfully");
804 }
805 } catch (DistributionException e) {
806 throw new WorkflowOperationException(e);
807 } catch (SearchException e) {
808 throw new WorkflowOperationException("Error retracting media package", e);
809 } catch (UnauthorizedException | NotFoundException ex) {
810 logger.error("Retraction failed of Mediapackage: {}", distributedMediaPackage.getIdentifier().toString(), ex);
811 }
812 }
813 }