View Javadoc
1   /*
2    * Licensed to The Apereo Foundation under one or more contributor license
3    * agreements. See the NOTICE file distributed with this work for additional
4    * information regarding copyright ownership.
5    *
6    *
7    * The Apereo Foundation licenses this file to you under the Educational
8    * Community License, Version 2.0 (the "License"); you may not use this file
9    * except in compliance with the License. You may obtain a copy of the License
10   * at:
11   *
12   *   http://opensource.org/licenses/ecl2.txt
13   *
14   * Unless required by applicable law or agreed to in writing, software
15   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
16   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the
17   * License for the specific language governing permissions and limitations under
18   * the License.
19   *
20   */
21  
22  package org.opencastproject.videogrid.impl;
23  
24  import org.opencastproject.job.api.AbstractJobProducer;
25  import org.opencastproject.job.api.Job;
26  import org.opencastproject.mediapackage.MediaPackageElementParser;
27  import org.opencastproject.mediapackage.MediaPackageException;
28  import org.opencastproject.mediapackage.Track;
29  import org.opencastproject.security.api.OrganizationDirectoryService;
30  import org.opencastproject.security.api.SecurityService;
31  import org.opencastproject.security.api.UserDirectoryService;
32  import org.opencastproject.serviceregistry.api.ServiceRegistry;
33  import org.opencastproject.serviceregistry.api.ServiceRegistryException;
34  import org.opencastproject.util.ConfigurationException;
35  import org.opencastproject.util.IoSupport;
36  import org.opencastproject.util.LoadUtil;
37  import org.opencastproject.util.NotFoundException;
38  import org.opencastproject.videogrid.api.VideoGridService;
39  import org.opencastproject.videogrid.api.VideoGridServiceException;
40  import org.opencastproject.workspace.api.Workspace;
41  
42  import com.google.gson.Gson;
43  import com.google.gson.reflect.TypeToken;
44  
45  import org.apache.commons.io.FileUtils;
46  import org.apache.commons.io.FilenameUtils;
47  import org.apache.commons.lang3.StringUtils;
48  import org.osgi.service.cm.ManagedService;
49  import org.osgi.service.component.ComponentContext;
50  import org.osgi.service.component.annotations.Activate;
51  import org.osgi.service.component.annotations.Component;
52  import org.osgi.service.component.annotations.Reference;
53  import org.slf4j.Logger;
54  import org.slf4j.LoggerFactory;
55  
56  import java.io.BufferedReader;
57  import java.io.File;
58  import java.io.FileInputStream;
59  import java.io.FileNotFoundException;
60  import java.io.IOException;
61  import java.io.InputStreamReader;
62  import java.lang.reflect.Type;
63  import java.net.URI;
64  import java.util.ArrayList;
65  import java.util.Arrays;
66  import java.util.Dictionary;
67  import java.util.List;
68  
69  /** Create video grids */
70  @Component(
71      immediate = true,
72      service = { VideoGridService.class,ManagedService.class },
73      property = {
74          "service.description=Video Grid Service",
75          "service.pid=org.opencastproject.videogrid.impl.VideoGridServiceImpl"
76      }
77  )
78  public class VideoGridServiceImpl extends AbstractJobProducer implements VideoGridService, ManagedService {
79  
80    /** Configuration key for this operation's job load */
81    private static final String JOB_LOAD_CONFIG = "job.load.videogrid";
82  
83    /** The load introduced on the system by creating a job */
84    private static final float JOB_LOAD_DEFAULT = 1.5f;
85  
86    /** The load introduced on the system by creating a job */
87    private float jobLoad = JOB_LOAD_DEFAULT;
88  
89    private static final Logger logger = LoggerFactory.getLogger(VideoGridServiceImpl.class);
90  
91    /** List of available operations on jobs */
92    private static final String OPERATION = "createPartialTrack";
93  
94    /** Services */
95    private Workspace workspace;
96    private ServiceRegistry serviceRegistry;
97    private SecurityService securityService;
98    private UserDirectoryService userDirectoryService;
99    private OrganizationDirectoryService organizationDirectoryService;
100 
101   /** For JSON serialization */
102   private static final Gson gson = new Gson();
103   private static final Type stringListOfListType = new TypeToken<List<List<String>>>() { }.getType();
104 
105   /** Creates a new videogrid service instance. */
106   public VideoGridServiceImpl() {
107     super(JOB_TYPE);
108   }
109 
110   @Override
111   @Activate
112   public void activate(ComponentContext cc) {
113     super.activate(cc);
114     logger.debug("Activated videogrid service");
115   }
116 
117   @Override
118   public void updated(Dictionary properties) throws ConfigurationException {
119     if (properties == null) {
120       return;
121     }
122     logger.debug("Start updating videogrid service");
123 
124     jobLoad = LoadUtil.getConfiguredLoadValue(properties, JOB_LOAD_CONFIG, JOB_LOAD_DEFAULT, serviceRegistry);
125     logger.debug("Set videogrid job load to {}", jobLoad);
126 
127     logger.debug("Finished updating videogrid service");
128   }
129 
130   /**
131    * {@inheritDoc}
132    *
133    * @see org.opencastproject.job.api.AbstractJobProducer#process(org.opencastproject.job.api.Job)
134    */
135   @Override
136   protected String process(Job job) throws Exception {
137     logger.debug("Started processing job {}", job.getId());
138     if (!OPERATION.equals(job.getOperation())) {
139       throw new ServiceRegistryException(String.format("This service can't handle operations of type '%s'",
140               job.getOperation()));
141     }
142 
143     // Parse arguments
144     List<String> arguments = job.getArguments();
145     List<String> command = gson.fromJson(arguments.get(0), new TypeToken<List<String>>() { }.getType());
146     List<Track> tracks = new ArrayList<>();
147     for (int i = 1; i < arguments.size(); i++) {
148       tracks.add(i - 1, (Track) MediaPackageElementParser.getFromXml(arguments.get(i)));
149     }
150 
151     String outputDirPath = String.format("%s/videogrid/%d/", workspace.rootDirectory(), job.getId());
152     FileUtils.forceMkdir(new File(outputDirPath));
153 
154     // Replace placeholders in command with track paths
155     for (int i = 0; i < command.size(); i++) {
156       String[] trackIds = StringUtils.substringsBetween(command.get(i), "#{","}");
157       if (trackIds != null) {
158         for (String trackId: trackIds) {
159           Track replaceTrack = tracks.stream()
160                   .filter(track -> track.getIdentifier().equals(trackId))
161                   .findAny()
162                   .orElse(null);
163           if (replaceTrack == null) {
164             throw new VideoGridServiceException(String.format("Track with id %s could not be found!", trackId));
165           }
166           command.set(i, command.get(i).replaceAll("#\\{" + trackId + "\\}", getTrackPath(replaceTrack)));
167         }
168       }
169     }
170 
171     // Add output path to command
172     String outputFile = outputDirPath + "videogrid_for_Job_" + job.getId() + ".mp4";
173     command.add(outputFile);
174 
175     logger.info("Running command: {}", command);
176 
177     // Run ffmpeg
178     ProcessBuilder pb = new ProcessBuilder(command);
179     pb.redirectErrorStream(true);
180     Process ffmpegProcess = null;
181     int exitCode = 1;
182     BufferedReader errStream = null;
183     try {
184       ffmpegProcess = pb.start();
185 
186       errStream = new BufferedReader(new InputStreamReader(ffmpegProcess.getInputStream()));
187       String line = errStream.readLine();
188       while (line != null) {
189         logger.info(line);
190         line = errStream.readLine();
191       }
192 
193       exitCode = ffmpegProcess.waitFor();
194     } catch (IOException ex) {
195       throw new VideoGridServiceException("Start ffmpeg process failed", ex);
196     } catch (InterruptedException ex) {
197       throw new VideoGridServiceException("Waiting for encoder process exited was interrupted unexpectedly", ex);
198     } finally {
199       IoSupport.closeQuietly(ffmpegProcess);
200       IoSupport.closeQuietly(errStream);
201       if (exitCode != 0) {
202         try {
203           logger.warn("FFMPEG process exited with errorcode: " + exitCode);
204           FileUtils.forceDelete(new File(outputDirPath));
205         } catch (IOException e) {
206           // it is ok, no output file was generated by ffmpeg
207         }
208       }
209     }
210 
211     if (exitCode != 0) {
212       throw new Exception(String.format("The encoder process exited abnormally with exit code %s "
213               + "using command\n%s", exitCode, String.join(" ", command)));
214     }
215 
216     // Put generated video into workspace
217     FileInputStream outputFileInputStream = null;
218     URI videoFileUri;
219     try {
220       outputFileInputStream = new FileInputStream(outputFile);
221       videoFileUri = workspace.putInCollection("videogrid",
222               FilenameUtils.getName(outputFile), outputFileInputStream);
223       logger.info("Copied the created video to the workspace {}", videoFileUri);
224     } catch (FileNotFoundException ex) {
225       throw new VideoGridServiceException(String.format("Video file '%s' not found", outputFile), ex);
226     } catch (IOException ex) {
227       throw new VideoGridServiceException(String.format(
228               "Can't write video file '%s' to workspace", outputFile), ex);
229     } catch (IllegalArgumentException ex) {
230       throw new VideoGridServiceException(ex);
231     } finally {
232       IoSupport.closeQuietly(outputFileInputStream);
233     }
234 
235     FileUtils.deleteQuietly(new File(workspace.rootDirectory(), String.format("videogrid/%d", job.getId())));
236 
237     // Return URIs to the videos;
238     return gson.toJson(videoFileUri);
239   }
240 
241   @Override
242   public Job createPartialTrack(List<String> command, Track... tracks)
243           throws VideoGridServiceException, MediaPackageException {
244     List<String> jobArguments = new ArrayList<>(Arrays.asList(gson.toJson(command)));
245     for (int i = 0; i < tracks.length; i++) {
246       jobArguments.add(i + 1, MediaPackageElementParser.getAsXml(tracks[i]));
247     }
248     try {
249       logger.debug("Create videogrid service job");
250       return serviceRegistry.createJob(JOB_TYPE, OPERATION, jobArguments, jobLoad);
251     } catch (ServiceRegistryException e) {
252       throw new VideoGridServiceException(e);
253     }
254   }
255 
256   /**
257    * Returns the absolute path of the track
258    *
259    * @param track
260    *          Track whose path you want
261    * @return {@String} containing the absolute path of the given track
262    * @throws VideoGridServiceException
263    */
264   private String getTrackPath(Track track) throws VideoGridServiceException {
265     File mediaFile;
266     try {
267       mediaFile = workspace.get(track.getURI());
268     } catch (NotFoundException e) {
269       throw new VideoGridServiceException(
270               "Error finding the media file in the workspace", e);
271     } catch (IOException e) {
272       throw new VideoGridServiceException(
273               "Error reading the media file in the workspace", e);
274     }
275     return mediaFile.getAbsolutePath();
276   }
277 
278   @Override
279   protected ServiceRegistry getServiceRegistry() {
280     return serviceRegistry;
281   }
282 
283   @Override
284   protected SecurityService getSecurityService() {
285     return securityService;
286   }
287 
288   @Override
289   protected UserDirectoryService getUserDirectoryService() {
290     return userDirectoryService;
291   }
292 
293   @Override
294   protected OrganizationDirectoryService getOrganizationDirectoryService() {
295     return organizationDirectoryService;
296   }
297 
298   @Reference
299   public void setWorkspace(Workspace workspace) {
300     this.workspace = workspace;
301   }
302 
303   @Reference
304   public void setServiceRegistry(ServiceRegistry jobManager) {
305     this.serviceRegistry = jobManager;
306   }
307 
308   @Reference
309   public void setSecurityService(SecurityService securityService) {
310     this.securityService = securityService;
311   }
312 
313   @Reference
314   public void setUserDirectoryService(UserDirectoryService userDirectoryService) {
315     this.userDirectoryService = userDirectoryService;
316   }
317 
318   @Reference
319   public void setOrganizationDirectoryService(OrganizationDirectoryService organizationDirectoryService) {
320     this.organizationDirectoryService = organizationDirectoryService;
321   }
322 }