View Javadoc
1   /*
2    * Licensed to The Apereo Foundation under one or more contributor license
3    * agreements. See the NOTICE file distributed with this work for additional
4    * information regarding copyright ownership.
5    *
6    *
7    * The Apereo Foundation licenses this file to you under the Educational
8    * Community License, Version 2.0 (the "License"); you may not use this file
9    * except in compliance with the License. You may obtain a copy of the License
10   * at:
11   *
12   *   http://opensource.org/licenses/ecl2.txt
13   *
14   * Unless required by applicable law or agreed to in writing, software
15   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
16   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the
17   * License for the specific language governing permissions and limitations under
18   * the License.
19   *
20   */
21  
22  package org.opencastproject.sox.impl;
23  
24  import static org.opencastproject.util.data.Option.some;
25  
26  import org.opencastproject.job.api.AbstractJobProducer;
27  import org.opencastproject.job.api.Job;
28  import org.opencastproject.mediapackage.AudioStream;
29  import org.opencastproject.mediapackage.MediaPackageElementParser;
30  import org.opencastproject.mediapackage.MediaPackageException;
31  import org.opencastproject.mediapackage.Track;
32  import org.opencastproject.mediapackage.identifier.IdImpl;
33  import org.opencastproject.mediapackage.track.AudioStreamImpl;
34  import org.opencastproject.mediapackage.track.TrackImpl;
35  import org.opencastproject.security.api.OrganizationDirectoryService;
36  import org.opencastproject.security.api.SecurityService;
37  import org.opencastproject.security.api.UserDirectoryService;
38  import org.opencastproject.serviceregistry.api.ServiceRegistry;
39  import org.opencastproject.serviceregistry.api.ServiceRegistryException;
40  import org.opencastproject.sox.api.SoxException;
41  import org.opencastproject.sox.api.SoxService;
42  import org.opencastproject.util.FileSupport;
43  import org.opencastproject.util.IoSupport;
44  import org.opencastproject.util.LoadUtil;
45  import org.opencastproject.util.NotFoundException;
46  import org.opencastproject.util.data.Option;
47  import org.opencastproject.workspace.api.Workspace;
48  
49  import org.apache.commons.io.FilenameUtils;
50  import org.apache.commons.io.IOUtils;
51  import org.apache.commons.lang3.StringUtils;
52  import org.osgi.service.cm.ConfigurationException;
53  import org.osgi.service.cm.ManagedService;
54  import org.osgi.service.component.ComponentContext;
55  import org.osgi.service.component.annotations.Activate;
56  import org.osgi.service.component.annotations.Component;
57  import org.osgi.service.component.annotations.Reference;
58  import org.slf4j.Logger;
59  import org.slf4j.LoggerFactory;
60  
61  import java.io.BufferedReader;
62  import java.io.File;
63  import java.io.FileInputStream;
64  import java.io.IOException;
65  import java.io.InputStream;
66  import java.io.InputStreamReader;
67  import java.net.URI;
68  import java.util.ArrayList;
69  import java.util.Arrays;
70  import java.util.Dictionary;
71  import java.util.List;
72  import java.util.UUID;
73  
74  @Component(
75      immediate = true,
76      service = { SoxService.class,ManagedService.class },
77      property = {
78          "service.description=Sox audio processing service"
79      }
80  )
81  public class SoxServiceImpl extends AbstractJobProducer implements SoxService, ManagedService {
82  
83    /** The logging instance */
84    private static final Logger logger = LoggerFactory.getLogger(SoxServiceImpl.class);
85  
86    /** Default location of the SoX binary (resembling the installer) */
87    public static final String SOX_BINARY_DEFAULT = "sox";
88  
89    public static final String CONFIG_SOX_PATH = "org.opencastproject.sox.path";
90  
91    /** The load introduced on the system by creating a analyze job */
92    public static final float DEFAULT_ANALYZE_JOB_LOAD = 0.2f;
93  
94    /** The key to look for in the service configuration file to override the {@link DEFAULT_ANALYZE_JOB_LOAD} */
95    public static final String ANALYZE_JOB_LOAD_KEY = "job.load.analyze";
96  
97    /** The load introduced on the system by creating a analyze job */
98    private float analyzeJobLoad = DEFAULT_ANALYZE_JOB_LOAD;
99  
100   /** The load introduced on the system by creating a normalize job */
101   public static final float DEFAULT_NORMALIZE_JOB_LOAD = 0.2f;
102 
103   /** The key to look for in the service configuration file to override the {@link DEFAULT_NORMALIZE_JOB_LOAD} */
104   public static final String NORMALIZE_JOB_LOAD_KEY = "job.load.normalize";
105 
106   /** The load introduced on the system by creating a normalize job */
107   private float normalizeJobLoad = DEFAULT_NORMALIZE_JOB_LOAD;
108 
109   /** List of available operations on jobs */
110   private enum Operation {
111     Analyze, Normalize
112   }
113 
114   /** The collection name */
115   public static final String COLLECTION = "sox";
116 
117   /** Reference to the workspace service */
118   private Workspace workspace = null;
119 
120   /** Reference to the receipt service */
121   private ServiceRegistry serviceRegistry;
122 
123   /** The security service */
124   protected SecurityService securityService = null;
125 
126   /** The user directory service */
127   protected UserDirectoryService userDirectoryService = null;
128 
129   /** The organization directory service */
130   protected OrganizationDirectoryService organizationDirectoryService = null;
131 
132   private String binary = SOX_BINARY_DEFAULT;
133 
134   /** Creates a new composer service instance. */
135   public SoxServiceImpl() {
136     super(JOB_TYPE);
137   }
138 
139   /**
140    * OSGi callback on component activation.
141    *
142    * @param cc
143    *          the component context
144    */
145   @Override
146   @Activate
147   public void activate(ComponentContext cc) {
148     logger.info("Activating sox service");
149     super.activate(cc);
150     // Configure sox
151     String path = (String) cc.getBundleContext().getProperty(CONFIG_SOX_PATH);
152     if (path == null) {
153       logger.debug("DEFAULT " + CONFIG_SOX_PATH + ": " + SOX_BINARY_DEFAULT);
154     } else {
155       binary = path;
156       logger.debug("SoX config binary: {}", path);
157     }
158   }
159 
160   /**
161    * {@inheritDoc}
162    *
163    * @see org.opencastproject.sox.api.SoxService#analyze(Track)
164    */
165   @Override
166   public Job analyze(Track sourceAudioTrack) throws MediaPackageException, SoxException {
167     try {
168       return serviceRegistry.createJob(JOB_TYPE, Operation.Analyze.toString(),
169               Arrays.asList(MediaPackageElementParser.getAsXml(sourceAudioTrack)), analyzeJobLoad);
170     } catch (ServiceRegistryException e) {
171       throw new SoxException("Unable to create a job", e);
172     }
173   }
174 
175   /**
176    * {@inheritDoc}
177    *
178    * @see org.opencastproject.sox.api.SoxService#normalize(Track, Float)
179    */
180   @Override
181   public Job normalize(Track sourceAudioTrack, Float targetRmsLevDb) throws MediaPackageException, SoxException {
182     try {
183       return serviceRegistry.createJob(JOB_TYPE, Operation.Normalize.toString(),
184               Arrays.asList(MediaPackageElementParser.getAsXml(sourceAudioTrack), targetRmsLevDb.toString()),
185               normalizeJobLoad);
186     } catch (ServiceRegistryException e) {
187       throw new SoxException("Unable to create a job", e);
188     }
189   }
190 
191   /**
192    * {@inheritDoc}
193    *
194    * @see org.opencastproject.job.api.AbstractJobProducer#process(org.opencastproject.job.api.Job)
195    */
196   @Override
197   protected String process(Job job) throws Exception {
198     Operation op = null;
199     String operation = job.getOperation();
200     List<String> arguments = job.getArguments();
201     try {
202       op = Operation.valueOf(operation);
203       TrackImpl audioTrack = null;
204 
205       final String serialized;
206       switch (op) {
207         case Analyze:
208           audioTrack = (TrackImpl) MediaPackageElementParser.getFromXml(arguments.get(0));
209           serialized = analyze(job, audioTrack).map(MediaPackageElementParser.<Track> getAsXml()).getOrElse("");
210           break;
211         case Normalize:
212           audioTrack = (TrackImpl) MediaPackageElementParser.getFromXml(arguments.get(0));
213           Float targetRmsLevDb = new Float(arguments.get(1));
214           serialized = normalize(job, audioTrack, targetRmsLevDb).map(MediaPackageElementParser.<Track> getAsXml())
215                   .getOrElse("");
216           break;
217         default:
218           throw new IllegalStateException("Don't know how to handle operation '" + operation + "'");
219       }
220 
221       return serialized;
222     } catch (IllegalArgumentException e) {
223       throw new ServiceRegistryException("This service can't handle operations of type '" + op + "'", e);
224     } catch (Exception e) {
225       throw new ServiceRegistryException("Error handling operation '" + op + "'", e);
226     }
227   }
228 
229   protected Option<Track> analyze(Job job, Track audioTrack) throws SoxException {
230     if (!audioTrack.hasAudio()) {
231       throw new SoxException("No audio stream available");
232     }
233     if (audioTrack.hasVideo()) {
234       throw new SoxException("It must not have a video stream");
235     }
236 
237     try {
238       // Get the tracks and make sure they exist
239       final File audioFile;
240       try {
241         audioFile = workspace.get(audioTrack.getURI());
242       } catch (NotFoundException e) {
243         throw new SoxException("Requested audio track " + audioTrack + " is not found");
244       } catch (IOException e) {
245         throw new SoxException("Unable to access audio track " + audioTrack);
246       }
247 
248       logger.info("Analyzing audio track {}", audioTrack.getIdentifier());
249 
250       // Do the work
251       ArrayList<String> command = new ArrayList<String>();
252       command.add(binary);
253       command.add(audioFile.getAbsolutePath());
254       command.add("-n");
255       command.add("remix");
256       command.add("-");
257       command.add("stats");
258       List<String> analyzeResult = launchSoxProcess(command);
259 
260       // Add audio metadata and return audio track
261       return some(addAudioMetadata(audioTrack, analyzeResult));
262     } catch (Exception e) {
263       logger.warn("Error analyzing {}: {}", audioTrack, e.getMessage());
264       if (e instanceof SoxException) {
265         throw (SoxException) e;
266       } else {
267         throw new SoxException(e);
268       }
269     }
270   }
271 
272   private Track addAudioMetadata(Track audioTrack, List<String> metadata) {
273     TrackImpl track = (TrackImpl) audioTrack;
274     List<AudioStream> audio = track.getAudio();
275 
276     if (audio.size() == 0) {
277       audio.add(new AudioStreamImpl());
278       logger.info("No audio streams found created new audio stream");
279     }
280 
281     AudioStreamImpl audioStream = (AudioStreamImpl) audio.get(0);
282     if (audio.size() > 1) {
283       logger.info("Multiple audio streams found, take first audio stream {}", audioStream);
284     }
285 
286     for (String value : metadata) {
287       if (value.startsWith("Pk lev dB")) {
288         Float pkLevDb = new Float(StringUtils.substringAfter(value, "Pk lev dB").trim());
289         audioStream.setPkLevDb(pkLevDb);
290       } else if (value.startsWith("RMS lev dB")) {
291         Float rmsLevDb = new Float(StringUtils.substringAfter(value, "RMS lev dB").trim());
292         audioStream.setRmsLevDb(rmsLevDb);
293       } else if (value.startsWith("RMS Pk dB")) {
294         Float rmsPkDb = new Float(StringUtils.substringAfter(value, "RMS Pk dB").trim());
295         audioStream.setRmsPkDb(rmsPkDb);
296       }
297     }
298     return track;
299   }
300 
301   private List<String> launchSoxProcess(List<String> command) throws SoxException {
302     Process process = null;
303     BufferedReader in = null;
304     try {
305       logger.info("Start sox process {}", command);
306       ProcessBuilder pb = new ProcessBuilder(command);
307       pb.redirectErrorStream(true); // Unfortunately merges but necessary for deadlock prevention
308       process = pb.start();
309       in = new BufferedReader(new InputStreamReader(process.getInputStream()));
310       process.waitFor();
311       String line = null;
312       List<String> stats = new ArrayList<String>();
313       while ((line = in.readLine()) != null) {
314         logger.info(line);
315         stats.add(line);
316       }
317       if (process.exitValue() != 0) {
318         throw new SoxException("Sox process failed with error code: " + process.exitValue());
319       }
320       logger.info("Sox process finished");
321       return stats;
322     } catch (IOException e) {
323       throw new SoxException("Could not start sox process: " + command + "\n" + e.getMessage());
324     } catch (InterruptedException e) {
325       throw new SoxException("Could not start sox process: " + command + "\n" + e.getMessage());
326     } finally {
327       IoSupport.closeQuietly(in);
328     }
329   }
330 
331   private Option<Track> normalize(Job job, TrackImpl audioTrack, Float targetRmsLevDb) throws SoxException {
332     if (!audioTrack.hasAudio()) {
333       throw new SoxException("No audio stream available");
334     }
335     if (audioTrack.hasVideo()) {
336       throw new SoxException("It must not have a video stream");
337     }
338     if (audioTrack.getAudio().size() < 1) {
339       throw new SoxException("No audio stream metadata available");
340     }
341     if (audioTrack.getAudio().get(0).getRmsLevDb() == null) {
342       throw new SoxException("No RMS Lev dB metadata available");
343     }
344 
345     final String targetTrackId = IdImpl.fromUUID().toString();
346 
347     Float rmsLevDb = audioTrack.getAudio().get(0).getRmsLevDb();
348 
349     // Get the tracks and make sure they exist
350     final File audioFile;
351     try {
352       audioFile = workspace.get(audioTrack.getURI());
353     } catch (NotFoundException e) {
354       throw new SoxException("Requested audio track " + audioTrack + " is not found");
355     } catch (IOException e) {
356       throw new SoxException("Unable to access audio track " + audioTrack);
357     }
358 
359     String outDir = audioFile.getAbsoluteFile().getParent();
360     String outFileName = FilenameUtils.getBaseName(audioFile.getName()) + "_" + UUID.randomUUID().toString();
361     String suffix = "-norm." + FilenameUtils.getExtension(audioFile.getName());
362 
363     File normalizedFile = new File(outDir, outFileName + suffix);
364 
365     logger.info("Normalizing audio track {} to {}", audioTrack.getIdentifier(), targetTrackId);
366 
367     // Do the work
368     ArrayList<String> command = new ArrayList<String>();
369     command.add(binary);
370     command.add(audioFile.getAbsolutePath());
371     command.add(normalizedFile.getAbsolutePath());
372     command.add("remix");
373     command.add("-");
374     command.add("gain");
375     if (targetRmsLevDb > rmsLevDb) {
376       command.add("-l");
377     }
378     command.add(new Float(targetRmsLevDb - rmsLevDb).toString());
379     command.add("stats");
380 
381     List<String> normalizeResult = launchSoxProcess(command);
382 
383     if (normalizedFile.length() == 0) {
384       throw new SoxException("Normalization failed: Output file is empty!");
385     }
386 
387     // Put the file in the workspace
388     URI returnURL = null;
389     InputStream in = null;
390     try {
391       in = new FileInputStream(normalizedFile);
392       returnURL = workspace.putInCollection(COLLECTION,
393               job.getId() + "." + FilenameUtils.getExtension(normalizedFile.getAbsolutePath()), in);
394       logger.info("Copied the normalized file to the workspace at {}", returnURL);
395       if (normalizedFile.delete()) {
396         logger.info("Deleted the local copy of the normalized file at {}", normalizedFile.getAbsolutePath());
397       } else {
398         logger.warn("Unable to delete the normalized output at {}", normalizedFile);
399       }
400     } catch (Exception e) {
401       throw new SoxException("Unable to put the normalized file into the workspace", e);
402     } finally {
403       IOUtils.closeQuietly(in);
404       FileSupport.deleteQuietly(normalizedFile);
405     }
406 
407     Track normalizedTrack = (Track) audioTrack.clone();
408     normalizedTrack.setURI(returnURL);
409     normalizedTrack.setIdentifier(targetTrackId);
410     // Add audio metadata and return audio track
411     normalizedTrack = addAudioMetadata(normalizedTrack, normalizeResult);
412 
413     return some(normalizedTrack);
414   }
415 
416   /**
417    * Sets the workspace
418    *
419    * @param workspace
420    *          an instance of the workspace
421    */
422   @Reference
423   protected void setWorkspace(Workspace workspace) {
424     this.workspace = workspace;
425   }
426 
427   /**
428    * Sets the service registry
429    *
430    * @param serviceRegistry
431    *          the service registry
432    */
433   @Reference
434   protected void setServiceRegistry(ServiceRegistry serviceRegistry) {
435     this.serviceRegistry = serviceRegistry;
436   }
437 
438   /**
439    * {@inheritDoc}
440    *
441    * @see org.opencastproject.job.api.AbstractJobProducer#getServiceRegistry()
442    */
443   @Override
444   protected ServiceRegistry getServiceRegistry() {
445     return serviceRegistry;
446   }
447 
448   /**
449    * Callback for setting the security service.
450    *
451    * @param securityService
452    *          the securityService to set
453    */
454   @Reference
455   public void setSecurityService(SecurityService securityService) {
456     this.securityService = securityService;
457   }
458 
459   /**
460    * Callback for setting the user directory service.
461    *
462    * @param userDirectoryService
463    *          the userDirectoryService to set
464    */
465   @Reference
466   public void setUserDirectoryService(UserDirectoryService userDirectoryService) {
467     this.userDirectoryService = userDirectoryService;
468   }
469 
470   /**
471    * Sets a reference to the organization directory service.
472    *
473    * @param organizationDirectory
474    *          the organization directory
475    */
476   @Reference
477   public void setOrganizationDirectoryService(OrganizationDirectoryService organizationDirectory) {
478     this.organizationDirectoryService = organizationDirectory;
479   }
480 
481   /**
482    * {@inheritDoc}
483    *
484    * @see org.opencastproject.job.api.AbstractJobProducer#getSecurityService()
485    */
486   @Override
487   protected SecurityService getSecurityService() {
488     return securityService;
489   }
490 
491   /**
492    * {@inheritDoc}
493    *
494    * @see org.opencastproject.job.api.AbstractJobProducer#getUserDirectoryService()
495    */
496   @Override
497   protected UserDirectoryService getUserDirectoryService() {
498     return userDirectoryService;
499   }
500 
501   /**
502    * {@inheritDoc}
503    *
504    * @see org.opencastproject.job.api.AbstractJobProducer#getOrganizationDirectoryService()
505    */
506   @Override
507   protected OrganizationDirectoryService getOrganizationDirectoryService() {
508     return organizationDirectoryService;
509   }
510 
511   @Override
512   public void updated(Dictionary properties) throws ConfigurationException {
513     analyzeJobLoad = LoadUtil.getConfiguredLoadValue(properties, ANALYZE_JOB_LOAD_KEY, DEFAULT_ANALYZE_JOB_LOAD,
514             serviceRegistry);
515     normalizeJobLoad = LoadUtil.getConfiguredLoadValue(properties, NORMALIZE_JOB_LOAD_KEY, DEFAULT_NORMALIZE_JOB_LOAD,
516             serviceRegistry);
517   }
518 
519 }