View Javadoc
1   /*
2    * Licensed to The Apereo Foundation under one or more contributor license
3    * agreements. See the NOTICE file distributed with this work for additional
4    * information regarding copyright ownership.
5    *
6    *
7    * The Apereo Foundation licenses this file to you under the Educational
8    * Community License, Version 2.0 (the "License"); you may not use this file
9    * except in compliance with the License. You may obtain a copy of the License
10   * at:
11   *
12   *   http://opensource.org/licenses/ecl2.txt
13   *
14   * Unless required by applicable law or agreed to in writing, software
15   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
16   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the
17   * License for the specific language governing permissions and limitations under
18   * the License.
19   *
20   */
21  
22  package org.opencastproject.workflow.handler.composer;
23  
import org.opencastproject.composer.api.ComposerService;
import org.opencastproject.composer.api.EncoderException;
import org.opencastproject.composer.api.EncodingProfile;
import org.opencastproject.composer.layout.Dimension;
import org.opencastproject.job.api.Job;
import org.opencastproject.job.api.JobContext;
import org.opencastproject.mediapackage.MediaPackage;
import org.opencastproject.mediapackage.MediaPackageElementFlavor;
import org.opencastproject.mediapackage.MediaPackageElementParser;
import org.opencastproject.mediapackage.MediaPackageException;
import org.opencastproject.mediapackage.Track;
import org.opencastproject.mediapackage.TrackSupport;
import org.opencastproject.mediapackage.VideoStream;
import org.opencastproject.mediapackage.selector.TrackSelector;
import org.opencastproject.serviceregistry.api.ServiceRegistry;
import org.opencastproject.util.NotFoundException;
import org.opencastproject.util.data.Tuple;
import org.opencastproject.workflow.api.AbstractWorkflowOperationHandler;
import org.opencastproject.workflow.api.ConfiguredTagsAndFlavors;
import org.opencastproject.workflow.api.WorkflowInstance;
import org.opencastproject.workflow.api.WorkflowOperationException;
import org.opencastproject.workflow.api.WorkflowOperationHandler;
import org.opencastproject.workflow.api.WorkflowOperationInstance;
import org.opencastproject.workflow.api.WorkflowOperationResult;
import org.opencastproject.workflow.api.WorkflowOperationResult.Action;
import org.opencastproject.workspace.api.Workspace;

import org.apache.commons.io.FilenameUtils;
import org.apache.commons.lang3.BooleanUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.math.NumberUtils;
import org.osgi.service.component.annotations.Component;
import org.osgi.service.component.annotations.Reference;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.TreeMap;
68  
69  /**
70   * The workflow definition for handling "concat" operations
71   */
72  @Component(
73      immediate = true,
74      service = WorkflowOperationHandler.class,
75      property = {
76          "service.description=Concat Workflow Operation Handler",
77          "workflow.operation=concat"
78      }
79  )
80  public class ConcatWorkflowOperationHandler extends AbstractWorkflowOperationHandler {
81  
82    private static final String SOURCE_TAGS_PREFIX = "source-tags-part-";
83    private static final String SOURCE_FLAVOR_PREFIX = "source-flavor-part-";
84    private static final String MANDATORY_SUFFIX = "-mandatory";
85  
86    private static final String ENCODING_PROFILE = "encoding-profile";
87    private static final String OUTPUT_RESOLUTION = "output-resolution";
88    private static final String OUTPUT_FRAMERATE = "output-framerate";
89    private static final String OUTPUT_PART_PREFIX = "part-";
90  
91    /** Concatenate flavored media by lexicographical order -eg v01.mp4, v02.mp4, etc */
92    private static final String SOURCE_FLAVOR_NUMBERED_FILES = "source-flavor-numbered-files";
93    /**
94    * If codec and dimension are the same in all the src files, do not scale and transcode, just put all the content into
95    * the container
96    */
97    private static final String SAME_CODEC = "same-codec";
98    enum SourceType {
99      None, PrefixedFile, NumberedFile
100   };
101 
102 
103   /** The logging facility */
104   private static final Logger logger = LoggerFactory.getLogger(ConcatWorkflowOperationHandler.class);
105 
106   /** The composer service */
107   private ComposerService composerService = null;
108 
109   /** The local workspace */
110   private Workspace workspace = null;
111 
112   /**
113    * Callback for the OSGi declarative services configuration.
114    *
115    * @param composerService
116    *          the local composer service
117    */
118   @Reference
119   public void setComposerService(ComposerService composerService) {
120     this.composerService = composerService;
121   }
122 
123   /**
124    * Callback for declarative services configuration that will introduce us to the local workspace service.
125    * Implementation assumes that the reference is configured as being static.
126    *
127    * @param workspace
128    *          an instance of the workspace
129    */
130   @Reference
131   public void setWorkspace(Workspace workspace) {
132     this.workspace = workspace;
133   }
134 
135   /**
136    * {@inheritDoc}
137    *
138    * @see org.opencastproject.workflow.api.WorkflowOperationHandler#start(org.opencastproject.workflow.api.WorkflowInstance,
139    *      JobContext)
140    */
141   @Override
142   public WorkflowOperationResult start(final WorkflowInstance workflowInstance, JobContext context)
143           throws WorkflowOperationException {
144     logger.debug("Running concat workflow operation on workflow {}", workflowInstance.getId());
145 
146     try {
147       return concat(workflowInstance.getMediaPackage(), workflowInstance);
148     } catch (Exception e) {
149       throw new WorkflowOperationException(e);
150     }
151   }
152 
153   private WorkflowOperationResult concat(MediaPackage src, WorkflowInstance workflowInstance)
154           throws EncoderException, IOException, NotFoundException, MediaPackageException, WorkflowOperationException {
155     MediaPackage mediaPackage = (MediaPackage) src.clone();
156 
157     WorkflowOperationInstance operation = workflowInstance.getCurrentOperation();
158 
159     Map<Integer, Tuple<TrackSelector, Boolean>> trackSelectors = getTrackSelectors(operation);
160     String outputResolution = StringUtils.trimToNull(operation.getConfiguration(OUTPUT_RESOLUTION));
161     String outputFrameRate = StringUtils.trimToNull(operation.getConfiguration(OUTPUT_FRAMERATE));
162     String encodingProfile = StringUtils.trimToNull(operation.getConfiguration(ENCODING_PROFILE));
163     boolean sameCodec = BooleanUtils.toBoolean(operation.getConfiguration(SAME_CODEC));
164 
165     // Skip the worklow if no source-flavors or tags has been configured
166     if (trackSelectors.isEmpty()) {
167       logger.warn("No source-tags or source-flavors has been set.");
168       return createResult(mediaPackage, Action.SKIP);
169     }
170 
171     ConfiguredTagsAndFlavors tagsAndFlavors = getTagsAndFlavors(workflowInstance, Configuration.none, Configuration.none, Configuration.many, Configuration.one);
172     ConfiguredTagsAndFlavors.TargetTags targetTagsOption = tagsAndFlavors.getTargetTags();
173     List<MediaPackageElementFlavor> targetFlavorOption = tagsAndFlavors.getTargetFlavors();
174 
175     // Target flavor
176     if (targetFlavorOption.isEmpty())
177       throw new WorkflowOperationException("Target flavor must be set!");
178 
179     // Find the encoding profile
180     if (encodingProfile == null)
181       throw new WorkflowOperationException("Encoding profile must be set!");
182 
183     EncodingProfile profile = composerService.getProfile(encodingProfile);
184     if (profile == null)
185       throw new WorkflowOperationException("Encoding profile '" + encodingProfile + "' was not found");
186 
187     // Output resolution - if not keeping dimensions the same, it must be set
188     if (!sameCodec && outputResolution == null)
189         throw new WorkflowOperationException("Output resolution must be set!");
190 
191     Dimension outputDimension = null;
192     if (!sameCodec) { // Ignore resolution if same Codec - no scaling
193       if (outputResolution.startsWith(OUTPUT_PART_PREFIX)) {
194         if (!trackSelectors.keySet().contains(
195             Integer.parseInt(outputResolution.substring(OUTPUT_PART_PREFIX.length())))) {
196           throw new WorkflowOperationException("Output resolution part not set!");
197         }
198       } else {
199         try {
200           String[] outputResolutionArray = StringUtils.split(outputResolution, "x");
201           if (outputResolutionArray.length != 2) {
202             throw new WorkflowOperationException("Invalid format of output resolution!");
203           }
204           outputDimension = Dimension.dimension(Integer.parseInt(outputResolutionArray[0]),
205             Integer.parseInt(outputResolutionArray[1]));
206         } catch (WorkflowOperationException e) {
207           throw e;
208         } catch (Exception e) {
209           throw new WorkflowOperationException("Unable to parse output resolution!", e);
210         }
211       }
212     }
213 
214     float fps = -1.0f;
215     // Ignore fps if same Codec - no scaling
216     if (!sameCodec && StringUtils.isNotEmpty(outputFrameRate)) {
217       if (StringUtils.startsWith(outputFrameRate, OUTPUT_PART_PREFIX)) {
218         if (!NumberUtils.isCreatable(outputFrameRate.substring(OUTPUT_PART_PREFIX.length()))
219                 || !trackSelectors.keySet().contains(Integer.parseInt(
220                         outputFrameRate.substring(OUTPUT_PART_PREFIX.length())))) {
221           throw new WorkflowOperationException("Output frame rate part not set or invalid!");
222         }
223       } else if (NumberUtils.isCreatable(outputFrameRate)) {
224         fps = NumberUtils.toFloat(outputFrameRate);
225       } else {
226         throw new WorkflowOperationException("Unable to parse output frame rate!");
227       }
228     }
229 
230     MediaPackageElementFlavor targetFlavor = null;
231     try {
232       targetFlavor = targetFlavorOption.get(0);
233       if ("*".equals(targetFlavor.getType()) || "*".equals(targetFlavor.getSubtype()))
234         throw new WorkflowOperationException("Target flavor must have a type and a subtype, '*' are not allowed!");
235     } catch (IllegalArgumentException e) {
236       throw new WorkflowOperationException("Target flavor '" + targetFlavorOption + "' is malformed");
237     }
238 
239     List<Track> tracks = new ArrayList<Track>();
240     for (Entry<Integer, Tuple<TrackSelector, Boolean>> trackSelector : trackSelectors.entrySet()) {
241       Collection<Track> tracksForSelector = trackSelector.getValue().getA().select(mediaPackage, false);
242       String currentFlavor = StringUtils.join(trackSelector.getValue().getA().getFlavors());
243       String currentTag = StringUtils.join(trackSelector.getValue().getA().getTags());
244 
245       // Cannot mix prefix-number tracks with numbered files
246       // PREFIXED_FILES must have multiple files, but numbered file can skip the operation if there is only one
247       if (trackSelectors.size() == 1) {
248         // NUMBERED FILES will have one trackSelector only and multiple sorted files in it
249         List<Track> list = new ArrayList<>(tracksForSelector);
250         list.sort((left, right) -> {
251             String l = (new File(left.getURI().getPath())).getName(); // Get and compare basename only, getPath() for mock
252             String r = (new File(right.getURI().getPath())).getName();
253             return (l.compareTo(r));
254           });
255         tracksForSelector = list;
256       } else if (tracksForSelector.size() > 1) {
257         logger.warn("More than one track has been found with flavor '{}' and/or tag '{}' for concat operation, "
258                         + "skipping concatenation!", currentFlavor, currentTag);
259         return createResult(mediaPackage, Action.SKIP);
260       } else if (tracksForSelector.size() == 0 && trackSelector.getValue().getB()) {
261         logger.warn("No track has been found with flavor '{}' and/or tag '{}' for concat operation, "
262                         + "skipping concatenation!", currentFlavor, currentTag);
263         return createResult(mediaPackage, Action.SKIP);
264       } else if (tracksForSelector.size() == 0 && !trackSelector.getValue().getB()) {
265         logger.info("No track has been found with flavor '{}' and/or tag '{}' for concat operation, skipping track!",
266                 currentFlavor, currentTag);
267         continue;
268       }
269 
270       for (Track t : tracksForSelector) {
271         tracks.add(t);
272         VideoStream[] videoStreams = TrackSupport.byType(t.getStreams(), VideoStream.class);
273         if (videoStreams.length == 0) {
274           logger.info("No video stream available in the track with flavor {}! {}", currentFlavor, t);
275           return createResult(mediaPackage, Action.SKIP);
276         }
277         if (StringUtils.startsWith(outputResolution, OUTPUT_PART_PREFIX)
278                 && NumberUtils.isCreatable(outputResolution.substring(OUTPUT_PART_PREFIX.length()))
279                 && trackSelector.getKey() == Integer.parseInt(outputResolution.substring(OUTPUT_PART_PREFIX.length()))) {
280           outputDimension = new Dimension(videoStreams[0].getFrameWidth(), videoStreams[0].getFrameHeight());
281           if (!trackSelector.getValue().getB()) {
282             logger.warn("Output resolution track {} must be mandatory, skipping concatenation!", outputResolution);
283             return createResult(mediaPackage, Action.SKIP);
284           }
285         }
286         if (fps <= 0 && StringUtils.startsWith(outputFrameRate, OUTPUT_PART_PREFIX)
287                 && NumberUtils.isCreatable(outputFrameRate.substring(OUTPUT_PART_PREFIX.length()))
288                 && trackSelector.getKey() == Integer.parseInt(outputFrameRate.substring(OUTPUT_PART_PREFIX.length()))) {
289           fps = videoStreams[0].getFrameRate();
290         }
291       }
292     }
293 
294     if (tracks.size() == 0) {
295       logger.warn("No tracks found for concating operation, skipping concatenation!");
296       return createResult(mediaPackage, Action.SKIP);
297     } else if (tracks.size() == 1) {
298       Track track = (Track) tracks.get(0).clone();
299       track.setIdentifier(null);
300       addNewTrack(mediaPackage, track, targetTagsOption, targetFlavor);
301       logger.info("At least two tracks are needed for the concating operation, skipping concatenation!");
302       return createResult(mediaPackage, Action.SKIP);
303     }
304 
305     Job concatJob;
306     if (fps > 0) {
307       concatJob = composerService.concat(profile.getIdentifier(), outputDimension,
308               fps, sameCodec, tracks.toArray(new Track[tracks.size()]));
309     } else {
310       concatJob = composerService.concat(profile.getIdentifier(), outputDimension,
311               sameCodec,tracks.toArray(new Track[tracks.size()]));
312     }
313 
314     // Wait for the jobs to return
315     if (!waitForStatus(concatJob).isSuccess())
316       throw new WorkflowOperationException("The concat job did not complete successfully");
317 
318     if (concatJob.getPayload().length() > 0) {
319 
320       Track concatTrack = (Track) MediaPackageElementParser.getFromXml(concatJob.getPayload());
321 
322       concatTrack.setURI(workspace.moveTo(concatTrack.getURI(), mediaPackage.getIdentifier().toString(),
323               concatTrack.getIdentifier(), "concat." + FilenameUtils.getExtension(concatTrack.getURI().toString())));
324 
325       addNewTrack(mediaPackage, concatTrack, targetTagsOption, targetFlavor);
326 
327       WorkflowOperationResult result = createResult(mediaPackage, Action.CONTINUE, concatJob.getQueueTime());
328       logger.debug("Concat operation completed");
329       return result;
330     } else {
331       logger.info("concat operation unsuccessful, no payload returned: {}", concatJob);
332       return createResult(mediaPackage, Action.SKIP);
333     }
334   }
335 
336   private void addNewTrack(MediaPackage mediaPackage, Track track,
337           ConfiguredTagsAndFlavors.TargetTags targetTags,
338           MediaPackageElementFlavor targetFlavor) {
339     // Adjust the target tags
340     applyTargetTagsToElement(targetTags, track);
341 
342     // Adjust the target flavor.
343     track.setFlavor(targetFlavor);
344     logger.debug("Compound track has flavor '{}'", track.getFlavor());
345 
346     mediaPackage.add(track);
347   }
348 
349   private Map<Integer, Tuple<TrackSelector, Boolean>> getTrackSelectors(WorkflowOperationInstance operation)
350           throws WorkflowOperationException {
351     Map<Integer, Tuple<TrackSelector, Boolean>> trackSelectors = new HashMap<Integer, Tuple<TrackSelector, Boolean>>();
352     SourceType flavorType = SourceType.None;
353     String srcFlavor = null;
354 
355     // Search config for SOURCE_FLAVOR_NUMBERED_FILES and SOURCE_FLAVOR_PREFIX
356     for (String key : operation.getConfigurationKeys()) {
357       if (key.startsWith(SOURCE_FLAVOR_PREFIX) || key.startsWith(SOURCE_TAGS_PREFIX)) {
358         if (flavorType == SourceType.None) {
359           flavorType = SourceType.PrefixedFile;
360         } else if (flavorType != SourceType.PrefixedFile) {
361           throw new WorkflowOperationException(
362                   "Cannot mix source prefix flavor/tags with source numbered files - use one type of selector only");
363         }
364       }
365 
366       if (key.equals(SOURCE_FLAVOR_NUMBERED_FILES)) { // Search config for SOURCE_FLAVORS_NUMBERED_FILES
367         srcFlavor = operation.getConfiguration(key);
368         if (flavorType == SourceType.None) {
369           flavorType = SourceType.NumberedFile;
370           srcFlavor = operation.getConfiguration(key);
371         } else if (flavorType != SourceType.NumberedFile) {
372           throw new WorkflowOperationException(
373                   "Cannot mix source prefix flavor/tags with source numbered files - use one type of selector only");
374         }
375       }
376     }
377 
378     // if is SOURCE_FLAVOR_NUMBERED_FILES, do not use prefixed (tags or flavor)
379     if (srcFlavor != null) { // Numbered files has only one selector
380       int number = 0;
381       Tuple<TrackSelector, Boolean> selectorTuple = trackSelectors.get(number);
382       selectorTuple = Tuple.tuple(new TrackSelector(), true);
383       TrackSelector trackSelector = selectorTuple.getA();
384       trackSelector.addFlavor(srcFlavor);
385       trackSelectors.put(number, selectorTuple);
386       return trackSelectors;
387     }
388 
389     // Prefix only
390     for (String key : operation.getConfigurationKeys()) {
391       String tags = null;
392       String flavor = null;
393       Boolean mandatory = true;
394       int number = -1;
395       if (key.startsWith(SOURCE_TAGS_PREFIX) && !key.endsWith(MANDATORY_SUFFIX)) {
396         number = NumberUtils.toInt(key.substring(SOURCE_TAGS_PREFIX.length()), -1);
397         tags = operation.getConfiguration(key);
398         mandatory = BooleanUtils.toBooleanObject(operation.getConfiguration(SOURCE_TAGS_PREFIX.concat(
399                 Integer.toString(number)).concat(MANDATORY_SUFFIX)));
400       } else if (key.startsWith(SOURCE_FLAVOR_PREFIX) && !key.endsWith(MANDATORY_SUFFIX)) {
401         number = NumberUtils.toInt(key.substring(SOURCE_FLAVOR_PREFIX.length()), -1);
402         flavor = operation.getConfiguration(key);
403         mandatory = BooleanUtils.toBooleanObject(operation.getConfiguration(SOURCE_FLAVOR_PREFIX.concat(
404                 Integer.toString(number)).concat(MANDATORY_SUFFIX)));
405       }
406 
407       if (number < 0)
408         continue;
409 
410       Tuple<TrackSelector, Boolean> selectorTuple = trackSelectors.get(number);
411       if (selectorTuple == null) {
412         selectorTuple = Tuple.tuple(new TrackSelector(), BooleanUtils.toBooleanDefaultIfNull(mandatory, false));
413       } else {
414         selectorTuple = Tuple.tuple(selectorTuple.getA(),
415                 selectorTuple.getB() || BooleanUtils.toBooleanDefaultIfNull(mandatory, false));
416       }
417       TrackSelector trackSelector = selectorTuple.getA();
418       if (StringUtils.isNotBlank(tags)) {
419         for (String tag : StringUtils.split(tags, ",")) {
420           trackSelector.addTag(tag);
421         }
422       }
423       if (StringUtils.isNotBlank(flavor)) {
424         try {
425           trackSelector.addFlavor(flavor);
426         } catch (IllegalArgumentException e) {
427           throw new WorkflowOperationException("Source flavor '" + flavor + "' is malformed");
428         }
429       }
430 
431       trackSelectors.put(number, selectorTuple);
432     }
433     return trackSelectors;
434   }
435 
436   @Reference
437   @Override
438   public void setServiceRegistry(ServiceRegistry serviceRegistry) {
439     super.setServiceRegistry(serviceRegistry);
440   }
441 
442 }