View Javadoc
1   /*
2    * Licensed to The Apereo Foundation under one or more contributor license
3    * agreements. See the NOTICE file distributed with this work for additional
4    * information regarding copyright ownership.
5    *
6    *
7    * The Apereo Foundation licenses this file to you under the Educational
8    * Community License, Version 2.0 (the "License"); you may not use this file
9    * except in compliance with the License. You may obtain a copy of the License
10   * at:
11   *
12   *   http://opensource.org/licenses/ecl2.txt
13   *
14   * Unless required by applicable law or agreed to in writing, software
15   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
16   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the
17   * License for the specific language governing permissions and limitations under
18   * the License.
19   *
20   */
21  
22  package org.opencastproject.workflow.handler.composer;
23  
24  import org.opencastproject.composer.api.ComposerService;
25  import org.opencastproject.composer.api.EncoderException;
26  import org.opencastproject.composer.api.EncodingProfile;
27  import org.opencastproject.job.api.Job;
28  import org.opencastproject.job.api.JobContext;
29  import org.opencastproject.mediapackage.AdaptivePlaylist;
30  import org.opencastproject.mediapackage.MediaPackage;
31  import org.opencastproject.mediapackage.MediaPackageElementFlavor;
32  import org.opencastproject.mediapackage.MediaPackageElementParser;
33  import org.opencastproject.mediapackage.MediaPackageException;
34  import org.opencastproject.mediapackage.Track;
35  import org.opencastproject.mediapackage.selector.AbstractMediaPackageElementSelector;
36  import org.opencastproject.mediapackage.selector.TrackSelector;
37  import org.opencastproject.serviceregistry.api.ServiceRegistry;
38  import org.opencastproject.util.NotFoundException;
39  import org.opencastproject.workflow.api.AbstractWorkflowOperationHandler;
40  import org.opencastproject.workflow.api.WorkflowInstance;
41  import org.opencastproject.workflow.api.WorkflowOperationException;
42  import org.opencastproject.workflow.api.WorkflowOperationHandler;
43  import org.opencastproject.workflow.api.WorkflowOperationInstance;
44  import org.opencastproject.workflow.api.WorkflowOperationResult;
45  import org.opencastproject.workflow.api.WorkflowOperationResult.Action;
46  import org.opencastproject.workspace.api.Workspace;
47  
48  import org.apache.commons.io.FilenameUtils;
49  import org.apache.commons.lang3.BooleanUtils;
50  import org.apache.commons.lang3.StringUtils;
51  import org.osgi.service.component.ComponentContext;
52  import org.osgi.service.component.annotations.Activate;
53  import org.osgi.service.component.annotations.Component;
54  import org.osgi.service.component.annotations.Reference;
55  import org.slf4j.Logger;
56  import org.slf4j.LoggerFactory;
57  
58  import java.io.IOException;
59  import java.util.ArrayList;
60  import java.util.Collection;
61  import java.util.HashMap;
62  import java.util.List;
63  import java.util.Map;
64  import java.util.function.Predicate;
65  import java.util.stream.Collectors;
66  
67  /**
68   * The workflow definition for handling multiple concurrent outputs in one ffmpeg operation. This allows encoding and
69   * tagging to be done in one operation
70   */
71  @Component(
72      immediate = true,
73      service = WorkflowOperationHandler.class,
74      property = {
75          "service.description=MultiEncode Workflow Operation Handler",
76          "workflow.operation=multiencode"
77      }
78  )
79  public class MultiEncodeWorkflowOperationHandler extends AbstractWorkflowOperationHandler {
80  
81    /** The logging facility */
82    private static final Logger logger = LoggerFactory.getLogger(MultiEncodeWorkflowOperationHandler.class);
83  
84    /** seperator for independent clauses */
85    static final String SEPARATOR = ";";
86  
87    /** The composer service */
88    private ComposerService composerService = null;
89  
90    /** The local workspace */
91    private Workspace workspace = null;
92  
93    @Activate
94    public void activate(ComponentContext cc) {
95      super.activate(cc);
96    }
97  
98    /**
99     * Callback for the OSGi declarative services configuration.
100    *
101    * @param composerService
102    *          the local composer service
103    */
104   @Reference
105   protected void setComposerService(ComposerService composerService) {
106     this.composerService = composerService;
107   }
108 
109   /**
110    * Callback for declarative services configuration that will introduce us to the local workspace service.
111    * Implementation assumes that the reference is configured as being static.
112    *
113    * @param workspace
114    *          an instance of the workspace
115    */
116   @Reference
117   public void setWorkspace(Workspace workspace) {
118     this.workspace = workspace;
119   }
120 
121   private Predicate<EncodingProfile> isManifestEP = p ->  p.getOutputType() == EncodingProfile.MediaType.Manifest;
122 
123   /**
124    * {@inheritDoc}
125    *
126    * @see org.opencastproject.workflow.api.WorkflowOperationHandler#start(
127    *      org.opencastproject.workflow.api.WorkflowInstance, JobContext)
128    */
129   @Override
130   public WorkflowOperationResult start(final WorkflowInstance workflowInstance, JobContext context)
131           throws WorkflowOperationException {
132     logger.debug("Running Multiencode workflow operation on workflow {}", workflowInstance.getId());
133 
134     try {
135       return multiencode(workflowInstance.getMediaPackage(), workflowInstance.getCurrentOperation());
136     } catch (Exception e) {
137       throw new WorkflowOperationException(e);
138     }
139   }
140 
141   protected class ElementProfileTagFlavor {
142     private AbstractMediaPackageElementSelector<Track> elementSelector = new TrackSelector();
143     private String targetFlavor = null;
144     private String targetTags = null;
145     private List<String> encodingProfiles = new ArrayList<>(); // redundant storage
146     private List<EncodingProfile> encodingProfileList = new ArrayList<>();
147 
148     ElementProfileTagFlavor(String profiles) {
149       List<String> profilelist = asList(profiles);
150       for (String profile : profilelist) {
151         EncodingProfile encodingprofile = composerService.getProfile(profile);
152         if (encodingprofile != null) {
153           encodingProfiles.add(encodingprofile.getIdentifier());
154           encodingProfileList.add(encodingprofile);
155         } else {
156           throw new IllegalArgumentException("Encoding profile " + profile + " not found.");
157         }
158       }
159     }
160 
161     public AbstractMediaPackageElementSelector<Track> getSelector() {
162       return this.elementSelector;
163     }
164 
165     public List<String> getProfiles() {
166       return this.encodingProfiles;
167     }
168 
169     public List<EncodingProfile> getEncodingProfiles() {
170       return this.encodingProfileList;
171     }
172 
173     void addSourceFlavor(String flavor) {
174       this.elementSelector.addFlavor(MediaPackageElementFlavor.parseFlavor(flavor));
175     }
176 
177     void addSourceTag(String tag) {
178       this.elementSelector.addTag(tag);
179     }
180 
181     void setTargetTags(String tags) {
182       this.targetTags = tags;
183     }
184 
185     void setTargetFlavor(String flavor) {
186       this.targetFlavor = flavor;
187     }
188 
189     String getTargetFlavor() {
190       return this.targetFlavor;
191     }
192 
193     String getTargetTags() {
194       return this.targetTags;
195     }
196   }
197 
198   /*
199    * Figures out the logic of all the source tags, flavors and profiles and sorts out the source tracks and
200    * corresponding encoding profiles.
201    *
202    * Source Tracks are selected by (Flavor AND Tag) if they are both provided
203    *
204    * There can be multiple sources and flavors to create more than one source tracks. In the workflow, A semi-colon ";"
205    * is used to separate the independent operations.
206    *
207    * The independent operations can be either all share the same set of properties or all have different sets of
208    * properties. For example, There are two sets of source flavors: * "presenter/* ; presentation/*", one source tag,
209    * eg: "preview", and two sets of encoding profiles, eg: "mp4,flv ; mp4,hdtv" then there are two concurrent
210    * operations: the first one is all "presenter" tracks tagged "preview" will be encoded with "mp4" and "flv". The
211    * second one is all "presentation" tracks tagged "preview" encoded with "mp4" and "hdtv"
212    *
213    */
214   private List<ElementProfileTagFlavor> getSrcSelector(String[] sourceFlavors, String[] sourceTags,
215           String[] targetFlavors, String[] targetTags, String[] profiles) throws WorkflowOperationException {
216     int n = 0;
217     List<ElementProfileTagFlavor> elementSelectors = new ArrayList<>();
218     if (sourceTags == null && sourceFlavors == null) {
219       throw new WorkflowOperationException("No source tags or Flavor");
220     }
221     if (profiles == null) {
222       throw new WorkflowOperationException("Missing profiles");
223     }
224     if (sourceTags != null) { // If source tags are used to select tracks
225       // If use source and target tags, there should be the same number of them or all map into one target
226       if (targetTags != null && (targetTags.length != 1 && sourceTags.length != targetTags.length)) {
227         throw new WorkflowOperationException(
228             "number of source tags " + sourceTags.length + " does not match number of target tags " + targetTags.length
229                 + " (must be the same or one target)");
230       }
231       // There should be the same number of source tags or profile groups or all use same group of profiles
232       if (profiles.length != 1 && sourceTags.length != profiles.length) {
233         throw new WorkflowOperationException(
234                 "number of source tags segments " + sourceTags.length + " does not match number of profiles segments "
235                         + profiles.length + " (must be the same or one profile)");
236       }
237       // If use source tags and source flavors, there should be the same number of them or one
238       if (sourceFlavors != null && (sourceTags.length != 1 && sourceFlavors.length != 1)
239               && sourceFlavors.length != sourceTags.length) {
240         throw new WorkflowOperationException("number of source tags segments " + sourceTags.length
241                 + " does not match number of source Flavor segments " + sourceFlavors.length
242                 + " (must be the same or one)");
243       }
244       n = sourceTags.length; // at least this many tracks
245     }
246     if (sourceFlavors != null) { // If flavors are used to select tracks
247       // If use source and target flavors, there should be the same number of them or all map into one target
248       if (targetFlavors != null && (targetFlavors.length != 1 && sourceFlavors.length != targetFlavors.length)) {
249         throw new WorkflowOperationException(
250                 "number of source flavors " + sourceFlavors.length + " segment does not match number of target flavors"
251                         + targetFlavors.length + " (must be the same or one target flavor)");
252       }
253       // If use target tags, there should be the same number of source flavors and target tags or all map into one
254       // target tag
255       if (targetTags != null && targetTags.length != 1 && sourceFlavors.length != targetTags.length) {
256         throw new WorkflowOperationException(
257                 "number of source flavors " + sourceFlavors.length + " segment does not match number of target Tags"
258                         + targetTags.length + " (must be the same or one target)");
259       }
260       // Number of profile groups should match number of source flavors
261       if ((profiles.length != 1 && sourceFlavors.length != profiles.length)) {
262         throw new WorkflowOperationException("number of source flavors segments " + sourceFlavors.length
263                 + " does not match number of profiles segments " + profiles.length
264                 + " (must be the same or one profile)");
265       }
266       if (sourceFlavors.length > n) {
267         n = sourceFlavors.length; // at least this many tracks
268       }
269     }
270     int numProfiles = 0;
271     // One for each source flavor
272     for (int i = 0; i < n; i++) {
273       elementSelectors.add(new ElementProfileTagFlavor(profiles[numProfiles]));
274       if (profiles.length > 1) {
275         numProfiles++; // All source use the same set of profiles or its own
276       }
277     }
278     // If uses tags to select, but sets target flavor, they must match
279     if (sourceTags != null && sourceFlavors != null) {
280       if (sourceTags.length != sourceFlavors.length && sourceFlavors.length != 1 && sourceTags.length != 1) {
281         throw new WorkflowOperationException(
282                 "number of source flavors " + sourceTags.length + " does not match number of source tags "
283                         + sourceFlavors.length + " (must be the same or one set of tags or flavors)");
284       }
285     }
286     populateFlavorsAndTags(elementSelectors, sourceFlavors, targetFlavors, sourceTags, targetTags);
287     return elementSelectors;
288   }
289 
290   private List<ElementProfileTagFlavor> populateFlavorsAndTags(List<ElementProfileTagFlavor> elementSelectors,
291           String[] sourceFlavors, String[] targetFlavors, String[] sourceTags, String[] targetTags)
292           throws WorkflowOperationException {
293     int sf = 0;
294     int tf = 0;
295     int st = 0;
296     int tt = 0;
297     for (ElementProfileTagFlavor ep : elementSelectors) {
298       try {
299         if (sourceTags != null) {
300           for (String tag : asList(sourceTags[st])) {
301             ep.addSourceTag(tag);
302           }
303           if (sourceTags.length != 1) {
304             st++;
305           }
306         }
307         if (targetTags != null) {
308           ep.setTargetTags(targetTags[tt]);
309           if (targetTags.length != 1) {
310             tt++;
311           }
312         }
313         if (sourceFlavors != null) {
314           for (String flavor : asList(sourceFlavors[sf])) {
315             ep.addSourceFlavor(flavor);
316           }
317           if (sourceFlavors.length != 1) {
318             sf++;
319           }
320         }
321         if (targetFlavors != null) {
322           for (String flavor : asList(targetFlavors[tf])) {
323             ep.setTargetFlavor(flavor);
324           }
325           if (targetFlavors.length != 1) {
326             tf++;
327           }
328         }
329       } catch (IllegalArgumentException e) {
330         throw new WorkflowOperationException("Set Tags or Flavor " + e.getMessage());
331       }
332     }
333     return elementSelectors;
334   }
335 
336   private String[] getConfigAsArray(WorkflowOperationInstance operation, String name) {
337     String sourceOption = StringUtils.trimToNull(operation.getConfiguration(name));
338     return StringUtils.split(sourceOption, SEPARATOR);
339   }
340 
341   private List<Track> getManifest(Collection<Track> tracks) {
342     return tracks.stream().filter(AdaptivePlaylist.isHLSTrackPred).collect(Collectors.toList());
343   }
344 
345   /*
346    * Encode multiple tracks in a mediaPackage concurrently with different encoding profiles for each track. The encoding
347    * profiles are specified by names in a list and are the names used to tag each corresponding output. Each source
348    * track will start one operation on one worker. concurrency is achieved by running on different workers
349    *
350    * @param src The source media package
351    *
352    * @param operation the current workflow operation
353    *
354    * @return the operation result containing the updated media package
355    *
356    * @throws EncoderException if encoding fails
357    *
358    * @throws WorkflowOperationException if errors occur during processing
359    *
360    * @throws IOException if the workspace operations fail
361    *
362    * @throws NotFoundException if the workspace doesn't contain the requested file
363    */
364   private WorkflowOperationResult multiencode(MediaPackage src, WorkflowOperationInstance operation)
365           throws EncoderException, IOException, NotFoundException, MediaPackageException, WorkflowOperationException {
366     MediaPackage mediaPackage = (MediaPackage) src.clone();
367     // Check which tags have been configured
368     String[] sourceTags = getConfigAsArray(operation, "source-tags");
369     String[] sourceFlavors = getConfigAsArray(operation, "source-flavors");
370     String[] targetTags = getConfigAsArray(operation, "target-tags");
371     String[] targetFlavors = getConfigAsArray(operation, "target-flavors");
372     String tagWithProfileConfig = StringUtils.trimToNull(operation.getConfiguration("tag-with-profile"));
373     boolean tagWithProfile = BooleanUtils.toBoolean(tagWithProfileConfig);
374 
375     // Make sure either one of tags or flavors are provided
376     if (sourceFlavors == null && sourceTags == null) {
377       logger.info("No source tags or flavors have been specified, not matching anything");
378       return createResult(mediaPackage, Action.CONTINUE);
379     }
380     String[] profiles = getConfigAsArray(operation, "encoding-profiles");
381     if (profiles == null) {
382       throw new WorkflowOperationException("Missing encoding profiles");
383     }
384 
385     // Sort out the combinatorics of all the tags and flavors
386     List<ElementProfileTagFlavor> selectors = getSrcSelector(sourceFlavors, sourceTags, targetFlavors, targetTags,
387             profiles);
388 
389     long totalTimeInQueue = 0;
390     Map<Job, JobInformation> encodingJobs = new HashMap<>();
391     // Find the encoding profiles - should only be one per flavor or tag
392     for (ElementProfileTagFlavor eptf : selectors) {
393       // Look for elements matching the tag and flavor
394       Collection<Track> elements = eptf.elementSelector.select(mediaPackage, true);
395       for (Track sourceTrack : elements) {
396         logger.info("Encoding track {} using encoding profile '{}'", sourceTrack, eptf.getProfiles().get(0).toString());
397         // Start encoding and wait for the result
398         encodingJobs.put(composerService.multiEncode(sourceTrack, eptf.getProfiles()),
399                 new JobInformation(sourceTrack, eptf, tagWithProfile));
400       }
401     }
402 
403     if (encodingJobs.isEmpty()) {
404       logger.info("No matching tracks found");
405       return createResult(mediaPackage, Action.CONTINUE);
406     }
407 
408     // Wait for the jobs to return
409     if (!waitForStatus(encodingJobs.keySet().toArray(new Job[encodingJobs.size()])).isSuccess()) {
410       throw new WorkflowOperationException("One of the encoding jobs did not complete successfully");
411     }
412 
413     // Process the result
414     for (Map.Entry<Job, JobInformation> entry : encodingJobs.entrySet()) {
415       Job job = entry.getKey();
416       Track sourceTrack = entry.getValue().getTrack(); // source
417       ElementProfileTagFlavor info = entry.getValue().getInfo(); // tags and flavors
418       List<EncodingProfile> eplist = entry.getValue().getProfileList();
419       // add this receipt's queue time to the total
420       totalTimeInQueue += job.getQueueTime();
421       // it is allowed for compose jobs to return an empty payload. See the EncodeEngine interface
422       if (job.getPayload().length() > 0) {
423         @SuppressWarnings("unchecked")
424         List<Track> composedTracks = (List<Track>) MediaPackageElementParser.getArrayFromXml(job.getPayload());
425         // HLS Manifest profile has precedence and overrides individual encoding profiles
426         boolean isHLS = eplist.stream().anyMatch(isManifestEP);
427         if (isHLS) { // check that manifests and segments counts are correct
428           decipherHLSPlaylistResults(sourceTrack, entry.getValue(), mediaPackage, composedTracks);
429         } else if (composedTracks.size() != info.getProfiles().size()) {
430           logger.info("Encoded {} tracks, with {} profiles", composedTracks.size(), info.getProfiles().size());
431           throw new WorkflowOperationException("Number of output tracks does not match number of encoding profiles");
432         }
433         for (Track composedTrack : composedTracks) {
434           if (info.getTargetFlavor() != null) { // Has Flavors
435             // set it to the matching flavor in the order listed
436             composedTrack.setFlavor(newFlavor(sourceTrack, info.getTargetFlavor()));
437             logger.debug("Composed track has flavor '{}'", composedTrack.getFlavor());
438           }
439           if (info.getTargetTags() != null) { // Has Tags
440             for (String tag : asList(info.getTargetTags())) {
441               logger.trace("Tagging composed track with '{}'", tag);
442               composedTrack.addTag(tag);
443             }
444           }
445           // Tag each output with encoding profile name if configured
446           if (entry.getValue().getTagWithProfile()) {
447             tagByProfile(composedTrack, eplist);
448           }
449           String fileName;
450           if (!isHLS || composedTrack.isMaster()) {
451             // name after source track if user facing
452             fileName = getFileNameFromElements(sourceTrack, composedTrack);
453           } else { // HLS-VOD
454             // Should all the files be renamed to the same as source
455             // which defeats the purpose of the suffix in encoding profiles
456             fileName = FilenameUtils.getName(composedTrack.getURI().getPath());
457           }
458           // store new tracks to mediaPackage
459           composedTrack.setURI(workspace.moveTo(composedTrack.getURI(), mediaPackage.getIdentifier().toString(),
460                   composedTrack.getIdentifier(), fileName));
461           mediaPackage.addDerived(composedTrack, sourceTrack);
462         }
463       } else {
464         logger.warn("No output from MultiEncode operation");
465       }
466     }
467     WorkflowOperationResult result = createResult(mediaPackage, Action.CONTINUE, totalTimeInQueue);
468     logger.debug("MultiEncode operation completed");
469     return result;
470   }
471 
472   /**
473    * Find the matching encoding profile for this track and tag by name
474    *
475    * @param track
476    * @param profiles
477    *          - profiles used to encode a track to multiple formats
478    * @return
479    */
480   private void tagByProfile(Track track, List<EncodingProfile> profiles) {
481     String rawfileName = track.getURI().getRawPath();
482     for (EncodingProfile ep : profiles) {
483       // #DCE
484       // Add any character at the beginning of the suffix so that it is properly
485       // converted in toSafeName (because the regex used there may treat the first
486       // character differently; the default one does now).
487       String suffixToSanitize = "X" + ep.getSuffix();
488       // !! workspace.putInCollection renames the file - need to do the same with suffix
489       String suffix = workspace.toSafeName(suffixToSanitize).substring(1);
490       if (suffix.length() > 0 && rawfileName.endsWith(suffix)) {
491         track.addTag(ep.getIdentifier());
492         return;
493       }
494     }
495   }
496 
497   private void decipherHLSPlaylistResults(Track track, JobInformation jobInfo, MediaPackage mediaPackage,
498           List<Track> composedTracks)
499           throws WorkflowOperationException, IllegalArgumentException, NotFoundException, IOException {
500     int nprofiles = jobInfo.getInfo().getProfiles().size();
501     List<Track> manifests = getManifest(composedTracks);
502 
503     if (manifests.size() != nprofiles) {
504       throw new WorkflowOperationException("Number of output playlists does not match number of encoding profiles");
505     }
506     if (composedTracks.size() != manifests.size() * 2 - 1) {
507       throw new WorkflowOperationException("Number of output media does not match number of encoding profiles");
508     }
509   }
510 
511   private MediaPackageElementFlavor newFlavor(Track track, String flavor) throws WorkflowOperationException {
512     if (StringUtils.isNotBlank(flavor)) {
513       try {
514         MediaPackageElementFlavor targetFlavor = MediaPackageElementFlavor.parseFlavor(flavor);
515         String flavorType = targetFlavor.getType();
516         String flavorSubtype = targetFlavor.getSubtype();
517         // Adjust the target flavor. Make sure to account for partial updates
518         if ("*".equals(flavorType)) {
519           flavorType = track.getFlavor().getType();
520         }
521         if ("*".equals(flavorSubtype)) {
522           flavorSubtype = track.getFlavor().getSubtype();
523         }
524         return (new MediaPackageElementFlavor(flavorType, flavorSubtype));
525       } catch (IllegalArgumentException e) {
526         throw new WorkflowOperationException("Target flavor '" + flavor + "' is malformed");
527       }
528     }
529     return null;
530   }
531 
532   /**
533    * This class is used to store context information for the jobs.
534    */
535   private static final class JobInformation {
536 
537     private Track track = null;
538     private ElementProfileTagFlavor info = null;
539     private boolean tagWithProfile;
540 
541     JobInformation(Track track, ElementProfileTagFlavor info, boolean tagWithProfile) {
542       this.track = track;
543       this.info = info;
544       this.tagWithProfile = tagWithProfile;
545     }
546 
547     public List<EncodingProfile> getProfileList() {
548       return info.encodingProfileList;
549     }
550 
551     /**
552      * Returns the track.
553      *
554      * @return the track
555      */
556     public Track getTrack() {
557       return track;
558     }
559 
560     public boolean getTagWithProfile() {
561       return this.tagWithProfile;
562     }
563 
564     public ElementProfileTagFlavor getInfo() {
565       return info;
566     }
567   }
568 
569   @Reference
570   @Override
571   public void setServiceRegistry(ServiceRegistry serviceRegistry) {
572     super.setServiceRegistry(serviceRegistry);
573   }
574 
575 }