1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 package org.opencastproject.workflow.handler.distribution;
23
24 import static org.apache.commons.lang3.StringUtils.isBlank;
25 import static org.opencastproject.systems.OpencastConstants.SERVER_URL_PROPERTY;
26 import static org.opencastproject.workflow.handler.distribution.EngagePublicationChannel.CHANNEL_ID;
27
28 import org.opencastproject.distribution.api.DistributionException;
29 import org.opencastproject.distribution.api.DownloadDistributionService;
30 import org.opencastproject.distribution.api.StreamingDistributionService;
31 import org.opencastproject.job.api.Job;
32 import org.opencastproject.job.api.JobContext;
33 import org.opencastproject.mediapackage.Attachment;
34 import org.opencastproject.mediapackage.Catalog;
35 import org.opencastproject.mediapackage.MediaPackage;
36 import org.opencastproject.mediapackage.MediaPackageElement;
37 import org.opencastproject.mediapackage.MediaPackageElementFlavor;
38 import org.opencastproject.mediapackage.MediaPackageElementParser;
39 import org.opencastproject.mediapackage.MediaPackageElements;
40 import org.opencastproject.mediapackage.MediaPackageException;
41 import org.opencastproject.mediapackage.MediaPackageReference;
42 import org.opencastproject.mediapackage.MediaPackageReferenceImpl;
43 import org.opencastproject.mediapackage.Publication;
44 import org.opencastproject.mediapackage.PublicationImpl;
45 import org.opencastproject.mediapackage.Track;
46 import org.opencastproject.mediapackage.selector.SimpleElementSelector;
47 import org.opencastproject.mediapackage.track.TrackImpl;
48 import org.opencastproject.metadata.dublincore.DublinCore;
49 import org.opencastproject.metadata.dublincore.DublinCoreValue;
50 import org.opencastproject.metadata.dublincore.DublinCoreXmlFormat;
51 import org.opencastproject.search.api.SearchException;
52 import org.opencastproject.search.api.SearchService;
53 import org.opencastproject.security.api.Organization;
54 import org.opencastproject.security.api.OrganizationDirectoryService;
55 import org.opencastproject.security.api.UnauthorizedException;
56 import org.opencastproject.serviceregistry.api.ServiceRegistry;
57 import org.opencastproject.serviceregistry.api.ServiceRegistryException;
58 import org.opencastproject.util.MimeTypes;
59 import org.opencastproject.util.NotFoundException;
60 import org.opencastproject.util.UrlSupport;
61 import org.opencastproject.util.data.functions.Strings;
62 import org.opencastproject.workflow.api.AbstractWorkflowOperationHandler;
63 import org.opencastproject.workflow.api.WorkflowInstance;
64 import org.opencastproject.workflow.api.WorkflowOperationException;
65 import org.opencastproject.workflow.api.WorkflowOperationHandler;
66 import org.opencastproject.workflow.api.WorkflowOperationInstance;
67 import org.opencastproject.workflow.api.WorkflowOperationResult;
68 import org.opencastproject.workflow.api.WorkflowOperationResult.Action;
69 import org.opencastproject.workspace.api.Workspace;
70
71 import org.apache.commons.lang3.StringUtils;
72 import org.apache.http.client.utils.URIUtils;
73 import org.osgi.framework.BundleContext;
74 import org.osgi.service.component.ComponentContext;
75 import org.osgi.service.component.annotations.Activate;
76 import org.osgi.service.component.annotations.Component;
77 import org.osgi.service.component.annotations.Reference;
78 import org.slf4j.Logger;
79 import org.slf4j.LoggerFactory;
80
81 import java.net.MalformedURLException;
82 import java.net.URI;
83 import java.net.URL;
84 import java.util.ArrayList;
85 import java.util.Arrays;
86 import java.util.Collection;
87 import java.util.HashMap;
88 import java.util.HashSet;
89 import java.util.List;
90 import java.util.Map;
91 import java.util.Optional;
92 import java.util.Set;
93 import java.util.UUID;
94 import java.util.stream.Collectors;
95
96
97
98
99
100 @Component(
101 immediate = true,
102 service = WorkflowOperationHandler.class,
103 property = {
104 "service.description=Engage Publication Workflow Handler",
105 "workflow.operation=publish-engage"
106 }
107 )
108 public class PublishEngageWorkflowOperationHandler extends AbstractWorkflowOperationHandler {
109
110
111 private static final Logger logger = LoggerFactory.getLogger(PublishEngageWorkflowOperationHandler.class);
112
113
114 static final String ENGAGE_URL_PROPERTY = "org.opencastproject.engage.ui.url";
115 static final String STREAMING_PUBLISH_PROPERTY = "org.opencastproject.publish.streaming.formats";
116
117
118 static final String DOWNLOAD_SOURCE_FLAVORS = "download-source-flavors";
119 static final String DOWNLOAD_TARGET_SUBFLAVOR = "download-target-subflavor";
120 static final String DOWNLOAD_SOURCE_TAGS = "download-source-tags";
121 static final String DOWNLOAD_TARGET_TAGS = "download-target-tags";
122 static final String STREAMING_SOURCE_TAGS = "streaming-source-tags";
123 static final String STREAMING_TARGET_TAGS = "streaming-target-tags";
124 static final String STREAMING_SOURCE_FLAVORS = "streaming-source-flavors";
125 static final String STREAMING_TARGET_SUBFLAVOR = "streaming-target-subflavor";
126 static final String CHECK_AVAILABILITY = "check-availability";
127 static final String STRATEGY = "strategy";
128 static final String MERGE_FORCE_FLAVORS = "merge-force-flavors";
129 static final String ADD_FORCE_FLAVORS = "add-force-flavors";
130
131 private static final String MERGE_FORCE_FLAVORS_DEFAULT = "dublincore/*,security/*";
132 private static final String ADD_FORCE_FLAVORS_DEFAULT = "";
133
134
135 static final String PLAYER_PATH = "/play/";
136
137
138 static final String PUBLISH_STRATEGY_MERGE = "merge";
139
140
141 static final String PUBLISH_STRATEGY_DEFAULT = "default";
142
143
144 private StreamingDistributionService streamingDistributionService = null;
145
146
147 private DownloadDistributionService downloadDistributionService = null;
148
149
150 private SearchService searchService = null;
151
152 private Workspace workspace;
153
154
155 private URL serverUrl;
156
157 private OrganizationDirectoryService organizationDirectoryService = null;
158
159
160 private List<String> publishedStreamingFormats = null;
161
162
163
164
165
166
167
168 @Reference(target = "(distribution.channel=streaming)")
169 protected void setStreamingDistributionService(StreamingDistributionService streamingDistributionService) {
170 this.streamingDistributionService = streamingDistributionService;
171 }
172
173
174
175
176
177
178
179 @Reference(target = "(distribution.channel=download)")
180 protected void setDownloadDistributionService(DownloadDistributionService downloadDistributionService) {
181 this.downloadDistributionService = downloadDistributionService;
182 }
183
184
185
186
187
188
189
190
191 @Reference
192 protected void setSearchService(SearchService searchService) {
193 this.searchService = searchService;
194 }
195
196 @Reference
197 public void setOrganizationDirectoryService(OrganizationDirectoryService organizationDirectoryService) {
198 this.organizationDirectoryService = organizationDirectoryService;
199 }
200
201 @Reference
202 @Override
203 public void setServiceRegistry(ServiceRegistry serviceRegistry) {
204 super.setServiceRegistry(serviceRegistry);
205 }
206
207 @Reference
208 public void setWorkspace(Workspace workspace) {
209 this.workspace = workspace;
210 }
211
212
213
214 private static final Set<TrackImpl.StreamingProtocol> STREAMING_FORMATS = new HashSet<>(Arrays.asList(
215 TrackImpl.StreamingProtocol.RTMP,
216 TrackImpl.StreamingProtocol.RTMPE,
217 TrackImpl.StreamingProtocol.HLS,
218 TrackImpl.StreamingProtocol.DASH,
219 TrackImpl.StreamingProtocol.HDS,
220 TrackImpl.StreamingProtocol.SMOOTH));
221
222 @Override
223 @Activate
224 protected void activate(ComponentContext cc) {
225 super.activate(cc);
226 BundleContext bundleContext = cc.getBundleContext();
227
228
229 serverUrl = UrlSupport.url(bundleContext.getProperty(SERVER_URL_PROPERTY));
230 publishedStreamingFormats = Arrays.asList(Optional.ofNullable(StringUtils.split(
231 bundleContext.getProperty(STREAMING_PUBLISH_PROPERTY), ",")).orElse(new String[0]));
232 }
233
234 @Override
235 public WorkflowOperationResult start(final WorkflowInstance workflowInstance, JobContext context)
236 throws WorkflowOperationException {
237 logger.debug("Running engage publication workflow operation");
238
239 MediaPackage mediaPackage = workflowInstance.getMediaPackage();
240 WorkflowOperationInstance op = workflowInstance.getCurrentOperation();
241
242
243 String downloadSourceTags = StringUtils.trimToEmpty(op.getConfiguration(DOWNLOAD_SOURCE_TAGS));
244 String downloadTargetTags = StringUtils.trimToEmpty(op.getConfiguration(DOWNLOAD_TARGET_TAGS));
245 String downloadSourceFlavors = StringUtils.trimToEmpty(op.getConfiguration(DOWNLOAD_SOURCE_FLAVORS));
246 String downloadTargetSubflavor = StringUtils.trimToNull(op.getConfiguration(DOWNLOAD_TARGET_SUBFLAVOR));
247 String streamingSourceTags = StringUtils.trimToEmpty(op.getConfiguration(STREAMING_SOURCE_TAGS));
248 String streamingTargetTags = StringUtils.trimToEmpty(op.getConfiguration(STREAMING_TARGET_TAGS));
249 String streamingSourceFlavors = StringUtils.trimToEmpty(op.getConfiguration(STREAMING_SOURCE_FLAVORS));
250 String streamingTargetSubflavor = StringUtils.trimToNull(op.getConfiguration(STREAMING_TARGET_SUBFLAVOR));
251 String republishStrategy = StringUtils.trimToEmpty(
252 StringUtils.defaultString(op.getConfiguration(STRATEGY), PUBLISH_STRATEGY_DEFAULT));
253 String mergeForceFlavorsStr = StringUtils.trimToEmpty(
254 StringUtils.defaultString(op.getConfiguration(MERGE_FORCE_FLAVORS), MERGE_FORCE_FLAVORS_DEFAULT));
255 String addForceFlavorsStr = StringUtils.trimToEmpty(
256 StringUtils.defaultString(op.getConfiguration(ADD_FORCE_FLAVORS), ADD_FORCE_FLAVORS_DEFAULT));
257
258
259 boolean checkAvailability = Optional.ofNullable(op.getConfiguration(CHECK_AVAILABILITY))
260 .flatMap(Strings::trimToNone)
261 .map(Boolean::valueOf)
262 .orElse(true);
263
264
265
266 MediaPackage distributedMp = null;
267 try {
268 distributedMp = searchService.get(mediaPackage.getIdentifier().toString());
269 } catch (NotFoundException e) {
270 logger.debug("No published mediapackage found for {}", mediaPackage.getIdentifier().toString());
271 } catch (UnauthorizedException e) {
272 throw new WorkflowOperationException("Unauthorized for " + mediaPackage.getIdentifier().toString(), e);
273 }
274 if (PUBLISH_STRATEGY_MERGE.equals(republishStrategy) && distributedMp == null) {
275 logger.info("Skipping republish for {} since it is not currently published",
276 mediaPackage.getIdentifier().toString());
277 return createResult(mediaPackage, Action.SKIP);
278 }
279
280 String[] sourceDownloadTags = StringUtils.split(downloadSourceTags, ",");
281 String[] targetDownloadTags = StringUtils.split(downloadTargetTags, ",");
282 String[] sourceDownloadFlavors = StringUtils.split(downloadSourceFlavors, ",");
283 String[] sourceStreamingTags = StringUtils.split(streamingSourceTags, ",");
284 String[] targetStreamingTags = StringUtils.split(streamingTargetTags, ",");
285 String[] sourceStreamingFlavors = StringUtils.split(streamingSourceFlavors, ",");
286
287 if (sourceDownloadTags.length == 0 && sourceDownloadFlavors.length == 0 && sourceStreamingTags.length == 0
288 && sourceStreamingFlavors.length == 0) {
289 logger.warn("No tags or flavors have been specified, so nothing will be published to the "
290 + "engage publication channel");
291 return createResult(mediaPackage, Action.CONTINUE);
292 }
293
294
295 List<MediaPackageElementFlavor> mergeForceFlavors = Arrays.stream(StringUtils.split(mergeForceFlavorsStr, ", "))
296 .map(MediaPackageElementFlavor::parseFlavor).collect(Collectors.toList());
297 List<MediaPackageElementFlavor> addForceFlavors = Arrays.stream(StringUtils.split(addForceFlavorsStr, ", "))
298 .map(MediaPackageElementFlavor::parseFlavor).collect(Collectors.toList());
299
300
301 MediaPackageElementFlavor downloadSubflavor = null;
302 if (downloadTargetSubflavor != null) {
303 try {
304 downloadSubflavor = MediaPackageElementFlavor.parseFlavor(downloadTargetSubflavor);
305 } catch (IllegalArgumentException e) {
306 throw new WorkflowOperationException(e);
307 }
308 }
309
310
311 MediaPackageElementFlavor streamingSubflavor = null;
312 if (streamingTargetSubflavor != null) {
313 try {
314 streamingSubflavor = MediaPackageElementFlavor.parseFlavor(streamingTargetSubflavor);
315 } catch (IllegalArgumentException e) {
316 throw new WorkflowOperationException(e);
317 }
318 }
319
320
321 SimpleElementSelector downloadElementSelector = new SimpleElementSelector();
322 for (String flavor : sourceDownloadFlavors) {
323 downloadElementSelector.addFlavor(MediaPackageElementFlavor.parseFlavor(flavor));
324 }
325 for (String tag : sourceDownloadTags) {
326 downloadElementSelector.addTag(tag);
327 }
328
329
330 SimpleElementSelector streamingElementSelector = new SimpleElementSelector();
331 for (String flavor : sourceStreamingFlavors) {
332 streamingElementSelector.addFlavor(MediaPackageElementFlavor.parseFlavor(flavor));
333 }
334 for (String tag : sourceStreamingTags) {
335 streamingElementSelector.addTag(tag);
336 }
337
338
339 Collection<MediaPackageElement> downloadElements = downloadElementSelector.select(mediaPackage, false);
340 Collection<MediaPackageElement> streamingElements = streamingElementSelector.select(mediaPackage, false);
341
342 try {
343 Set<String> downloadElementIds = new HashSet<String>();
344 Set<String> streamingElementIds = new HashSet<String>();
345
346
347 for (MediaPackageElement elem : downloadElements) {
348 downloadElementIds.add(elem.getIdentifier());
349 }
350 for (MediaPackageElement elem : streamingElements) {
351 streamingElementIds.add(elem.getIdentifier());
352 }
353
354 removePublicationElement(mediaPackage);
355 if (republishStrategy.equals(PUBLISH_STRATEGY_DEFAULT)) {
356 retractFromEngage(distributedMp);
357 }
358
359 List<Job> jobs = new ArrayList<Job>();
360
361 try {
362 if (downloadElementIds.size() > 0) {
363 Job job = downloadDistributionService.distribute(
364 CHANNEL_ID, mediaPackage, downloadElementIds, checkAvailability);
365 if (job != null) {
366 jobs.add(job);
367 }
368 }
369
370 if (streamingDistributionService != null && streamingDistributionService.publishToStreaming()) {
371 for (String elementId : streamingElementIds) {
372 Job job = streamingDistributionService.distribute(CHANNEL_ID, mediaPackage, elementId);
373 if (job != null) {
374 jobs.add(job);
375 }
376 }
377 }
378 } catch (DistributionException e) {
379 throw new WorkflowOperationException(e);
380 }
381
382 if (jobs.size() < 1) {
383 logger.info("No mediapackage element was found for distribution to engage");
384 return createResult(mediaPackage, Action.CONTINUE);
385 }
386
387
388 if (!waitForStatus(jobs.toArray(new Job[jobs.size()])).isSuccess()) {
389 throw new WorkflowOperationException("One of the distribution jobs did not complete successfully");
390 }
391
392 logger.debug("Distribute of mediapackage {} completed", mediaPackage);
393
394 String engageUrlString = null;
395 try {
396 MediaPackage mediaPackageForSearch = getMediaPackageForSearchIndex(mediaPackage, jobs, downloadSubflavor,
397 targetDownloadTags, downloadElementIds, streamingSubflavor, streamingElementIds, targetStreamingTags);
398
399
400 removePublicationElement(mediaPackage);
401 if (republishStrategy.equals(PUBLISH_STRATEGY_MERGE)) {
402 mediaPackageForSearch = mergePackages(mediaPackageForSearch, distributedMp, mergeForceFlavors,
403 addForceFlavors);
404 }
405
406 if (StringUtils.isBlank(mediaPackageForSearch.getTitle())) {
407 var dcUri = Arrays.stream(mediaPackageForSearch.getCatalogs(MediaPackageElements.EPISODE))
408 .findFirst()
409 .map(MediaPackageElement::getURI);
410 if (dcUri.isPresent()) {
411 try (var in = workspace.read(dcUri.get())) {
412 DublinCoreXmlFormat.read(in)
413 .get(DublinCore.PROPERTY_TITLE)
414 .stream()
415 .findFirst()
416 .map(DublinCoreValue::getValue)
417 .ifPresent(mediaPackageForSearch::setTitle);
418 }
419 }
420 }
421
422
423 if (isBlank(mediaPackageForSearch.getTitle())) {
424 throw new WorkflowOperationException("Media package does not meet publication criteria: Missing title");
425 }
426 if (!mediaPackageForSearch.hasTracks()) {
427 throw new WorkflowOperationException("Media package does not meet publication criteria: No tracks selected");
428 }
429
430
431 MediaPackageElement[] mediaPackageElements = mediaPackageForSearch.getElements();
432
433 logger.info("Publishing media package {} to search index", mediaPackageForSearch);
434
435 URL engageBaseUrl;
436 Organization organization = organizationDirectoryService.getOrganization(workflowInstance.getOrganizationId());
437 engageUrlString = StringUtils.trimToNull(organization.getProperties().get(ENGAGE_URL_PROPERTY));
438 if (engageUrlString != null) {
439 engageBaseUrl = new URL(engageUrlString);
440 } else {
441 engageBaseUrl = serverUrl;
442 logger.info(
443 "Using 'server.url' as a fallback for the non-existing organization level key '{}' "
444 + "for the publication url",
445 ENGAGE_URL_PROPERTY);
446 }
447
448
449 URI engageUri = this.createEngageUri(engageBaseUrl.toURI(), mediaPackage);
450
451
452 Publication publicationElement = PublicationImpl.publication(UUID.randomUUID().toString(), CHANNEL_ID,
453 engageUri, MimeTypes.parseMimeType("text/html"));
454
455
456 for (MediaPackageElement element : mediaPackageElements) {
457 element.setIdentifier(null);
458 PublicationImpl.addElementToPublication(publicationElement, element);
459 }
460
461 mediaPackage.add(publicationElement);
462
463
464 if (streamingDistributionService != null
465 && streamingDistributionService.publishToStreaming()
466 && !publishedStreamingFormats.isEmpty()) {
467 for (Track track : mediaPackageForSearch.getTracks()) {
468 String mimeType = track.getMimeType().toString();
469 if (isStreamingFormat(track) && (publishedStreamingFormats.contains(mimeType)
470 || publishedStreamingFormats.contains("*"))) {
471 publicationElement.addTrack(track);
472 }
473 }
474 for (Attachment attachment : mediaPackageForSearch.getAttachments()) {
475 publicationElement.addAttachment(attachment);
476 }
477 for (Catalog catalog : mediaPackageForSearch.getCatalogs()) {
478 publicationElement.addCatalog(catalog);
479 }
480 }
481
482
483 Job publishJob = null;
484 try {
485 publishJob = searchService.add(mediaPackageForSearch);
486 if (!waitForStatus(publishJob).isSuccess()) {
487 throw new WorkflowOperationException("Mediapackage " + mediaPackageForSearch.getIdentifier()
488 + " could not be published");
489 }
490 } catch (SearchException e) {
491 throw new WorkflowOperationException("Error publishing media package", e);
492 } catch (MediaPackageException e) {
493 throw new WorkflowOperationException("Error parsing media package", e);
494 }
495
496 logger.debug("Publishing of mediapackage {} completed", mediaPackage);
497 return createResult(mediaPackage, Action.CONTINUE);
498 } catch (MalformedURLException e) {
499 logger.error("{} is malformed: {}", ENGAGE_URL_PROPERTY, engageUrlString);
500 throw new WorkflowOperationException(e);
501 } catch (Throwable t) {
502 if (t instanceof WorkflowOperationException) {
503 throw (WorkflowOperationException) t;
504 } else {
505 throw new WorkflowOperationException(t);
506 }
507 }
508 } catch (Exception e) {
509 if (e instanceof WorkflowOperationException) {
510 throw (WorkflowOperationException) e;
511 } else {
512 throw new WorkflowOperationException(e);
513 }
514 }
515 }
516
517
518
519
520
521
522
523
524 URI createEngageUri(URI engageUri, MediaPackage mp) {
525 return URIUtils.resolve(engageUri, PLAYER_PATH + mp.getIdentifier().toString());
526 }
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549 protected MediaPackage getMediaPackageForSearchIndex(MediaPackage current, List<Job> jobs,
550 MediaPackageElementFlavor downloadSubflavor, String[] downloadTargetTags, Set<String> downloadElementIds,
551 MediaPackageElementFlavor streamingSubflavor, Set<String> streamingElementIds, String[] streamingTargetTags)
552 throws MediaPackageException, NotFoundException, ServiceRegistryException, WorkflowOperationException {
553 MediaPackage mp = (MediaPackage) current.clone();
554
555
556 List<String> elementsToPublish = new ArrayList<String>();
557 Map<String, String> distributedElementIds = new HashMap<String, String>();
558
559 for (Job entry : jobs) {
560 Job job = serviceRegistry.getJob(entry.getId());
561
562
563 if (job.getPayload() == null) {
564 continue;
565 }
566
567 List<? extends MediaPackageElement> distributedElements = null;
568 try {
569 distributedElements = MediaPackageElementParser.getArrayFromXml(job.getPayload());
570 } catch (MediaPackageException e) {
571 throw new WorkflowOperationException(e);
572 }
573
574
575
576 if (distributedElements == null || distributedElements.size() < 1) {
577 continue;
578 }
579
580 for (MediaPackageElement distributedElement : distributedElements) {
581
582 String sourceElementId = distributedElement.getIdentifier();
583 if (sourceElementId != null) {
584 MediaPackageElement sourceElement = mp.getElementById(sourceElementId);
585
586
587 distributedElement.setIdentifier(null);
588 if (sourceElement != null) {
589
590 if (downloadElementIds.contains(sourceElementId)) {
591 if (downloadSubflavor != null) {
592 MediaPackageElementFlavor flavor = sourceElement.getFlavor();
593 if (flavor != null) {
594 MediaPackageElementFlavor newFlavor = new MediaPackageElementFlavor(flavor.getType(),
595 downloadSubflavor.getSubtype());
596 distributedElement.setFlavor(newFlavor);
597 }
598 }
599 }
600
601 else if (streamingElementIds.contains(sourceElementId)) {
602 if (streamingSubflavor != null && streamingElementIds.contains(sourceElementId)) {
603 MediaPackageElementFlavor flavor = sourceElement.getFlavor();
604 if (flavor != null) {
605 MediaPackageElementFlavor newFlavor = new MediaPackageElementFlavor(flavor.getType(),
606 streamingSubflavor.getSubtype());
607 distributedElement.setFlavor(newFlavor);
608 }
609 }
610 }
611
612 MediaPackageReference ref = sourceElement.getReference();
613 if (ref != null && mp.getElementByReference(ref) != null) {
614 MediaPackageReference newReference = (MediaPackageReference) ref.clone();
615 distributedElement.setReference(newReference);
616 }
617 }
618 }
619
620 if (isStreamingFormat(distributedElement)) {
621 applyTags(distributedElement, streamingTargetTags);
622 } else {
623 applyTags(distributedElement, downloadTargetTags);
624 }
625
626
627 mp.add(distributedElement);
628 elementsToPublish.add(distributedElement.getIdentifier());
629 distributedElementIds.put(sourceElementId, distributedElement.getIdentifier());
630 }
631 }
632
633
634 List<MediaPackageElement> removals = new ArrayList<MediaPackageElement>();
635 for (MediaPackageElement element : mp.getElements()) {
636 if (!elementsToPublish.contains(element.getIdentifier())) {
637 removals.add(element);
638 }
639 }
640
641
642 for (MediaPackageElement element : mp.getElements()) {
643
644 if (removals.contains(element)) {
645 continue;
646 }
647
648
649 MediaPackageReference reference = element.getReference();
650 if (reference == null) {
651 continue;
652 }
653
654
655 String distributedElementId = distributedElementIds.get(reference.getIdentifier());
656 if (distributedElementId == null) {
657 continue;
658 }
659
660 MediaPackageReference translatedReference
661 = new MediaPackageReferenceImpl(mp.getElementById(distributedElementId));
662 if (reference.getProperties() != null) {
663 translatedReference.getProperties().putAll(reference.getProperties());
664 }
665
666
667 element.setReference(translatedReference);
668
669 }
670
671
672 for (MediaPackageElement element : removals) {
673 mp.remove(element);
674 }
675 return mp;
676 }
677
678
679
680
681
682
683 private boolean isStreamingFormat(MediaPackageElement element) {
684 return element instanceof TrackImpl
685 && STREAMING_FORMATS.contains(((TrackImpl) element).getTransport());
686 }
687
688
689
690
691
692
693 private void applyTags(MediaPackageElement element, String[] tags) {
694 for (String tag : tags) {
695 element.addTag(tag);
696 }
697 }
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714 protected MediaPackage mergePackages(MediaPackage updatedMp, MediaPackage publishedMp,
715 List<MediaPackageElementFlavor> mergeForceFlavors, List<MediaPackageElementFlavor> addForceFlavors) {
716 if (publishedMp == null) {
717 return updatedMp;
718 }
719
720 MediaPackage mergedMediaPackage = (MediaPackage) updatedMp.clone();
721 for (MediaPackageElement element : publishedMp.elements()) {
722 String type = element.getElementType().toString().toLowerCase();
723 boolean elementHasFlavorThatAlreadyExists = updatedMp.getElementsByFlavor(element.getFlavor()).length > 0;
724 boolean elementHasForceMergeFlavor = mergeForceFlavors.stream().anyMatch((f) -> element.getFlavor().matches(f));
725 boolean elementHasForceAddFlavor = addForceFlavors.stream().anyMatch((f) -> element.getFlavor().matches(f));
726
727 if (elementHasForceAddFlavor) {
728 logger.info("Adding {} '{}' into the updated mediapackage", type, element.getIdentifier());
729 mergedMediaPackage.add((MediaPackageElement) element.clone());
730 continue;
731 }
732 if (!elementHasFlavorThatAlreadyExists) {
733 if (elementHasForceMergeFlavor) {
734 logger.info("Forcing removal of {} {} due to the absence of a new element with flavor {}",
735 type, element.getIdentifier(), element.getFlavor().toString());
736 continue;
737 }
738 logger.info("Merging {} '{}' into the updated mediapackage", type, element.getIdentifier());
739 mergedMediaPackage.add((MediaPackageElement) element.clone());
740 } else {
741 logger.info("Overwriting existing {} '{}' with '{}' in the updated mediapackage",
742 type, element.getIdentifier(), updatedMp.getElementsByFlavor(element.getFlavor())[0].getIdentifier());
743
744 }
745 }
746
747 return mergedMediaPackage;
748 }
749
750 private void removePublicationElement(MediaPackage mediaPackage) {
751 for (Publication publicationElement : mediaPackage.getPublications()) {
752 if (CHANNEL_ID.equals(publicationElement.getChannel())) {
753 mediaPackage.remove(publicationElement);
754 }
755 }
756 }
757
758
759
760
761
762
763
764
765 private void retractFromEngage(MediaPackage distributedMediaPackage) throws WorkflowOperationException {
766 List<Job> jobs = new ArrayList<>();
767 Set<String> elementIds = new HashSet<>();
768 try {
769 if (distributedMediaPackage != null) {
770
771 for (MediaPackageElement element : distributedMediaPackage.getElements()) {
772 elementIds.add(element.getIdentifier());
773 }
774
775 if (elementIds.size() > 0) {
776 Job retractDownloadDistributionJob
777 = downloadDistributionService.retract(CHANNEL_ID, distributedMediaPackage, elementIds);
778 if (retractDownloadDistributionJob != null) {
779 jobs.add(retractDownloadDistributionJob);
780 }
781 }
782
783 if (streamingDistributionService != null && streamingDistributionService.publishToStreaming()) {
784 for (MediaPackageElement element : distributedMediaPackage.getElements()) {
785 Job retractStreamingJob
786 = streamingDistributionService.retract(CHANNEL_ID, distributedMediaPackage, element.getIdentifier());
787 if (retractStreamingJob != null) {
788 jobs.add(retractStreamingJob);
789 }
790 }
791 }
792
793 Job deleteSearchJob = null;
794 logger.info("Retracting already published Elements for Mediapackage: {}",
795 distributedMediaPackage.getIdentifier().toString());
796 deleteSearchJob = searchService.delete(distributedMediaPackage.getIdentifier().toString());
797 if (deleteSearchJob != null) {
798 jobs.add(deleteSearchJob);
799 }
800 }
801
802 if (!waitForStatus(jobs.toArray(new Job[jobs.size()])).isSuccess()) {
803 throw new WorkflowOperationException("One of the retraction jobs did not complete successfully");
804 }
805 } catch (DistributionException e) {
806 throw new WorkflowOperationException(e);
807 } catch (SearchException e) {
808 throw new WorkflowOperationException("Error retracting media package", e);
809 } catch (UnauthorizedException | NotFoundException ex) {
810 logger.error("Retraction failed of Mediapackage: {}", distributedMediaPackage.getIdentifier().toString(), ex);
811 }
812 }
813 }