View Javadoc
1   /*
2    * Licensed to The Apereo Foundation under one or more contributor license
3    * agreements. See the NOTICE file distributed with this work for additional
4    * information regarding copyright ownership.
5    *
6    *
7    * The Apereo Foundation licenses this file to you under the Educational
8    * Community License, Version 2.0 (the "License"); you may not use this file
9    * except in compliance with the License. You may obtain a copy of the License
10   * at:
11   *
12   *   http://opensource.org/licenses/ecl2.txt
13   *
14   * Unless required by applicable law or agreed to in writing, software
15   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
16   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the
17   * License for the specific language governing permissions and limitations under
18   * the License.
19   *
20   */
21  
22  package org.opencastproject.workflow.handler.composer;
23  
import org.opencastproject.composer.api.ComposerService;
import org.opencastproject.composer.api.EncoderException;
import org.opencastproject.composer.api.EncodingProfile;
import org.opencastproject.composer.layout.Dimension;
import org.opencastproject.job.api.Job;
import org.opencastproject.job.api.JobContext;
import org.opencastproject.mediapackage.MediaPackage;
import org.opencastproject.mediapackage.MediaPackageElementFlavor;
import org.opencastproject.mediapackage.MediaPackageElementParser;
import org.opencastproject.mediapackage.MediaPackageException;
import org.opencastproject.mediapackage.Track;
import org.opencastproject.mediapackage.TrackSupport;
import org.opencastproject.mediapackage.VideoStream;
import org.opencastproject.mediapackage.selector.TrackSelector;
import org.opencastproject.serviceregistry.api.ServiceRegistry;
import org.opencastproject.util.NotFoundException;
import org.opencastproject.util.data.Tuple;
import org.opencastproject.workflow.api.AbstractWorkflowOperationHandler;
import org.opencastproject.workflow.api.ConfiguredTagsAndFlavors;
import org.opencastproject.workflow.api.WorkflowInstance;
import org.opencastproject.workflow.api.WorkflowOperationException;
import org.opencastproject.workflow.api.WorkflowOperationHandler;
import org.opencastproject.workflow.api.WorkflowOperationInstance;
import org.opencastproject.workflow.api.WorkflowOperationResult;
import org.opencastproject.workflow.api.WorkflowOperationResult.Action;
import org.opencastproject.workspace.api.Workspace;

import org.apache.commons.io.FilenameUtils;
import org.apache.commons.lang3.BooleanUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.math.NumberUtils;
import org.osgi.service.component.annotations.Component;
import org.osgi.service.component.annotations.Reference;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.TreeMap;
68  
69  /**
70   * The workflow operation handler for "concat" operations
71   */
72  @Component(
73      immediate = true,
74      service = WorkflowOperationHandler.class,
75      property = {
76          "service.description=Concat Workflow Operation Handler",
77          "workflow.operation=concat"
78      }
79  )
80  public class ConcatWorkflowOperationHandler extends AbstractWorkflowOperationHandler {
81  
82    private static final String SOURCE_TAGS_PREFIX = "source-tags-part-";
83    private static final String SOURCE_FLAVOR_PREFIX = "source-flavor-part-";
84    private static final String MANDATORY_SUFFIX = "-mandatory";
85  
86    private static final String ENCODING_PROFILE = "encoding-profile";
87    private static final String OUTPUT_RESOLUTION = "output-resolution";
88    private static final String OUTPUT_FRAMERATE = "output-framerate";
89    private static final String OUTPUT_PART_PREFIX = "part-";
90  
91    /** Concatenate flavored media by lexicographical order -eg v01.mp4, v02.mp4, etc */
92    private static final String SOURCE_FLAVOR_NUMBERED_FILES = "source-flavor-numbered-files";
93    /**
94    * If codec and dimension are the same in all the src files, do not scale and transcode, just put all the content into
95    * the container
96    */
97    private static final String SAME_CODEC = "same-codec";
98    enum SourceType {
99      None, PrefixedFile, NumberedFile
100   };
101 
102 
103   /** The logging facility */
104   private static final Logger logger = LoggerFactory.getLogger(ConcatWorkflowOperationHandler.class);
105 
106   /** The composer service */
107   private ComposerService composerService = null;
108 
109   /** The local workspace */
110   private Workspace workspace = null;
111 
112   /**
113    * Callback for the OSGi declarative services configuration.
114    *
115    * @param composerService
116    *          the local composer service
117    */
118   @Reference
119   public void setComposerService(ComposerService composerService) {
120     this.composerService = composerService;
121   }
122 
123   /**
124    * Callback for declarative services configuration that will introduce us to the local workspace service.
125    * Implementation assumes that the reference is configured as being static.
126    *
127    * @param workspace
128    *          an instance of the workspace
129    */
130   @Reference
131   public void setWorkspace(Workspace workspace) {
132     this.workspace = workspace;
133   }
134 
135   /**
136    * {@inheritDoc}
137    *
138    * @see org.opencastproject.workflow.api.WorkflowOperationHandler#start(
139    *      org.opencastproject.workflow.api.WorkflowInstance, JobContext)
140    */
141   @Override
142   public WorkflowOperationResult start(final WorkflowInstance workflowInstance, JobContext context)
143           throws WorkflowOperationException {
144     logger.debug("Running concat workflow operation on workflow {}", workflowInstance.getId());
145 
146     try {
147       return concat(workflowInstance.getMediaPackage(), workflowInstance);
148     } catch (Exception e) {
149       throw new WorkflowOperationException(e);
150     }
151   }
152 
153   private WorkflowOperationResult concat(MediaPackage src, WorkflowInstance workflowInstance)
154           throws EncoderException, IOException, NotFoundException, MediaPackageException, WorkflowOperationException {
155     MediaPackage mediaPackage = (MediaPackage) src.clone();
156 
157     WorkflowOperationInstance operation = workflowInstance.getCurrentOperation();
158 
159     Map<Integer, Tuple<TrackSelector, Boolean>> trackSelectors = getTrackSelectors(operation);
160     String outputResolution = StringUtils.trimToNull(operation.getConfiguration(OUTPUT_RESOLUTION));
161     String outputFrameRate = StringUtils.trimToNull(operation.getConfiguration(OUTPUT_FRAMERATE));
162     String encodingProfile = StringUtils.trimToNull(operation.getConfiguration(ENCODING_PROFILE));
163     boolean sameCodec = BooleanUtils.toBoolean(operation.getConfiguration(SAME_CODEC));
164 
165     // Skip the worklow if no source-flavors or tags has been configured
166     if (trackSelectors.isEmpty()) {
167       logger.warn("No source-tags or source-flavors has been set.");
168       return createResult(mediaPackage, Action.SKIP);
169     }
170 
171     ConfiguredTagsAndFlavors tagsAndFlavors = getTagsAndFlavors(workflowInstance, Configuration.none,
172         Configuration.none, Configuration.many, Configuration.one);
173     ConfiguredTagsAndFlavors.TargetTags targetTagsOption = tagsAndFlavors.getTargetTags();
174     List<MediaPackageElementFlavor> targetFlavorOption = tagsAndFlavors.getTargetFlavors();
175 
176     // Target flavor
177     if (targetFlavorOption.isEmpty()) {
178       throw new WorkflowOperationException("Target flavor must be set!");
179     }
180 
181     // Find the encoding profile
182     if (encodingProfile == null) {
183       throw new WorkflowOperationException("Encoding profile must be set!");
184     }
185 
186     EncodingProfile profile = composerService.getProfile(encodingProfile);
187     if (profile == null) {
188       throw new WorkflowOperationException("Encoding profile '" + encodingProfile + "' was not found");
189     }
190 
191     // Output resolution - if not keeping dimensions the same, it must be set
192     if (!sameCodec && outputResolution == null) {
193       throw new WorkflowOperationException("Output resolution must be set!");
194     }
195 
196     Dimension outputDimension = null;
197     if (!sameCodec) { // Ignore resolution if same Codec - no scaling
198       if (outputResolution.startsWith(OUTPUT_PART_PREFIX)) {
199         if (!trackSelectors.keySet().contains(
200             Integer.parseInt(outputResolution.substring(OUTPUT_PART_PREFIX.length())))) {
201           throw new WorkflowOperationException("Output resolution part not set!");
202         }
203       } else {
204         try {
205           String[] outputResolutionArray = StringUtils.split(outputResolution, "x");
206           if (outputResolutionArray.length != 2) {
207             throw new WorkflowOperationException("Invalid format of output resolution!");
208           }
209           outputDimension = Dimension.dimension(Integer.parseInt(outputResolutionArray[0]),
210             Integer.parseInt(outputResolutionArray[1]));
211         } catch (WorkflowOperationException e) {
212           throw e;
213         } catch (Exception e) {
214           throw new WorkflowOperationException("Unable to parse output resolution!", e);
215         }
216       }
217     }
218 
219     float fps = -1.0f;
220     // Ignore fps if same Codec - no scaling
221     if (!sameCodec && StringUtils.isNotEmpty(outputFrameRate)) {
222       if (StringUtils.startsWith(outputFrameRate, OUTPUT_PART_PREFIX)) {
223         if (!NumberUtils.isCreatable(outputFrameRate.substring(OUTPUT_PART_PREFIX.length()))
224                 || !trackSelectors.keySet().contains(Integer.parseInt(
225                         outputFrameRate.substring(OUTPUT_PART_PREFIX.length())))) {
226           throw new WorkflowOperationException("Output frame rate part not set or invalid!");
227         }
228       } else if (NumberUtils.isCreatable(outputFrameRate)) {
229         fps = NumberUtils.toFloat(outputFrameRate);
230       } else {
231         throw new WorkflowOperationException("Unable to parse output frame rate!");
232       }
233     }
234 
235     MediaPackageElementFlavor targetFlavor = null;
236     try {
237       targetFlavor = targetFlavorOption.get(0);
238       if ("*".equals(targetFlavor.getType()) || "*".equals(targetFlavor.getSubtype())) {
239         throw new WorkflowOperationException("Target flavor must have a type and a subtype, '*' are not allowed!");
240       }
241     } catch (IllegalArgumentException e) {
242       throw new WorkflowOperationException("Target flavor '" + targetFlavorOption + "' is malformed");
243     }
244 
245     List<Track> tracks = new ArrayList<Track>();
246     for (Entry<Integer, Tuple<TrackSelector, Boolean>> trackSelector : trackSelectors.entrySet()) {
247       Collection<Track> tracksForSelector = trackSelector.getValue().getA().select(mediaPackage, false);
248       String currentFlavor = StringUtils.join(trackSelector.getValue().getA().getFlavors());
249       String currentTag = StringUtils.join(trackSelector.getValue().getA().getTags());
250 
251       // Cannot mix prefix-number tracks with numbered files
252       // PREFIXED_FILES must have multiple files, but numbered file can skip the operation if there is only one
253       if (trackSelectors.size() == 1) {
254         // NUMBERED FILES will have one trackSelector only and multiple sorted files in it
255         List<Track> list = new ArrayList<>(tracksForSelector);
256         list.sort((left, right) -> {
257           // Get and compare basename only, getPath() for mock
258           String l = (new File(left.getURI().getPath())).getName();
259           String r = (new File(right.getURI().getPath())).getName();
260           return (l.compareTo(r));
261         });
262         tracksForSelector = list;
263       } else if (tracksForSelector.size() > 1) {
264         logger.warn("More than one track has been found with flavor '{}' and/or tag '{}' for concat operation, "
265                         + "skipping concatenation!", currentFlavor, currentTag);
266         return createResult(mediaPackage, Action.SKIP);
267       } else if (tracksForSelector.size() == 0 && trackSelector.getValue().getB()) {
268         logger.warn("No track has been found with flavor '{}' and/or tag '{}' for concat operation, "
269                         + "skipping concatenation!", currentFlavor, currentTag);
270         return createResult(mediaPackage, Action.SKIP);
271       } else if (tracksForSelector.size() == 0 && !trackSelector.getValue().getB()) {
272         logger.info("No track has been found with flavor '{}' and/or tag '{}' for concat operation, skipping track!",
273                 currentFlavor, currentTag);
274         continue;
275       }
276 
277       for (Track t : tracksForSelector) {
278         tracks.add(t);
279         VideoStream[] videoStreams = TrackSupport.byType(t.getStreams(), VideoStream.class);
280         if (videoStreams.length == 0) {
281           logger.info("No video stream available in the track with flavor {}! {}", currentFlavor, t);
282           return createResult(mediaPackage, Action.SKIP);
283         }
284         if (StringUtils.startsWith(outputResolution, OUTPUT_PART_PREFIX)
285             && NumberUtils.isCreatable(outputResolution.substring(OUTPUT_PART_PREFIX.length()))
286             && trackSelector.getKey() == Integer.parseInt(outputResolution.substring(OUTPUT_PART_PREFIX.length()))) {
287           outputDimension = new Dimension(videoStreams[0].getFrameWidth(), videoStreams[0].getFrameHeight());
288           if (!trackSelector.getValue().getB()) {
289             logger.warn("Output resolution track {} must be mandatory, skipping concatenation!", outputResolution);
290             return createResult(mediaPackage, Action.SKIP);
291           }
292         }
293         if (fps <= 0 && StringUtils.startsWith(outputFrameRate, OUTPUT_PART_PREFIX)
294                 && NumberUtils.isCreatable(outputFrameRate.substring(OUTPUT_PART_PREFIX.length()))
295                 && trackSelector.getKey() == Integer.parseInt(outputFrameRate.substring(OUTPUT_PART_PREFIX.length()))) {
296           fps = videoStreams[0].getFrameRate();
297         }
298       }
299     }
300 
301     if (tracks.size() == 0) {
302       logger.warn("No tracks found for concating operation, skipping concatenation!");
303       return createResult(mediaPackage, Action.SKIP);
304     } else if (tracks.size() == 1) {
305       Track track = (Track) tracks.get(0).clone();
306       track.setIdentifier(null);
307       addNewTrack(mediaPackage, track, targetTagsOption, targetFlavor);
308       logger.info("At least two tracks are needed for the concating operation, skipping concatenation!");
309       return createResult(mediaPackage, Action.SKIP);
310     }
311 
312     Job concatJob;
313     if (fps > 0) {
314       concatJob = composerService.concat(profile.getIdentifier(), outputDimension,
315               fps, sameCodec, tracks.toArray(new Track[tracks.size()]));
316     } else {
317       concatJob = composerService.concat(profile.getIdentifier(), outputDimension,
318               sameCodec,tracks.toArray(new Track[tracks.size()]));
319     }
320 
321     // Wait for the jobs to return
322     if (!waitForStatus(concatJob).isSuccess()) {
323       throw new WorkflowOperationException("The concat job did not complete successfully");
324     }
325 
326     if (concatJob.getPayload().length() > 0) {
327 
328       Track concatTrack = (Track) MediaPackageElementParser.getFromXml(concatJob.getPayload());
329 
330       concatTrack.setURI(workspace.moveTo(concatTrack.getURI(), mediaPackage.getIdentifier().toString(),
331               concatTrack.getIdentifier(), "concat." + FilenameUtils.getExtension(concatTrack.getURI().toString())));
332 
333       addNewTrack(mediaPackage, concatTrack, targetTagsOption, targetFlavor);
334 
335       WorkflowOperationResult result = createResult(mediaPackage, Action.CONTINUE, concatJob.getQueueTime());
336       logger.debug("Concat operation completed");
337       return result;
338     } else {
339       logger.info("concat operation unsuccessful, no payload returned: {}", concatJob);
340       return createResult(mediaPackage, Action.SKIP);
341     }
342   }
343 
344   private void addNewTrack(MediaPackage mediaPackage, Track track,
345           ConfiguredTagsAndFlavors.TargetTags targetTags,
346           MediaPackageElementFlavor targetFlavor) {
347     // Adjust the target tags
348     applyTargetTagsToElement(targetTags, track);
349 
350     // Adjust the target flavor.
351     track.setFlavor(targetFlavor);
352     logger.debug("Compound track has flavor '{}'", track.getFlavor());
353 
354     mediaPackage.add(track);
355   }
356 
357   private Map<Integer, Tuple<TrackSelector, Boolean>> getTrackSelectors(WorkflowOperationInstance operation)
358           throws WorkflowOperationException {
359     Map<Integer, Tuple<TrackSelector, Boolean>> trackSelectors = new HashMap<Integer, Tuple<TrackSelector, Boolean>>();
360     SourceType flavorType = SourceType.None;
361     String srcFlavor = null;
362 
363     // Search config for SOURCE_FLAVOR_NUMBERED_FILES and SOURCE_FLAVOR_PREFIX
364     for (String key : operation.getConfigurationKeys()) {
365       if (key.startsWith(SOURCE_FLAVOR_PREFIX) || key.startsWith(SOURCE_TAGS_PREFIX)) {
366         if (flavorType == SourceType.None) {
367           flavorType = SourceType.PrefixedFile;
368         } else if (flavorType != SourceType.PrefixedFile) {
369           throw new WorkflowOperationException(
370                   "Cannot mix source prefix flavor/tags with source numbered files - use one type of selector only");
371         }
372       }
373 
374       if (key.equals(SOURCE_FLAVOR_NUMBERED_FILES)) { // Search config for SOURCE_FLAVORS_NUMBERED_FILES
375         srcFlavor = operation.getConfiguration(key);
376         if (flavorType == SourceType.None) {
377           flavorType = SourceType.NumberedFile;
378           srcFlavor = operation.getConfiguration(key);
379         } else if (flavorType != SourceType.NumberedFile) {
380           throw new WorkflowOperationException(
381                   "Cannot mix source prefix flavor/tags with source numbered files - use one type of selector only");
382         }
383       }
384     }
385 
386     // if is SOURCE_FLAVOR_NUMBERED_FILES, do not use prefixed (tags or flavor)
387     if (srcFlavor != null) { // Numbered files has only one selector
388       int number = 0;
389       Tuple<TrackSelector, Boolean> selectorTuple = trackSelectors.get(number);
390       selectorTuple = Tuple.tuple(new TrackSelector(), true);
391       TrackSelector trackSelector = selectorTuple.getA();
392       trackSelector.addFlavor(srcFlavor);
393       trackSelectors.put(number, selectorTuple);
394       return trackSelectors;
395     }
396 
397     // Prefix only
398     for (String key : operation.getConfigurationKeys()) {
399       String tags = null;
400       String flavor = null;
401       Boolean mandatory = true;
402       int number = -1;
403       if (key.startsWith(SOURCE_TAGS_PREFIX) && !key.endsWith(MANDATORY_SUFFIX)) {
404         number = NumberUtils.toInt(key.substring(SOURCE_TAGS_PREFIX.length()), -1);
405         tags = operation.getConfiguration(key);
406         mandatory = BooleanUtils.toBooleanObject(operation.getConfiguration(SOURCE_TAGS_PREFIX.concat(
407                 Integer.toString(number)).concat(MANDATORY_SUFFIX)));
408       } else if (key.startsWith(SOURCE_FLAVOR_PREFIX) && !key.endsWith(MANDATORY_SUFFIX)) {
409         number = NumberUtils.toInt(key.substring(SOURCE_FLAVOR_PREFIX.length()), -1);
410         flavor = operation.getConfiguration(key);
411         mandatory = BooleanUtils.toBooleanObject(operation.getConfiguration(SOURCE_FLAVOR_PREFIX.concat(
412                 Integer.toString(number)).concat(MANDATORY_SUFFIX)));
413       }
414 
415       if (number < 0) {
416         continue;
417       }
418 
419       Tuple<TrackSelector, Boolean> selectorTuple = trackSelectors.get(number);
420       if (selectorTuple == null) {
421         selectorTuple = Tuple.tuple(new TrackSelector(), BooleanUtils.toBooleanDefaultIfNull(mandatory, false));
422       } else {
423         selectorTuple = Tuple.tuple(selectorTuple.getA(),
424                 selectorTuple.getB() || BooleanUtils.toBooleanDefaultIfNull(mandatory, false));
425       }
426       TrackSelector trackSelector = selectorTuple.getA();
427       if (StringUtils.isNotBlank(tags)) {
428         for (String tag : StringUtils.split(tags, ",")) {
429           trackSelector.addTag(tag);
430         }
431       }
432       if (StringUtils.isNotBlank(flavor)) {
433         try {
434           trackSelector.addFlavor(flavor);
435         } catch (IllegalArgumentException e) {
436           throw new WorkflowOperationException("Source flavor '" + flavor + "' is malformed");
437         }
438       }
439 
440       trackSelectors.put(number, selectorTuple);
441     }
442     return trackSelectors;
443   }
444 
445   @Reference
446   @Override
447   public void setServiceRegistry(ServiceRegistry serviceRegistry) {
448     super.setServiceRegistry(serviceRegistry);
449   }
450 
451 }