View Javadoc
1   /*
2    * Licensed to The Apereo Foundation under one or more contributor license
3    * agreements. See the NOTICE file distributed with this work for additional
4    * information regarding copyright ownership.
5    *
6    *
7    * The Apereo Foundation licenses this file to you under the Educational
8    * Community License, Version 2.0 (the "License"); you may not use this file
9    * except in compliance with the License. You may obtain a copy of the License
10   * at:
11   *
12   *   http://opensource.org/licenses/ecl2.txt
13   *
14   * Unless required by applicable law or agreed to in writing, software
15   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
16   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the
17   * License for the specific language governing permissions and limitations under
18   * the License.
19   *
20   */
21  
22  package org.opencastproject.workflow.handler.sox;
23  
24  import org.opencastproject.composer.api.ComposerService;
25  import org.opencastproject.composer.api.EncoderException;
26  import org.opencastproject.job.api.Job;
27  import org.opencastproject.job.api.JobContext;
28  import org.opencastproject.mediapackage.AudioStream;
29  import org.opencastproject.mediapackage.MediaPackage;
30  import org.opencastproject.mediapackage.MediaPackageElementFlavor;
31  import org.opencastproject.mediapackage.MediaPackageElementParser;
32  import org.opencastproject.mediapackage.MediaPackageException;
33  import org.opencastproject.mediapackage.Track;
34  import org.opencastproject.mediapackage.selector.AbstractMediaPackageElementSelector;
35  import org.opencastproject.mediapackage.selector.TrackSelector;
36  import org.opencastproject.mediapackage.track.AudioStreamImpl;
37  import org.opencastproject.mediapackage.track.TrackImpl;
38  import org.opencastproject.serviceregistry.api.ServiceRegistry;
39  import org.opencastproject.sox.api.SoxException;
40  import org.opencastproject.sox.api.SoxService;
41  import org.opencastproject.util.NotFoundException;
42  import org.opencastproject.workflow.api.AbstractWorkflowOperationHandler;
43  import org.opencastproject.workflow.api.ConfiguredTagsAndFlavors;
44  import org.opencastproject.workflow.api.WorkflowInstance;
45  import org.opencastproject.workflow.api.WorkflowOperationException;
46  import org.opencastproject.workflow.api.WorkflowOperationHandler;
47  import org.opencastproject.workflow.api.WorkflowOperationInstance;
48  import org.opencastproject.workflow.api.WorkflowOperationResult;
49  import org.opencastproject.workflow.api.WorkflowOperationResult.Action;
50  import org.opencastproject.workspace.api.Workspace;
51  
52  import org.apache.commons.lang3.BooleanUtils;
53  import org.apache.commons.lang3.StringUtils;
54  import org.osgi.service.component.annotations.Component;
55  import org.osgi.service.component.annotations.Reference;
56  import org.slf4j.Logger;
57  import org.slf4j.LoggerFactory;
58  
59  import java.io.IOException;
60  import java.net.URI;
61  import java.util.ArrayList;
62  import java.util.Collection;
63  import java.util.HashMap;
64  import java.util.List;
65  import java.util.Map;
66  
67  /**
68   * The workflow definition for handling "sox" operations
69   */
70  @Component(
71      immediate = true,
72      service = WorkflowOperationHandler.class,
73      property = {
74          "service.description=Normalize Audio Workflow Operation Handler",
75          "workflow.operation=normalize-audio"
76      }
77  )
78  public class NormalizeAudioWorkflowOperationHandler extends AbstractWorkflowOperationHandler {
79  
80    /** The logging facility */
81    private static final Logger logger = LoggerFactory.getLogger(NormalizeAudioWorkflowOperationHandler.class);
82  
83    /** Name of the 'encode to SoX audio only work copy' encoding profile */
84    public static final String SOX_AONLY_PROFILE = "sox-audio-only.work";
85  
86    /** Name of the muxing encoding profile */
87    public static final String SOX_AREPLACE_PROFILE = "sox-audio-replace.work";
88  
89    /** The SoX service */
90    private SoxService soxService = null;
91  
92    /** The composer service */
93    private ComposerService composerService = null;
94  
95    /** The local workspace */
96    private Workspace workspace = null;
97  
98    /**
99     * Callback for the OSGi declarative services configuration.
100    *
101    * @param soxService
102    *          the SoX service
103    */
104   @Reference
105   protected void setSoxService(SoxService soxService) {
106     this.soxService = soxService;
107   }
108 
109   /**
110    * Callback for the OSGi declarative services configuration.
111    *
112    * @param composerService
113    *          the composer service
114    */
115   @Reference
116   protected void setComposerService(ComposerService composerService) {
117     this.composerService = composerService;
118   }
119 
120   /**
121    * Callback for declarative services configuration that will introduce us to the local workspace service.
122    * Implementation assumes that the reference is configured as being static.
123    *
124    * @param workspace
125    *          an instance of the workspace
126    */
127   @Reference
128   public void setWorkspace(Workspace workspace) {
129     this.workspace = workspace;
130   }
131 
132   @Reference
133   @Override
134   public void setServiceRegistry(ServiceRegistry serviceRegistry) {
135     super.setServiceRegistry(serviceRegistry);
136   }
137 
138   public WorkflowOperationResult start(final WorkflowInstance workflowInstance, JobContext context)
139           throws WorkflowOperationException {
140     logger.debug("Running sox workflow operation on workflow {}", workflowInstance.getId());
141 
142     try {
143       return normalize(workflowInstance.getMediaPackage(), workflowInstance);
144     } catch (Exception e) {
145       throw new WorkflowOperationException(e);
146     }
147   }
148 
149   private WorkflowOperationResult normalize(MediaPackage src, WorkflowInstance workflowInstance) throws SoxException,
150           IOException, NotFoundException, MediaPackageException, WorkflowOperationException, EncoderException {
151     MediaPackage mediaPackage = (MediaPackage) src.clone();
152 
153     WorkflowOperationInstance operation = workflowInstance.getCurrentOperation();
154 
155     // Check which tags have been configured
156     ConfiguredTagsAndFlavors tagsAndFlavors = getTagsAndFlavors(workflowInstance, Configuration.many,
157         Configuration.many, Configuration.many, Configuration.many);
158     List<String> targetTagsOption = tagsAndFlavors.getTargetTags();
159     List<String> sourceTagsOption = tagsAndFlavors.getSrcTags();
160     List<MediaPackageElementFlavor> sourceFlavorsOption = tagsAndFlavors.getSrcFlavors();
161     List<MediaPackageElementFlavor> targetFlavorOption = tagsAndFlavors.getTargetFlavors();
162 
163     String targetDecibelString = StringUtils.trimToNull(operation.getConfiguration("target-decibel"));
164     if (targetDecibelString == null) {
165       throw new IllegalArgumentException("target-decibel must be specified");
166     }
167     boolean forceTranscode = BooleanUtils.toBoolean(operation.getConfiguration("force-transcode"));
168     Float targetDecibel;
169     try {
170       targetDecibel = new Float(targetDecibelString);
171     } catch (NumberFormatException e1) {
172       throw new WorkflowOperationException("Unable to parse target-decibel " + targetDecibelString);
173     }
174 
175     AbstractMediaPackageElementSelector<Track> elementSelector = new TrackSelector();
176 
177     // Make sure either one of tags or flavors are provided
178     if (sourceTagsOption.isEmpty() && sourceFlavorsOption.isEmpty()) {
179       logger.info("No source tags or flavors have been specified, not matching anything");
180       return createResult(mediaPackage, Action.CONTINUE);
181     }
182 
183     // Select the source flavors
184     for (MediaPackageElementFlavor flavor : sourceFlavorsOption) {
185       elementSelector.addFlavor(flavor);
186     }
187 
188     // Select the source tags
189     for (String tag : sourceTagsOption) {
190       elementSelector.addTag(tag);
191     }
192 
193     //select target flavor
194     MediaPackageElementFlavor targetFlavor = null;
195     if (!targetFlavorOption.isEmpty()) {
196       targetFlavor = targetFlavorOption.get(0);
197     }
198 
199     // Look for elements matching the tag
200     Collection<Track> elements = elementSelector.select(mediaPackage, false);
201 
202     // Encode all tracks found
203     long totalTimeInQueue = 0;
204     List<URI> cleanupURIs = new ArrayList<URI>();
205     Map<Job, Track> normalizeJobs = new HashMap<Job, Track>();
206     try {
207       for (Track track : elements) {
208 
209         TrackImpl audioTrack = (TrackImpl) track;
210         // Skip video only mismatches
211         if (!track.hasAudio()) {
212           logger.info("Skipping audio normalization of '{}', since it contains no audio stream", track);
213           continue;
214         } else if (track.hasVideo() || forceTranscode) {
215           audioTrack = (TrackImpl) extractAudioTrack(track);
216           audioTrack.setAudio(((TrackImpl) track).getAudio());
217           cleanupURIs.add(audioTrack.getURI());
218         }
219 
220         // Analyze audio track
221         if (audioTrack.getAudio().size() < 1 || audioTrack.getAudio().get(0).getRmsLevDb() == null) {
222           logger.info("Audio track {} has no RMS Lev dB metadata, analyze it first", audioTrack);
223           Job analyzeJob = soxService.analyze(audioTrack);
224           if (!waitForStatus(analyzeJob).isSuccess()) {
225             throw new WorkflowOperationException("Unable to analyze the audio track " + audioTrack);
226           }
227           audioTrack = (TrackImpl) MediaPackageElementParser.getFromXml(analyzeJob.getPayload());
228           cleanupURIs.add(audioTrack.getURI());
229         }
230 
231         normalizeJobs.put(soxService.normalize(audioTrack, targetDecibel), track);
232       }
233 
234       if (normalizeJobs.isEmpty()) {
235         logger.info("No matching tracks found");
236         return createResult(mediaPackage, Action.CONTINUE);
237       }
238 
239       // Wait for the jobs to return
240       if (!waitForStatus(normalizeJobs.keySet().toArray(new Job[normalizeJobs.size()])).isSuccess()) {
241         throw new WorkflowOperationException("One of the normalize jobs did not complete successfully");
242       }
243 
244       // Process the result
245       for (Map.Entry<Job, Track> entry : normalizeJobs.entrySet()) {
246         Job job = entry.getKey();
247         TrackImpl origTrack = (TrackImpl) entry.getValue();
248 
249         // add this receipt's queue time to the total
250         totalTimeInQueue += job.getQueueTime();
251 
252         if (job.getPayload().length() > 0) {
253           TrackImpl normalizedAudioTrack = (TrackImpl) MediaPackageElementParser.getFromXml(job.getPayload());
254 
255           TrackImpl resultTrack = normalizedAudioTrack;
256           if (origTrack.hasVideo() || forceTranscode) {
257             cleanupURIs.add(normalizedAudioTrack.getURI());
258 
259             logger.info("Mux normalized audio track {} to video track {}", normalizedAudioTrack, origTrack);
260             Job muxAudioVideo = composerService.mux(origTrack, normalizedAudioTrack, SOX_AREPLACE_PROFILE);
261             if (!waitForStatus(muxAudioVideo).isSuccess()) {
262               throw new WorkflowOperationException("Muxing normalized audio track " + normalizedAudioTrack
263                       + " to video container " + origTrack + " failed");
264             }
265 
266             resultTrack = (TrackImpl) MediaPackageElementParser.getFromXml(muxAudioVideo.getPayload());
267 
268             // Set metadata on track
269             extendAudioStream(resultTrack, normalizedAudioTrack);
270           }
271 
272           adjustFlavorAndTags(targetTagsOption, targetFlavor, origTrack, resultTrack);
273 
274           mediaPackage.addDerived(resultTrack, origTrack);
275           String fileName = getFileNameFromElements(origTrack, resultTrack);
276           resultTrack.setURI(workspace.moveTo(resultTrack.getURI(), mediaPackage.getIdentifier().toString(),
277                   resultTrack.getIdentifier(), fileName));
278         } else {
279           logger.warn("Normalize audio job {} for track {} has no result!", job, origTrack);
280         }
281       }
282     } finally {
283       // Clean up temporary audio and video files from workspace
284       for (URI uri : cleanupURIs) {
285         workspace.delete(uri);
286       }
287     }
288 
289     WorkflowOperationResult result = createResult(mediaPackage, Action.CONTINUE, totalTimeInQueue);
290     logger.debug("Normalize audio operation completed");
291     return result;
292   }
293 
294   private void extendAudioStream(TrackImpl trackToExtend, TrackImpl audioTrackSource) {
295     AudioStreamImpl extendStream = (AudioStreamImpl) trackToExtend.getAudio().get(0);
296     AudioStream sourceStream = audioTrackSource.getAudio().get(0);
297     extendStream.setPkLevDb(sourceStream.getPkLevDb());
298     extendStream.setRmsLevDb(sourceStream.getRmsLevDb());
299     extendStream.setRmsPkDb(sourceStream.getRmsPkDb());
300   }
301 
302   private void adjustFlavorAndTags(List<String> targetTags, MediaPackageElementFlavor targetFlavor, Track origTrack,
303           Track normalized) {
304     // Adjust the target tags
305     for (String tag : targetTags) {
306       logger.trace("Tagging normalized track with '{}'", tag);
307       normalized.addTag(tag);
308     }
309 
310     // Adjust the target flavor. Make sure to account for partial updates
311     if (targetFlavor != null) {
312       String flavorType = targetFlavor.getType();
313       String flavorSubtype = targetFlavor.getSubtype();
314       if ("*".equals(flavorType)) {
315         flavorType = origTrack.getFlavor().getType();
316       }
317       if ("*".equals(flavorSubtype)) {
318         flavorSubtype = origTrack.getFlavor().getSubtype();
319       }
320       normalized.setFlavor(new MediaPackageElementFlavor(flavorType, flavorSubtype));
321       logger.debug("Normalized track has flavor '{}'", normalized.getFlavor());
322     }
323   }
324 
325   /**
326    * Extract the audio track from the given video track.
327    *
328    * @param videoTrack
329    *          the track containing the audio
330    * @return the extracted audio track
331    * @throws WorkflowOperationException
332    * @throws NotFoundException
333    * @throws EncoderException
334    * @throws MediaPackageException
335    */
336   private Track extractAudioTrack(Track videoTrack) throws WorkflowOperationException, EncoderException,
337           MediaPackageException {
338     logger.info("Extract audio stream from track {}", videoTrack);
339     Job job = composerService.encode(videoTrack, SOX_AONLY_PROFILE);
340     if (!waitForStatus(job).isSuccess()) {
341       throw new WorkflowOperationException("Extracting audio track from video track " + videoTrack + " failed");
342     }
343 
344     return (Track) MediaPackageElementParser.getFromXml(job.getPayload());
345   }
346 
347 }