View Javadoc
1   /*
2    * Licensed to The Apereo Foundation under one or more contributor license
3    * agreements. See the NOTICE file distributed with this work for additional
4    * information regarding copyright ownership.
5    *
6    *
7    * The Apereo Foundation licenses this file to you under the Educational
8    * Community License, Version 2.0 (the "License"); you may not use this file
9    * except in compliance with the License. You may obtain a copy of the License
10   * at:
11   *
12   *   http://opensource.org/licenses/ecl2.txt
13   *
14   * Unless required by applicable law or agreed to in writing, software
15   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
16   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the
17   * License for the specific language governing permissions and limitations under
18   * the License.
19   *
20   */
21  
22  package org.opencastproject.workflow.handler.composer;
23  
24  import org.opencastproject.composer.api.ComposerService;
25  import org.opencastproject.composer.api.EncoderException;
26  import org.opencastproject.composer.api.EncodingProfile;
27  import org.opencastproject.job.api.Job;
28  import org.opencastproject.job.api.JobContext;
29  import org.opencastproject.mediapackage.AdaptivePlaylist;
30  import org.opencastproject.mediapackage.MediaPackage;
31  import org.opencastproject.mediapackage.MediaPackageElementFlavor;
32  import org.opencastproject.mediapackage.MediaPackageElementParser;
33  import org.opencastproject.mediapackage.MediaPackageException;
34  import org.opencastproject.mediapackage.Track;
35  import org.opencastproject.mediapackage.selector.AbstractMediaPackageElementSelector;
36  import org.opencastproject.mediapackage.selector.TrackSelector;
37  import org.opencastproject.serviceregistry.api.ServiceRegistry;
38  import org.opencastproject.util.NotFoundException;
39  import org.opencastproject.workflow.api.AbstractWorkflowOperationHandler;
40  import org.opencastproject.workflow.api.WorkflowInstance;
41  import org.opencastproject.workflow.api.WorkflowOperationException;
42  import org.opencastproject.workflow.api.WorkflowOperationHandler;
43  import org.opencastproject.workflow.api.WorkflowOperationInstance;
44  import org.opencastproject.workflow.api.WorkflowOperationResult;
45  import org.opencastproject.workflow.api.WorkflowOperationResult.Action;
46  import org.opencastproject.workspace.api.Workspace;
47  
48  import org.apache.commons.io.FilenameUtils;
49  import org.apache.commons.lang3.BooleanUtils;
50  import org.apache.commons.lang3.StringUtils;
51  import org.osgi.service.component.ComponentContext;
52  import org.osgi.service.component.annotations.Activate;
53  import org.osgi.service.component.annotations.Component;
54  import org.osgi.service.component.annotations.Reference;
55  import org.slf4j.Logger;
56  import org.slf4j.LoggerFactory;
57  
58  import java.io.IOException;
59  import java.util.ArrayList;
60  import java.util.Collection;
61  import java.util.HashMap;
62  import java.util.List;
63  import java.util.Map;
64  import java.util.function.Predicate;
65  import java.util.stream.Collectors;
66  
67  /**
68   * The workflow operation handler for producing multiple concurrent outputs in one ffmpeg operation. This allows
69   * encoding and tagging to be done in one operation.
70   */
71  @Component(
72      immediate = true,
73      service = WorkflowOperationHandler.class,
74      property = {
75          "service.description=MultiEncode Workflow Operation Handler",
76          "workflow.operation=multiencode"
77      }
78  )
79  public class MultiEncodeWorkflowOperationHandler extends AbstractWorkflowOperationHandler {
80  
81    /** The logging facility */
82    private static final Logger logger = LoggerFactory.getLogger(MultiEncodeWorkflowOperationHandler.class);
83  
84    /** Separator for independent clauses */
85    static final String SEPARATOR = ";";
86  
87    /** The composer service */
88    private ComposerService composerService = null;
89  
90    /** The local workspace */
91    private Workspace workspace = null;
92  
93    @Activate
94    public void activate(ComponentContext cc) {
95      super.activate(cc);
96    }
97  
98    /**
99     * Callback for the OSGi declarative services configuration.
100    *
101    * @param composerService
102    *          the local composer service
103    */
104   @Reference
105   protected void setComposerService(ComposerService composerService) {
106     this.composerService = composerService;
107   }
108 
109   /**
110    * Callback for declarative services configuration that will introduce us to the local workspace service.
111    * Implementation assumes that the reference is configured as being static.
112    *
113    * @param workspace
114    *          an instance of the workspace
115    */
116   @Reference
117   public void setWorkspace(Workspace workspace) {
118     this.workspace = workspace;
119   }
120 
121   private Predicate<EncodingProfile> isManifestEP = p ->  p.getOutputType() == EncodingProfile.MediaType.Manifest;
122 
123   /**
124    * {@inheritDoc}
125    *
126    * @see org.opencastproject.workflow.api.WorkflowOperationHandler#start(org.opencastproject.workflow.api.WorkflowInstance,
127    *      JobContext)
128    */
129   @Override
130   public WorkflowOperationResult start(final WorkflowInstance workflowInstance, JobContext context)
131           throws WorkflowOperationException {
132     logger.debug("Running Multiencode workflow operation on workflow {}", workflowInstance.getId());
133 
134     try {
135       return multiencode(workflowInstance.getMediaPackage(), workflowInstance.getCurrentOperation());
136     } catch (Exception e) {
137       throw new WorkflowOperationException(e);
138     }
139   }
140 
141   protected class ElementProfileTagFlavor {
142     private AbstractMediaPackageElementSelector<Track> elementSelector = new TrackSelector();
143     private String targetFlavor = null;
144     private String targetTags = null;
145     private List<String> encodingProfiles = new ArrayList<>(); // redundant storage
146     private List<EncodingProfile> encodingProfileList = new ArrayList<>();
147 
148     ElementProfileTagFlavor(String profiles) {
149       List<String> profilelist = asList(profiles);
150       for (String profile : profilelist) {
151         EncodingProfile encodingprofile = composerService.getProfile(profile);
152         if (encodingprofile != null) {
153           encodingProfiles.add(encodingprofile.getIdentifier());
154           encodingProfileList.add(encodingprofile);
155         } else {
156           throw new IllegalArgumentException("Encoding profile " + profile + " not found.");
157         }
158       }
159     }
160 
161     public AbstractMediaPackageElementSelector<Track> getSelector() {
162       return this.elementSelector;
163     }
164 
165     public List<String> getProfiles() {
166       return this.encodingProfiles;
167     }
168 
169     public List<EncodingProfile> getEncodingProfiles() {
170       return this.encodingProfileList;
171     }
172 
173     void addSourceFlavor(String flavor) {
174       this.elementSelector.addFlavor(MediaPackageElementFlavor.parseFlavor(flavor));
175     }
176 
177     void addSourceTag(String tag) {
178       this.elementSelector.addTag(tag);
179     }
180 
181     void setTargetTags(String tags) {
182       this.targetTags = tags;
183     }
184 
185     void setTargetFlavor(String flavor) {
186       this.targetFlavor = flavor;
187     }
188 
189     String getTargetFlavor() {
190       return this.targetFlavor;
191     }
192 
193     String getTargetTags() {
194       return this.targetTags;
195     }
196   }
197 
198   /*
199    * Figures out the logic of all the source tags, flavors and profiles and sorts out the source tracks and
200    * corresponding encoding profiles.
201    *
202    * Source Tracks are selected by (Flavor AND Tag) if they are both provided
203    *
204    * There can be multiple sources and flavors to create more than one source tracks. In the workflow, A semi-colon ";"
205    * is used to separate the independent operations.
206    *
207    * The independent operations can be either all share the same set of properties or all have different sets of
208    * properties. For example, There are two sets of source flavors: * "presenter/* ; presentation/*", one source tag,
209    * eg: "preview", and two sets of encoding profiles, eg: "mp4,flv ; mp4,hdtv" then there are two concurrent
210    * operations: the first one is all "presenter" tracks tagged "preview" will be encoded with "mp4" and "flv". The
211    * second one is all "presentation" tracks tagged "preview" encoded with "mp4" and "hdtv"
212    *
213    */
214   private List<ElementProfileTagFlavor> getSrcSelector(String[] sourceFlavors, String[] sourceTags,
215           String[] targetFlavors, String[] targetTags, String[] profiles) throws WorkflowOperationException {
216     int n = 0;
217     List<ElementProfileTagFlavor> elementSelectors = new ArrayList<>();
218     if (sourceTags == null && sourceFlavors == null)
219       throw new WorkflowOperationException("No source tags or Flavor");
220     if (profiles == null)
221       throw new WorkflowOperationException("Missing profiles");
222     if (sourceTags != null) { // If source tags are used to select tracks
223       // If use source and target tags, there should be the same number of them or all map into one target
224       if (targetTags != null && (targetTags.length != 1 && sourceTags.length != targetTags.length))
225         throw new WorkflowOperationException("number of source tags " + sourceTags.length
226                 + " does not match number of target tags " + targetTags.length + " (must be the same or one target)");
227       // There should be the same number of source tags or profile groups or all use same group of profiles
228       if (profiles.length != 1 && sourceTags.length != profiles.length) {
229         throw new WorkflowOperationException(
230                 "number of source tags segments " + sourceTags.length + " does not match number of profiles segments "
231                         + profiles.length + " (must be the same or one profile)");
232       }
233       // If use source tags and source flavors, there should be the same number of them or one
234       if (sourceFlavors != null && (sourceTags.length != 1 && sourceFlavors.length != 1)
235               && sourceFlavors.length != sourceTags.length) {
236         throw new WorkflowOperationException("number of source tags segments " + sourceTags.length
237                 + " does not match number of source Flavor segments " + sourceFlavors.length
238                 + " (must be the same or one)");
239       }
240       n = sourceTags.length; // at least this many tracks
241     }
242     if (sourceFlavors != null) { // If flavors are used to select tracks
243       // If use source and target flavors, there should be the same number of them or all map into one target
244       if (targetFlavors != null && (targetFlavors.length != 1 && sourceFlavors.length != targetFlavors.length)) {
245         throw new WorkflowOperationException(
246                 "number of source flavors " + sourceFlavors.length + " segment does not match number of target flavors"
247                         + targetFlavors.length + " (must be the same or one target flavor)");
248       }
249       // If use target tags, there should be the same number of source flavors and target tags or all map into one
250       // target tag
251       if (targetTags != null && targetTags.length != 1 && sourceFlavors.length != targetTags.length) {
252         throw new WorkflowOperationException(
253                 "number of source flavors " + sourceFlavors.length + " segment does not match number of target Tags"
254                         + targetTags.length + " (must be the same or one target)");
255       }
256       // Number of profile groups should match number of source flavors
257       if ((profiles.length != 1 && sourceFlavors.length != profiles.length)) {
258         throw new WorkflowOperationException("number of source flavors segments " + sourceFlavors.length
259                 + " does not match number of profiles segments " + profiles.length
260                 + " (must be the same or one profile)");
261       }
262       if (sourceFlavors.length > n)
263         n = sourceFlavors.length; // at least this many tracks
264     }
265     int numProfiles = 0;
266     // One for each source flavor
267     for (int i = 0; i < n; i++) {
268       elementSelectors.add(new ElementProfileTagFlavor(profiles[numProfiles]));
269       if (profiles.length > 1)
270         numProfiles++; // All source use the same set of profiles or its own
271     }
272     // If uses tags to select, but sets target flavor, they must match
273     if (sourceTags != null && sourceFlavors != null) {
274       if (sourceTags.length != sourceFlavors.length && sourceFlavors.length != 1 && sourceTags.length != 1) {
275         throw new WorkflowOperationException(
276                 "number of source flavors " + sourceTags.length + " does not match number of source tags "
277                         + sourceFlavors.length + " (must be the same or one set of tags or flavors)");
278       }
279     }
280     populateFlavorsAndTags(elementSelectors, sourceFlavors, targetFlavors, sourceTags, targetTags);
281     return elementSelectors;
282   }
283 
284   private List<ElementProfileTagFlavor> populateFlavorsAndTags(List<ElementProfileTagFlavor> elementSelectors,
285           String[] sourceFlavors, String[] targetFlavors, String[] sourceTags, String[] targetTags)
286           throws WorkflowOperationException {
287     int sf = 0;
288     int tf = 0;
289     int st = 0;
290     int tt = 0;
291     for (ElementProfileTagFlavor ep : elementSelectors) {
292       try {
293         if (sourceTags != null) {
294           for (String tag : asList(sourceTags[st])) {
295             ep.addSourceTag(tag);
296           }
297           if (sourceTags.length != 1)
298             st++;
299         }
300         if (targetTags != null) {
301           ep.setTargetTags(targetTags[tt]);
302           if (targetTags.length != 1)
303             tt++;
304         }
305         if (sourceFlavors != null) {
306           for (String flavor : asList(sourceFlavors[sf])) {
307             ep.addSourceFlavor(flavor);
308           }
309           if (sourceFlavors.length != 1)
310             sf++;
311         }
312         if (targetFlavors != null) {
313           for (String flavor : asList(targetFlavors[tf])) {
314             ep.setTargetFlavor(flavor);
315           }
316           if (targetFlavors.length != 1)
317             tf++;
318         }
319       } catch (IllegalArgumentException e) {
320         throw new WorkflowOperationException("Set Tags or Flavor " + e.getMessage());
321       }
322     }
323     return elementSelectors;
324   }
325 
326   private String[] getConfigAsArray(WorkflowOperationInstance operation, String name) {
327     String sourceOption = StringUtils.trimToNull(operation.getConfiguration(name));
328     return StringUtils.split(sourceOption, SEPARATOR);
329   }
330 
331   private List<Track> getManifest(Collection<Track> tracks) {
332     return tracks.stream().filter(AdaptivePlaylist.isHLSTrackPred).collect(Collectors.toList());
333   }
334 
335   /*
336    * Encode multiple tracks in a mediaPackage concurrently with different encoding profiles for each track. The encoding
337    * profiles are specified by names in a list and are the names used to tag each corresponding output. Each source
338    * track will start one operation on one worker. concurrency is achieved by running on different workers
339    *
340    * @param src The source media package
341    *
342    * @param operation the current workflow operation
343    *
344    * @return the operation result containing the updated media package
345    *
346    * @throws EncoderException if encoding fails
347    *
348    * @throws WorkflowOperationException if errors occur during processing
349    *
350    * @throws IOException if the workspace operations fail
351    *
352    * @throws NotFoundException if the workspace doesn't contain the requested file
353    */
354   private WorkflowOperationResult multiencode(MediaPackage src, WorkflowOperationInstance operation)
355           throws EncoderException, IOException, NotFoundException, MediaPackageException, WorkflowOperationException {
356     MediaPackage mediaPackage = (MediaPackage) src.clone();
357     // Check which tags have been configured
358     String[] sourceTags = getConfigAsArray(operation, "source-tags");
359     String[] sourceFlavors = getConfigAsArray(operation, "source-flavors");
360     String[] targetTags = getConfigAsArray(operation, "target-tags");
361     String[] targetFlavors = getConfigAsArray(operation, "target-flavors");
362     String tagWithProfileConfig = StringUtils.trimToNull(operation.getConfiguration("tag-with-profile"));
363     boolean tagWithProfile = BooleanUtils.toBoolean(tagWithProfileConfig);
364 
365     // Make sure either one of tags or flavors are provided
366     if (sourceFlavors == null && sourceTags == null) {
367       logger.info("No source tags or flavors have been specified, not matching anything");
368       return createResult(mediaPackage, Action.CONTINUE);
369     }
370     String[] profiles = getConfigAsArray(operation, "encoding-profiles");
371     if (profiles == null)
372       throw new WorkflowOperationException("Missing encoding profiles");
373 
374     // Sort out the combinatorics of all the tags and flavors
375     List<ElementProfileTagFlavor> selectors = getSrcSelector(sourceFlavors, sourceTags, targetFlavors, targetTags,
376             profiles);
377 
378     long totalTimeInQueue = 0;
379     Map<Job, JobInformation> encodingJobs = new HashMap<>();
380     // Find the encoding profiles - should only be one per flavor or tag
381     for (ElementProfileTagFlavor eptf : selectors) {
382       // Look for elements matching the tag and flavor
383       Collection<Track> elements = eptf.elementSelector.select(mediaPackage, true);
384       for (Track sourceTrack : elements) {
385         logger.info("Encoding track {} using encoding profile '{}'", sourceTrack, eptf.getProfiles().get(0).toString());
386         // Start encoding and wait for the result
387         encodingJobs.put(composerService.multiEncode(sourceTrack, eptf.getProfiles()),
388                 new JobInformation(sourceTrack, eptf, tagWithProfile));
389       }
390     }
391 
392     if (encodingJobs.isEmpty()) {
393       logger.info("No matching tracks found");
394       return createResult(mediaPackage, Action.CONTINUE);
395     }
396 
397     // Wait for the jobs to return
398     if (!waitForStatus(encodingJobs.keySet().toArray(new Job[encodingJobs.size()])).isSuccess()) {
399       throw new WorkflowOperationException("One of the encoding jobs did not complete successfully");
400     }
401 
402     // Process the result
403     for (Map.Entry<Job, JobInformation> entry : encodingJobs.entrySet()) {
404       Job job = entry.getKey();
405       Track sourceTrack = entry.getValue().getTrack(); // source
406       ElementProfileTagFlavor info = entry.getValue().getInfo(); // tags and flavors
407       List<EncodingProfile> eplist = entry.getValue().getProfileList();
408       // add this receipt's queue time to the total
409       totalTimeInQueue += job.getQueueTime();
410       // it is allowed for compose jobs to return an empty payload. See the EncodeEngine interface
411       if (job.getPayload().length() > 0) {
412         @SuppressWarnings("unchecked")
413         List<Track> composedTracks = (List<Track>) MediaPackageElementParser.getArrayFromXml(job.getPayload());
414         // HLS Manifest profile has precedence and overrides individual encoding profiles
415         boolean isHLS = eplist.stream().anyMatch(isManifestEP);
416         if (isHLS) { // check that manifests and segments counts are correct
417           decipherHLSPlaylistResults(sourceTrack, entry.getValue(), mediaPackage, composedTracks);
418         } else if (composedTracks.size() != info.getProfiles().size()) {
419           logger.info("Encoded {} tracks, with {} profiles", composedTracks.size(), info.getProfiles().size());
420           throw new WorkflowOperationException("Number of output tracks does not match number of encoding profiles");
421         }
422         for (Track composedTrack : composedTracks) {
423           if (info.getTargetFlavor() != null) { // Has Flavors
424             // set it to the matching flavor in the order listed
425             composedTrack.setFlavor(newFlavor(sourceTrack, info.getTargetFlavor()));
426             logger.debug("Composed track has flavor '{}'", composedTrack.getFlavor());
427           }
428           if (info.getTargetTags() != null) { // Has Tags
429             for (String tag : asList(info.getTargetTags())) {
430               logger.trace("Tagging composed track with '{}'", tag);
431               composedTrack.addTag(tag);
432             }
433           }
434           // Tag each output with encoding profile name if configured
435           if (entry.getValue().getTagWithProfile()) {
436             tagByProfile(composedTrack, eplist);
437           }
438           String fileName;
439           if (!isHLS || composedTrack.isMaster()) {
440             // name after source track if user facing
441             fileName = getFileNameFromElements(sourceTrack, composedTrack);
442           } else { // HLS-VOD
443             // Should all the files be renamed to the same as source
444             // which defeats the purpose of the suffix in encoding profiles
445             fileName = FilenameUtils.getName(composedTrack.getURI().getPath());
446           }
447           // store new tracks to mediaPackage
448           composedTrack.setURI(workspace.moveTo(composedTrack.getURI(), mediaPackage.getIdentifier().toString(),
449                   composedTrack.getIdentifier(), fileName));
450           mediaPackage.addDerived(composedTrack, sourceTrack);
451         }
452       } else {
453         logger.warn("No output from MultiEncode operation");
454       }
455     }
456     WorkflowOperationResult result = createResult(mediaPackage, Action.CONTINUE, totalTimeInQueue);
457     logger.debug("MultiEncode operation completed");
458     return result;
459   }
460 
461   /**
462    * Find the matching encoding profile for this track and tag by name
463    *
464    * @param track
465    * @param profiles
466    *          - profiles used to encode a track to multiple formats
467    * @return
468    */
469   private void tagByProfile(Track track, List<EncodingProfile> profiles) {
470     String rawfileName = track.getURI().getRawPath();
471     for (EncodingProfile ep : profiles) {
472       // #DCE
473       // Add any character at the beginning of the suffix so that it is properly
474       // converted in toSafeName (because the regex used there may treat the first
475       // character differently; the default one does now).
476       String suffixToSanitize = "X" + ep.getSuffix();
477       // !! workspace.putInCollection renames the file - need to do the same with suffix
478       String suffix = workspace.toSafeName(suffixToSanitize).substring(1);
479       if (suffix.length() > 0 && rawfileName.endsWith(suffix)) {
480         track.addTag(ep.getIdentifier());
481         return;
482       }
483     }
484   }
485 
486   private void decipherHLSPlaylistResults(Track track, JobInformation jobInfo, MediaPackage mediaPackage,
487           List<Track> composedTracks)
488           throws WorkflowOperationException, IllegalArgumentException, NotFoundException, IOException {
489     int nprofiles = jobInfo.getInfo().getProfiles().size();
490     List<Track> manifests = getManifest(composedTracks);
491 
492     if (manifests.size() != nprofiles) {
493       throw new WorkflowOperationException("Number of output playlists does not match number of encoding profiles");
494     }
495     if (composedTracks.size() != manifests.size() * 2 - 1) {
496       throw new WorkflowOperationException("Number of output media does not match number of encoding profiles");
497     }
498   }
499 
500   private MediaPackageElementFlavor newFlavor(Track track, String flavor) throws WorkflowOperationException {
501     if (StringUtils.isNotBlank(flavor)) {
502       try {
503         MediaPackageElementFlavor targetFlavor = MediaPackageElementFlavor.parseFlavor(flavor);
504         String flavorType = targetFlavor.getType();
505         String flavorSubtype = targetFlavor.getSubtype();
506         // Adjust the target flavor. Make sure to account for partial updates
507         if ("*".equals(flavorType))
508           flavorType = track.getFlavor().getType();
509         if ("*".equals(flavorSubtype))
510           flavorSubtype = track.getFlavor().getSubtype();
511         return (new MediaPackageElementFlavor(flavorType, flavorSubtype));
512       } catch (IllegalArgumentException e) {
513         throw new WorkflowOperationException("Target flavor '" + flavor + "' is malformed");
514       }
515     }
516     return null;
517   }
518 
519   /**
520    * This class is used to store context information for the jobs.
521    */
522   private static final class JobInformation {
523 
524     private Track track = null;
525     private ElementProfileTagFlavor info = null;
526     private boolean tagWithProfile;
527 
528     JobInformation(Track track, ElementProfileTagFlavor info, boolean tagWithProfile) {
529       this.track = track;
530       this.info = info;
531       this.tagWithProfile = tagWithProfile;
532     }
533 
534     public List<EncodingProfile> getProfileList() {
535       return info.encodingProfileList;
536     }
537 
538     /**
539      * Returns the track.
540      *
541      * @return the track
542      */
543     public Track getTrack() {
544       return track;
545     }
546 
547     public boolean getTagWithProfile() {
548       return this.tagWithProfile;
549     }
550 
551     public ElementProfileTagFlavor getInfo() {
552       return info;
553     }
554   }
555 
556   @Reference
557   @Override
558   public void setServiceRegistry(ServiceRegistry serviceRegistry) {
559     super.setServiceRegistry(serviceRegistry);
560   }
561 
562 }