View Javadoc
1   /*
2    * Licensed to The Apereo Foundation under one or more contributor license
3    * agreements. See the NOTICE file distributed with this work for additional
4    * information regarding copyright ownership.
5    *
6    *
7    * The Apereo Foundation licenses this file to you under the Educational
8    * Community License, Version 2.0 (the "License"); you may not use this file
9    * except in compliance with the License. You may obtain a copy of the License
10   * at:
11   *
12   *   http://opensource.org/licenses/ecl2.txt
13   *
14   * Unless required by applicable law or agreed to in writing, software
15   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
16   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the
17   * License for the specific language governing permissions and limitations under
18   * the License.
19   *
20   */
21  
22  package org.opencastproject.workflow.handler.distribution;
23  
24  import static org.apache.commons.lang3.StringUtils.isBlank;
25  import static org.opencastproject.systems.OpencastConstants.SERVER_URL_PROPERTY;
26  import static org.opencastproject.workflow.handler.distribution.EngagePublicationChannel.CHANNEL_ID;
27  
28  import org.opencastproject.distribution.api.DistributionException;
29  import org.opencastproject.distribution.api.DownloadDistributionService;
30  import org.opencastproject.distribution.api.StreamingDistributionService;
31  import org.opencastproject.job.api.Job;
32  import org.opencastproject.job.api.JobContext;
33  import org.opencastproject.mediapackage.Attachment;
34  import org.opencastproject.mediapackage.Catalog;
35  import org.opencastproject.mediapackage.MediaPackage;
36  import org.opencastproject.mediapackage.MediaPackageElement;
37  import org.opencastproject.mediapackage.MediaPackageElementFlavor;
38  import org.opencastproject.mediapackage.MediaPackageElementParser;
39  import org.opencastproject.mediapackage.MediaPackageElements;
40  import org.opencastproject.mediapackage.MediaPackageException;
41  import org.opencastproject.mediapackage.MediaPackageReference;
42  import org.opencastproject.mediapackage.MediaPackageReferenceImpl;
43  import org.opencastproject.mediapackage.Publication;
44  import org.opencastproject.mediapackage.PublicationImpl;
45  import org.opencastproject.mediapackage.Track;
46  import org.opencastproject.mediapackage.selector.SimpleElementSelector;
47  import org.opencastproject.mediapackage.track.TrackImpl;
48  import org.opencastproject.metadata.dublincore.DublinCore;
49  import org.opencastproject.metadata.dublincore.DublinCoreValue;
50  import org.opencastproject.metadata.dublincore.DublinCoreXmlFormat;
51  import org.opencastproject.search.api.SearchException;
52  import org.opencastproject.search.api.SearchService;
53  import org.opencastproject.security.api.Organization;
54  import org.opencastproject.security.api.OrganizationDirectoryService;
55  import org.opencastproject.security.api.UnauthorizedException;
56  import org.opencastproject.serviceregistry.api.ServiceRegistry;
57  import org.opencastproject.serviceregistry.api.ServiceRegistryException;
58  import org.opencastproject.util.MimeTypes;
59  import org.opencastproject.util.NotFoundException;
60  import org.opencastproject.util.UrlSupport;
61  import org.opencastproject.util.data.functions.Strings;
62  import org.opencastproject.workflow.api.AbstractWorkflowOperationHandler;
63  import org.opencastproject.workflow.api.WorkflowInstance;
64  import org.opencastproject.workflow.api.WorkflowOperationException;
65  import org.opencastproject.workflow.api.WorkflowOperationHandler;
66  import org.opencastproject.workflow.api.WorkflowOperationInstance;
67  import org.opencastproject.workflow.api.WorkflowOperationResult;
68  import org.opencastproject.workflow.api.WorkflowOperationResult.Action;
69  import org.opencastproject.workspace.api.Workspace;
70  
71  import org.apache.commons.lang3.StringUtils;
72  import org.apache.http.client.utils.URIUtils;
73  import org.osgi.framework.BundleContext;
74  import org.osgi.service.component.ComponentContext;
75  import org.osgi.service.component.annotations.Activate;
76  import org.osgi.service.component.annotations.Component;
77  import org.osgi.service.component.annotations.Reference;
78  import org.slf4j.Logger;
79  import org.slf4j.LoggerFactory;
80  
81  import java.net.MalformedURLException;
82  import java.net.URI;
83  import java.net.URL;
84  import java.util.ArrayList;
85  import java.util.Arrays;
86  import java.util.Collection;
87  import java.util.HashMap;
88  import java.util.HashSet;
89  import java.util.List;
90  import java.util.Map;
91  import java.util.Optional;
92  import java.util.Set;
93  import java.util.UUID;
94  import java.util.stream.Collectors;
95  
96  /**
97   * The workflow definition for handling "engage publication" operations
98   */
99  
100 @Component(
101     immediate = true,
102     service = WorkflowOperationHandler.class,
103     property = {
104         "service.description=Engage Publication Workflow Handler",
105         "workflow.operation=publish-engage"
106     }
107 )
108 public class PublishEngageWorkflowOperationHandler extends AbstractWorkflowOperationHandler {
109 
110   /** The logging facility */
111   private static final Logger logger = LoggerFactory.getLogger(PublishEngageWorkflowOperationHandler.class);
112 
113   /** Configuration properties id */
114   static final String ENGAGE_URL_PROPERTY = "org.opencastproject.engage.ui.url";
115   static final String STREAMING_PUBLISH_PROPERTY = "org.opencastproject.publish.streaming.formats";
116 
117   /** Workflow configuration option keys */
118   static final String DOWNLOAD_SOURCE_FLAVORS = "download-source-flavors";
119   static final String DOWNLOAD_TARGET_SUBFLAVOR = "download-target-subflavor";
120   static final String DOWNLOAD_SOURCE_TAGS = "download-source-tags";
121   static final String DOWNLOAD_TARGET_TAGS = "download-target-tags";
122   static final String STREAMING_SOURCE_TAGS = "streaming-source-tags";
123   static final String STREAMING_TARGET_TAGS = "streaming-target-tags";
124   static final String STREAMING_SOURCE_FLAVORS = "streaming-source-flavors";
125   static final String STREAMING_TARGET_SUBFLAVOR = "streaming-target-subflavor";
126   static final String CHECK_AVAILABILITY = "check-availability";
127   static final String STRATEGY = "strategy";
128   static final String MERGE_FORCE_FLAVORS = "merge-force-flavors";
129   static final String ADD_FORCE_FLAVORS = "add-force-flavors";
130 
131   private static final String MERGE_FORCE_FLAVORS_DEFAULT = "dublincore/*,security/*";
132   private static final String ADD_FORCE_FLAVORS_DEFAULT = "";
133 
134   /** Path the REST endpoint which will re-direct users to the currently configured video player **/
135   static final String PLAYER_PATH = "/play/";
136 
137   /** Name constant for the 'merge' strategy **/
138   static final String PUBLISH_STRATEGY_MERGE = "merge";
139 
140   /** Name constant for the 'default' 'strategy **/
141   static final String PUBLISH_STRATEGY_DEFAULT = "default";
142 
143   /** The streaming distribution service */
144   private StreamingDistributionService streamingDistributionService = null;
145 
146   /** The download distribution service */
147   private DownloadDistributionService downloadDistributionService = null;
148 
149   /** The search service */
150   private SearchService searchService = null;
151 
152   private Workspace workspace;
153 
154   /** The server url */
155   private URL serverUrl;
156 
157   private OrganizationDirectoryService organizationDirectoryService = null;
158 
159   /** Which streaming formats should be published automatically */
160   private List<String> publishedStreamingFormats = null;
161 
162   /**
163    * Callback for the OSGi declarative services configuration.
164    *
165    * @param streamingDistributionService
166    *          the streaming distribution service
167    */
168   @Reference(target = "(distribution.channel=streaming)")
169   protected void setStreamingDistributionService(StreamingDistributionService streamingDistributionService) {
170     this.streamingDistributionService = streamingDistributionService;
171   }
172 
173   /**
174    * Callback for the OSGi declarative services configuration.
175    *
176    * @param downloadDistributionService
177    *          the download distribution service
178    */
179   @Reference(target = "(distribution.channel=download)")
180   protected void setDownloadDistributionService(DownloadDistributionService downloadDistributionService) {
181     this.downloadDistributionService = downloadDistributionService;
182   }
183 
184   /**
185    * Callback for declarative services configuration that will introduce us to the search service. Implementation
186    * assumes that the reference is configured as being static.
187    *
188    * @param searchService
189    *          an instance of the search service
190    */
191   @Reference
192   protected void setSearchService(SearchService searchService) {
193     this.searchService = searchService;
194   }
195 
196   @Reference
197   public void setOrganizationDirectoryService(OrganizationDirectoryService organizationDirectoryService) {
198     this.organizationDirectoryService = organizationDirectoryService;
199   }
200 
201   @Reference
202   @Override
203   public void setServiceRegistry(ServiceRegistry serviceRegistry) {
204     super.setServiceRegistry(serviceRegistry);
205   }
206 
207   @Reference
208   public void setWorkspace(Workspace workspace) {
209     this.workspace = workspace;
210   }
211 
212 
213   /** Supported streaming formats */
214   private static final Set<TrackImpl.StreamingProtocol> STREAMING_FORMATS = new HashSet<>(Arrays.asList(
215           TrackImpl.StreamingProtocol.RTMP,
216           TrackImpl.StreamingProtocol.RTMPE,
217           TrackImpl.StreamingProtocol.HLS,
218           TrackImpl.StreamingProtocol.DASH,
219           TrackImpl.StreamingProtocol.HDS,
220           TrackImpl.StreamingProtocol.SMOOTH));
221 
222   @Override
223   @Activate
224   protected void activate(ComponentContext cc) {
225     super.activate(cc);
226     BundleContext bundleContext = cc.getBundleContext();
227 
228     // Get configuration
229     serverUrl = UrlSupport.url(bundleContext.getProperty(SERVER_URL_PROPERTY));
230     publishedStreamingFormats = Arrays.asList(Optional.ofNullable(StringUtils.split(
231             bundleContext.getProperty(STREAMING_PUBLISH_PROPERTY), ",")).orElse(new String[0]));
232   }
233 
234   @Override
235   public WorkflowOperationResult start(final WorkflowInstance workflowInstance, JobContext context)
236           throws WorkflowOperationException {
237     logger.debug("Running engage publication workflow operation");
238 
239     MediaPackage mediaPackage = workflowInstance.getMediaPackage();
240     WorkflowOperationInstance op = workflowInstance.getCurrentOperation();
241 
242     // Check which tags have been configured
243     String downloadSourceTags = StringUtils.trimToEmpty(op.getConfiguration(DOWNLOAD_SOURCE_TAGS));
244     String downloadTargetTags = StringUtils.trimToEmpty(op.getConfiguration(DOWNLOAD_TARGET_TAGS));
245     String downloadSourceFlavors = StringUtils.trimToEmpty(op.getConfiguration(DOWNLOAD_SOURCE_FLAVORS));
246     String downloadTargetSubflavor = StringUtils.trimToNull(op.getConfiguration(DOWNLOAD_TARGET_SUBFLAVOR));
247     String streamingSourceTags = StringUtils.trimToEmpty(op.getConfiguration(STREAMING_SOURCE_TAGS));
248     String streamingTargetTags = StringUtils.trimToEmpty(op.getConfiguration(STREAMING_TARGET_TAGS));
249     String streamingSourceFlavors = StringUtils.trimToEmpty(op.getConfiguration(STREAMING_SOURCE_FLAVORS));
250     String streamingTargetSubflavor = StringUtils.trimToNull(op.getConfiguration(STREAMING_TARGET_SUBFLAVOR));
251     String republishStrategy = StringUtils.trimToEmpty(
252             StringUtils.defaultString(op.getConfiguration(STRATEGY), PUBLISH_STRATEGY_DEFAULT));
253     String mergeForceFlavorsStr = StringUtils.trimToEmpty(
254             StringUtils.defaultString(op.getConfiguration(MERGE_FORCE_FLAVORS), MERGE_FORCE_FLAVORS_DEFAULT));
255     String addForceFlavorsStr = StringUtils.trimToEmpty(
256             StringUtils.defaultString(op.getConfiguration(ADD_FORCE_FLAVORS), ADD_FORCE_FLAVORS_DEFAULT));
257 
258 
259     boolean checkAvailability = Optional.ofNullable(op.getConfiguration(CHECK_AVAILABILITY))
260             .flatMap(Strings::trimToNone)
261             .map(Boolean::valueOf)
262             .orElse(true);
263 
264     // First check if mp exists in the search index and strategy is merge
265     // to avoid leaving distributed elements around.
266     MediaPackage distributedMp = null;
267     try {
268       distributedMp = searchService.get(mediaPackage.getIdentifier().toString());
269     } catch (NotFoundException e) {
270       logger.debug("No published mediapackage found for {}", mediaPackage.getIdentifier().toString());
271     } catch (UnauthorizedException e) {
272       throw new WorkflowOperationException("Unauthorized for " + mediaPackage.getIdentifier().toString(), e);
273     }
274     if (PUBLISH_STRATEGY_MERGE.equals(republishStrategy) && distributedMp == null) {
275       logger.info("Skipping republish for {} since it is not currently published",
276               mediaPackage.getIdentifier().toString());
277       return createResult(mediaPackage, Action.SKIP);
278     }
279 
280     String[] sourceDownloadTags = StringUtils.split(downloadSourceTags, ",");
281     String[] targetDownloadTags = StringUtils.split(downloadTargetTags, ",");
282     String[] sourceDownloadFlavors = StringUtils.split(downloadSourceFlavors, ",");
283     String[] sourceStreamingTags = StringUtils.split(streamingSourceTags, ",");
284     String[] targetStreamingTags = StringUtils.split(streamingTargetTags, ",");
285     String[] sourceStreamingFlavors = StringUtils.split(streamingSourceFlavors, ",");
286 
287     if (sourceDownloadTags.length == 0 && sourceDownloadFlavors.length == 0 && sourceStreamingTags.length == 0
288             && sourceStreamingFlavors.length == 0) {
289       logger.warn("No tags or flavors have been specified, so nothing will be published to the "
290           + "engage publication channel");
291       return createResult(mediaPackage, Action.CONTINUE);
292     }
293 
294     // Parse forced flavors
295     List<MediaPackageElementFlavor> mergeForceFlavors = Arrays.stream(StringUtils.split(mergeForceFlavorsStr, ", "))
296             .map(MediaPackageElementFlavor::parseFlavor).collect(Collectors.toList());
297     List<MediaPackageElementFlavor> addForceFlavors = Arrays.stream(StringUtils.split(addForceFlavorsStr, ", "))
298         .map(MediaPackageElementFlavor::parseFlavor).collect(Collectors.toList());
299 
300     // Parse the download target flavor
301     MediaPackageElementFlavor downloadSubflavor = null;
302     if (downloadTargetSubflavor != null) {
303       try {
304         downloadSubflavor = MediaPackageElementFlavor.parseFlavor(downloadTargetSubflavor);
305       } catch (IllegalArgumentException e) {
306         throw new WorkflowOperationException(e);
307       }
308     }
309 
310     // Parse the streaming target flavor
311     MediaPackageElementFlavor streamingSubflavor = null;
312     if (streamingTargetSubflavor != null) {
313       try {
314         streamingSubflavor = MediaPackageElementFlavor.parseFlavor(streamingTargetSubflavor);
315       } catch (IllegalArgumentException e) {
316         throw new WorkflowOperationException(e);
317       }
318     }
319 
320     // Configure the download element selector
321     SimpleElementSelector downloadElementSelector = new SimpleElementSelector();
322     for (String flavor : sourceDownloadFlavors) {
323       downloadElementSelector.addFlavor(MediaPackageElementFlavor.parseFlavor(flavor));
324     }
325     for (String tag : sourceDownloadTags) {
326       downloadElementSelector.addTag(tag);
327     }
328 
329     // Configure the streaming element selector
330     SimpleElementSelector streamingElementSelector = new SimpleElementSelector();
331     for (String flavor : sourceStreamingFlavors) {
332       streamingElementSelector.addFlavor(MediaPackageElementFlavor.parseFlavor(flavor));
333     }
334     for (String tag : sourceStreamingTags) {
335       streamingElementSelector.addTag(tag);
336     }
337 
338     // Select the appropriate elements for download and streaming
339     Collection<MediaPackageElement> downloadElements = downloadElementSelector.select(mediaPackage, false);
340     Collection<MediaPackageElement> streamingElements = streamingElementSelector.select(mediaPackage, false);
341 
342     try {
343       Set<String> downloadElementIds = new HashSet<String>();
344       Set<String> streamingElementIds = new HashSet<String>();
345 
346       // Look for elements matching the tag
347       for (MediaPackageElement elem : downloadElements) {
348         downloadElementIds.add(elem.getIdentifier());
349       }
350       for (MediaPackageElement elem : streamingElements) {
351         streamingElementIds.add(elem.getIdentifier());
352       }
353 
354       removePublicationElement(mediaPackage);
355       if (republishStrategy.equals(PUBLISH_STRATEGY_DEFAULT)) {
356         retractFromEngage(distributedMp);
357       }
358 
359       List<Job> jobs = new ArrayList<Job>();
360       //distribute Elements
361       try {
362         if (downloadElementIds.size() > 0) {
363           Job job = downloadDistributionService.distribute(
364               CHANNEL_ID, mediaPackage, downloadElementIds, checkAvailability);
365           if (job != null) {
366             jobs.add(job);
367           }
368         }
369 
370         if (streamingDistributionService != null && streamingDistributionService.publishToStreaming()) {
371           for (String elementId : streamingElementIds) {
372             Job job = streamingDistributionService.distribute(CHANNEL_ID, mediaPackage, elementId);
373             if (job != null) {
374               jobs.add(job);
375             }
376           }
377         }
378       } catch (DistributionException e) {
379         throw new WorkflowOperationException(e);
380       }
381 
382       if (jobs.size() < 1) {
383         logger.info("No mediapackage element was found for distribution to engage");
384         return createResult(mediaPackage, Action.CONTINUE);
385       }
386 
387       // Wait until all distribution jobs have returned
388       if (!waitForStatus(jobs.toArray(new Job[jobs.size()])).isSuccess()) {
389         throw new WorkflowOperationException("One of the distribution jobs did not complete successfully");
390       }
391 
392       logger.debug("Distribute of mediapackage {} completed", mediaPackage);
393 
394       String engageUrlString = null;
395       try {
396         MediaPackage mediaPackageForSearch = getMediaPackageForSearchIndex(mediaPackage, jobs, downloadSubflavor,
397                 targetDownloadTags, downloadElementIds, streamingSubflavor, streamingElementIds, targetStreamingTags);
398 
399         // MH-10216, check if only merging into existing mediapackage
400         removePublicationElement(mediaPackage);
401         if (republishStrategy.equals(PUBLISH_STRATEGY_MERGE)) {
402           mediaPackageForSearch = mergePackages(mediaPackageForSearch, distributedMp, mergeForceFlavors,
403               addForceFlavors);
404         }
405 
406         if (StringUtils.isBlank(mediaPackageForSearch.getTitle())) {
407           var dcUri = Arrays.stream(mediaPackageForSearch.getCatalogs(MediaPackageElements.EPISODE))
408               .findFirst()
409               .map(MediaPackageElement::getURI);
410           if (dcUri.isPresent()) {
411             try (var in = workspace.read(dcUri.get())) {
412               DublinCoreXmlFormat.read(in)
413                   .get(DublinCore.PROPERTY_TITLE)
414                   .stream()
415                   .findFirst()
416                   .map(DublinCoreValue::getValue)
417                   .ifPresent(mediaPackageForSearch::setTitle);
418             }
419           }
420         }
421 
422         // Check that the media package meets the criteria for publication
423         if (isBlank(mediaPackageForSearch.getTitle())) {
424           throw new WorkflowOperationException("Media package does not meet publication criteria: Missing title");
425         }
426         if (!mediaPackageForSearch.hasTracks()) {
427           throw new WorkflowOperationException("Media package does not meet publication criteria: No tracks selected");
428         }
429 
430         // Prepare published elements to be added
431         MediaPackageElement[] mediaPackageElements = mediaPackageForSearch.getElements();
432 
433         logger.info("Publishing media package {} to search index", mediaPackageForSearch);
434 
435         URL engageBaseUrl;
436         Organization organization = organizationDirectoryService.getOrganization(workflowInstance.getOrganizationId());
437         engageUrlString = StringUtils.trimToNull(organization.getProperties().get(ENGAGE_URL_PROPERTY));
438         if (engageUrlString != null) {
439           engageBaseUrl = new URL(engageUrlString);
440         } else {
441           engageBaseUrl = serverUrl;
442           logger.info(
443               "Using 'server.url' as a fallback for the non-existing organization level key '{}' "
444                   + "for the publication url",
445               ENGAGE_URL_PROPERTY);
446         }
447 
448         // create the publication URI (used by Admin UI for event details link)
449         URI engageUri = this.createEngageUri(engageBaseUrl.toURI(), mediaPackage);
450 
451         // Create new distribution element
452         Publication publicationElement = PublicationImpl.publication(UUID.randomUUID().toString(), CHANNEL_ID,
453                 engageUri, MimeTypes.parseMimeType("text/html"));
454 
455         // Add published elements
456         for (MediaPackageElement element : mediaPackageElements) {
457           element.setIdentifier(null);
458           PublicationImpl.addElementToPublication(publicationElement, element);
459         }
460 
461         mediaPackage.add(publicationElement);
462 
463         // create publication URI for streaming
464         if (streamingDistributionService != null
465             && streamingDistributionService.publishToStreaming()
466             && !publishedStreamingFormats.isEmpty()) {
467           for (Track track : mediaPackageForSearch.getTracks()) {
468             String mimeType = track.getMimeType().toString();
469             if (isStreamingFormat(track) && (publishedStreamingFormats.contains(mimeType)
470                     || publishedStreamingFormats.contains("*"))) {
471               publicationElement.addTrack(track);
472             }
473           }
474           for (Attachment attachment : mediaPackageForSearch.getAttachments()) {
475             publicationElement.addAttachment(attachment);
476           }
477           for (Catalog catalog : mediaPackageForSearch.getCatalogs()) {
478             publicationElement.addCatalog(catalog);
479           }
480         }
481 
482         // Adding media package to the search index
483         Job publishJob = null;
484         try {
485           publishJob = searchService.add(mediaPackageForSearch);
486           if (!waitForStatus(publishJob).isSuccess()) {
487             throw new WorkflowOperationException("Mediapackage " + mediaPackageForSearch.getIdentifier()
488                     + " could not be published");
489           }
490         } catch (SearchException e) {
491           throw new WorkflowOperationException("Error publishing media package", e);
492         } catch (MediaPackageException e) {
493           throw new WorkflowOperationException("Error parsing media package", e);
494         }
495 
496         logger.debug("Publishing of mediapackage {} completed", mediaPackage);
497         return createResult(mediaPackage, Action.CONTINUE);
498       } catch (MalformedURLException e) {
499         logger.error("{} is malformed: {}", ENGAGE_URL_PROPERTY, engageUrlString);
500         throw new WorkflowOperationException(e);
501       } catch (Throwable t) {
502         if (t instanceof WorkflowOperationException) {
503           throw (WorkflowOperationException) t;
504         } else {
505           throw new WorkflowOperationException(t);
506         }
507       }
508     } catch (Exception e) {
509       if (e instanceof WorkflowOperationException) {
510         throw (WorkflowOperationException) e;
511       } else {
512         throw new WorkflowOperationException(e);
513       }
514     }
515   }
516 
517   /**
518    * Local utility to assemble player path for this class
519    *
520    * @param engageUri
521    * @param mp
522    * @return the assembled player URI for this mediapackage
523    */
524   URI createEngageUri(URI engageUri, MediaPackage mp) {
525     return URIUtils.resolve(engageUri, PLAYER_PATH + mp.getIdentifier().toString());
526   }
527 
528   /**
529    * Returns a mediapackage that only contains elements that are marked for distribution.
530    *
531    * @param current
532    *          the current mediapackage
533    * @param jobs
534    *          the distribution jobs
535    * @param downloadSubflavor
536    *          flavor to be applied to elements distributed to download
537    * @param downloadTargetTags
538    *          tags to be applied to elements distributed to downloads
539    * @param downloadElementIds
540    *          identifiers for elements that have been distributed to downloads
541    * @param streamingSubflavor
542    *          flavor to be applied to elements distributed to streaming
543    * @param streamingElementIds
544    *          identifiers for elements that have been distributed to streaming
545    * @param streamingTargetTags
546    *          tags to be applied to elements distributed to streaming
547    * @return the new mediapackage
548    */
549   protected MediaPackage getMediaPackageForSearchIndex(MediaPackage current, List<Job> jobs,
550           MediaPackageElementFlavor downloadSubflavor, String[] downloadTargetTags, Set<String> downloadElementIds,
551           MediaPackageElementFlavor streamingSubflavor, Set<String> streamingElementIds, String[] streamingTargetTags)
552           throws MediaPackageException, NotFoundException, ServiceRegistryException, WorkflowOperationException {
553     MediaPackage mp = (MediaPackage) current.clone();
554 
555     // All the jobs have passed, let's update the mediapackage with references to the distributed elements
556     List<String> elementsToPublish = new ArrayList<String>();
557     Map<String, String> distributedElementIds = new HashMap<String, String>();
558 
559     for (Job entry : jobs) {
560       Job job = serviceRegistry.getJob(entry.getId());
561 
562       // If there is no payload, then the item has not been distributed.
563       if (job.getPayload() == null) {
564         continue;
565       }
566 
567       List<? extends MediaPackageElement> distributedElements = null;
568       try {
569         distributedElements = MediaPackageElementParser.getArrayFromXml(job.getPayload());
570       } catch (MediaPackageException e) {
571         throw new WorkflowOperationException(e);
572       }
573 
574       // If the job finished successfully, but returned no new element, the channel simply doesn't support this
575       // kind of element. So we just keep on looping.
576       if (distributedElements == null || distributedElements.size() < 1) {
577         continue;
578       }
579 
580       for (MediaPackageElement distributedElement : distributedElements) {
581 
582         String sourceElementId = distributedElement.getIdentifier();
583         if (sourceElementId != null) {
584           MediaPackageElement sourceElement = mp.getElementById(sourceElementId);
585 
586           // Make sure the mediapackage is prompted to create a new identifier for this element
587           distributedElement.setIdentifier(null);
588           if (sourceElement != null) {
589             // Adjust the flavor and tags for downloadable elements
590             if (downloadElementIds.contains(sourceElementId)) {
591               if (downloadSubflavor != null) {
592                 MediaPackageElementFlavor flavor = sourceElement.getFlavor();
593                 if (flavor != null) {
594                   MediaPackageElementFlavor newFlavor = new MediaPackageElementFlavor(flavor.getType(),
595                           downloadSubflavor.getSubtype());
596                   distributedElement.setFlavor(newFlavor);
597                 }
598               }
599             }
600             // Adjust the flavor and tags for streaming elements
601             else if (streamingElementIds.contains(sourceElementId)) {
602               if (streamingSubflavor != null && streamingElementIds.contains(sourceElementId)) {
603                 MediaPackageElementFlavor flavor = sourceElement.getFlavor();
604                 if (flavor != null) {
605                   MediaPackageElementFlavor newFlavor = new MediaPackageElementFlavor(flavor.getType(),
606                           streamingSubflavor.getSubtype());
607                   distributedElement.setFlavor(newFlavor);
608                 }
609               }
610             }
611             // Copy references from the source elements to the distributed elements
612             MediaPackageReference ref = sourceElement.getReference();
613             if (ref != null && mp.getElementByReference(ref) != null) {
614               MediaPackageReference newReference = (MediaPackageReference) ref.clone();
615               distributedElement.setReference(newReference);
616             }
617           }
618         }
619 
620         if (isStreamingFormat(distributedElement)) {
621           applyTags(distributedElement, streamingTargetTags);
622         } else {
623           applyTags(distributedElement, downloadTargetTags);
624         }
625 
626         // Add the new element to the mediapackage
627         mp.add(distributedElement);
628         elementsToPublish.add(distributedElement.getIdentifier());
629         distributedElementIds.put(sourceElementId, distributedElement.getIdentifier());
630       }
631     }
632 
633     // Mark everything that is set for removal
634     List<MediaPackageElement> removals = new ArrayList<MediaPackageElement>();
635     for (MediaPackageElement element : mp.getElements()) {
636       if (!elementsToPublish.contains(element.getIdentifier())) {
637         removals.add(element);
638       }
639     }
640 
641     // Translate references to the distributed artifacts
642     for (MediaPackageElement element : mp.getElements()) {
643 
644       if (removals.contains(element)) {
645         continue;
646       }
647 
648       // Is the element referencing anything?
649       MediaPackageReference reference = element.getReference();
650       if (reference == null) {
651         continue;
652       }
653 
654       // See if the element has been distributed
655       String distributedElementId = distributedElementIds.get(reference.getIdentifier());
656       if (distributedElementId == null) {
657         continue;
658       }
659 
660       MediaPackageReference translatedReference
661           = new MediaPackageReferenceImpl(mp.getElementById(distributedElementId));
662       if (reference.getProperties() != null) {
663         translatedReference.getProperties().putAll(reference.getProperties());
664       }
665 
666       // Set the new reference
667       element.setReference(translatedReference);
668 
669     }
670 
671     // Remove everything we don't want to add to publish
672     for (MediaPackageElement element : removals) {
673       mp.remove(element);
674     }
675     return mp;
676   }
677 
678   /**
679    * Checks if the MediaPackage track transport protocol is a streaming format protocol
680    * @param element The MediaPackageElement to analyze
681    * @return true if it is a TrackImpl and has a streaming protocol as transport
682    */
683   private boolean isStreamingFormat(MediaPackageElement element) {
684     return element instanceof TrackImpl
685             && STREAMING_FORMATS.contains(((TrackImpl) element).getTransport());
686   }
687 
688   /**
689    * Adds Tags to a MediaPackageElement
690    * @param element the element that needs the tags
691    * @param tags the list of tags to apply
692    */
693   private void applyTags(MediaPackageElement element, String[] tags) {
694     for (String tag : tags) {
695       element.addTag(tag);
696     }
697   }
698 
699   /**
700    * MH-10216, Copied from the original RepublishWorkflowOperationHandler
701    *
702    * Merges the updated mediapackage with the one that is currently published in a way where the updated elements
703    * replace existing ones in the published mediapackage based on their flavor.
704    * <p>
705    * If <code>publishedMp</code> is <code>null</code>, this method returns the updated mediapackage without any
706    * modifications.
707    *
708    * @param updatedMp
709    *          the updated media package
710    * @param publishedMp
711    *          the mediapackage that is currently published
712    * @return the merged mediapackage
713    */
714   protected MediaPackage mergePackages(MediaPackage updatedMp, MediaPackage publishedMp,
715           List<MediaPackageElementFlavor> mergeForceFlavors, List<MediaPackageElementFlavor> addForceFlavors) {
716     if (publishedMp == null) {
717       return updatedMp;
718     }
719 
720     MediaPackage mergedMediaPackage = (MediaPackage) updatedMp.clone();
721     for (MediaPackageElement element : publishedMp.elements()) {
722       String type = element.getElementType().toString().toLowerCase();
723       boolean elementHasFlavorThatAlreadyExists = updatedMp.getElementsByFlavor(element.getFlavor()).length > 0;
724       boolean elementHasForceMergeFlavor = mergeForceFlavors.stream().anyMatch((f) -> element.getFlavor().matches(f));
725       boolean elementHasForceAddFlavor = addForceFlavors.stream().anyMatch((f) -> element.getFlavor().matches(f));
726 
727       if (elementHasForceAddFlavor) {
728         logger.info("Adding {} '{}' into the updated mediapackage", type, element.getIdentifier());
729         mergedMediaPackage.add((MediaPackageElement) element.clone());
730         continue;
731       }
732       if (!elementHasFlavorThatAlreadyExists) {
733         if (elementHasForceMergeFlavor) {
734           logger.info("Forcing removal of {} {} due to the absence of a new element with flavor {}",
735                   type, element.getIdentifier(), element.getFlavor().toString());
736           continue;
737         }
738         logger.info("Merging {} '{}' into the updated mediapackage", type, element.getIdentifier());
739         mergedMediaPackage.add((MediaPackageElement) element.clone());
740       } else {
741         logger.info("Overwriting existing {} '{}' with '{}' in the updated mediapackage",
742                 type, element.getIdentifier(), updatedMp.getElementsByFlavor(element.getFlavor())[0].getIdentifier());
743 
744       }
745     }
746 
747     return mergedMediaPackage;
748   }
749 
750   private void removePublicationElement(MediaPackage mediaPackage) {
751     for (Publication publicationElement : mediaPackage.getPublications()) {
752       if (CHANNEL_ID.equals(publicationElement.getChannel())) {
753         mediaPackage.remove(publicationElement);
754       }
755     }
756   }
757 
758   /**
759    * Removes Mediapackage from Searchindex
760    *
761    * @param distributedMediaPackage
762    *          The media package gotten from the search index
763    * @throws WorkflowOperationException
764    */
765   private void retractFromEngage(MediaPackage distributedMediaPackage) throws WorkflowOperationException {
766     List<Job> jobs = new ArrayList<>();
767     Set<String> elementIds = new HashSet<>();
768     try {
769       if (distributedMediaPackage != null) {
770 
771         for (MediaPackageElement element : distributedMediaPackage.getElements()) {
772           elementIds.add(element.getIdentifier());
773         }
774         //bulk retraction
775         if (elementIds.size() > 0) {
776           Job  retractDownloadDistributionJob
777               = downloadDistributionService.retract(CHANNEL_ID, distributedMediaPackage, elementIds);
778           if (retractDownloadDistributionJob != null) {
779             jobs.add(retractDownloadDistributionJob);
780           }
781         }
782 
783         if (streamingDistributionService != null && streamingDistributionService.publishToStreaming()) {
784           for (MediaPackageElement element : distributedMediaPackage.getElements()) {
785             Job retractStreamingJob
786                 = streamingDistributionService.retract(CHANNEL_ID, distributedMediaPackage, element.getIdentifier());
787             if (retractStreamingJob != null) {
788               jobs.add(retractStreamingJob);
789             }
790           }
791         }
792 
793         Job deleteSearchJob = null;
794         logger.info("Retracting already published Elements for Mediapackage: {}",
795                 distributedMediaPackage.getIdentifier().toString());
796         deleteSearchJob = searchService.delete(distributedMediaPackage.getIdentifier().toString());
797         if (deleteSearchJob != null) {
798           jobs.add(deleteSearchJob);
799         }
800       }
801       // Wait until all retraction jobs have returned
802       if (!waitForStatus(jobs.toArray(new Job[jobs.size()])).isSuccess()) {
803         throw new WorkflowOperationException("One of the retraction jobs did not complete successfully");
804       }
805     } catch (DistributionException e) {
806       throw new WorkflowOperationException(e);
807     } catch (SearchException e) {
808       throw new WorkflowOperationException("Error retracting media package", e);
809     } catch (UnauthorizedException | NotFoundException ex) {
810       logger.error("Retraction failed of Mediapackage: {}", distributedMediaPackage.getIdentifier().toString(), ex);
811     }
812   }
813 }