1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 package org.opencastproject.videogrid.impl;
23
24 import org.opencastproject.job.api.AbstractJobProducer;
25 import org.opencastproject.job.api.Job;
26 import org.opencastproject.mediapackage.MediaPackageElementParser;
27 import org.opencastproject.mediapackage.MediaPackageException;
28 import org.opencastproject.mediapackage.Track;
29 import org.opencastproject.security.api.OrganizationDirectoryService;
30 import org.opencastproject.security.api.SecurityService;
31 import org.opencastproject.security.api.UserDirectoryService;
32 import org.opencastproject.serviceregistry.api.ServiceRegistry;
33 import org.opencastproject.serviceregistry.api.ServiceRegistryException;
34 import org.opencastproject.util.ConfigurationException;
35 import org.opencastproject.util.IoSupport;
36 import org.opencastproject.util.LoadUtil;
37 import org.opencastproject.util.NotFoundException;
38 import org.opencastproject.videogrid.api.VideoGridService;
39 import org.opencastproject.videogrid.api.VideoGridServiceException;
40 import org.opencastproject.workspace.api.Workspace;
41
42 import com.google.gson.Gson;
43 import com.google.gson.reflect.TypeToken;
44
45 import org.apache.commons.io.FileUtils;
46 import org.apache.commons.io.FilenameUtils;
47 import org.apache.commons.lang3.StringUtils;
48 import org.osgi.service.cm.ManagedService;
49 import org.osgi.service.component.ComponentContext;
50 import org.osgi.service.component.annotations.Activate;
51 import org.osgi.service.component.annotations.Component;
52 import org.osgi.service.component.annotations.Reference;
53 import org.slf4j.Logger;
54 import org.slf4j.LoggerFactory;
55
56 import java.io.BufferedReader;
57 import java.io.File;
58 import java.io.FileInputStream;
59 import java.io.FileNotFoundException;
60 import java.io.IOException;
61 import java.io.InputStreamReader;
62 import java.lang.reflect.Type;
63 import java.net.URI;
64 import java.util.ArrayList;
65 import java.util.Arrays;
66 import java.util.Dictionary;
67 import java.util.List;
68
69
70 @Component(
71 immediate = true,
72 service = { VideoGridService.class,ManagedService.class },
73 property = {
74 "service.description=Video Grid Service",
75 "service.pid=org.opencastproject.videogrid.impl.VideoGridServiceImpl"
76 }
77 )
78 public class VideoGridServiceImpl extends AbstractJobProducer implements VideoGridService, ManagedService {
79
80
81 private static final String JOB_LOAD_CONFIG = "job.load.videogrid";
82
83
84 private static final float JOB_LOAD_DEFAULT = 1.5f;
85
86
87 private float jobLoad = JOB_LOAD_DEFAULT;
88
89 private static final Logger logger = LoggerFactory.getLogger(VideoGridServiceImpl.class);
90
91
92 private static final String OPERATION = "createPartialTrack";
93
94
95 private Workspace workspace;
96 private ServiceRegistry serviceRegistry;
97 private SecurityService securityService;
98 private UserDirectoryService userDirectoryService;
99 private OrganizationDirectoryService organizationDirectoryService;
100
101
102 private static final Gson gson = new Gson();
103 private static final Type stringListOfListType = new TypeToken<List<List<String>>>() { }.getType();
104
105
106 public VideoGridServiceImpl() {
107 super(JOB_TYPE);
108 }
109
110 @Override
111 @Activate
112 public void activate(ComponentContext cc) {
113 super.activate(cc);
114 logger.debug("Activated videogrid service");
115 }
116
117 @Override
118 public void updated(Dictionary properties) throws ConfigurationException {
119 if (properties == null) {
120 return;
121 }
122 logger.debug("Start updating videogrid service");
123
124 jobLoad = LoadUtil.getConfiguredLoadValue(properties, JOB_LOAD_CONFIG, JOB_LOAD_DEFAULT, serviceRegistry);
125 logger.debug("Set videogrid job load to {}", jobLoad);
126
127 logger.debug("Finished updating videogrid service");
128 }
129
130
131
132
133
134
135 @Override
136 protected String process(Job job) throws Exception {
137 logger.debug("Started processing job {}", job.getId());
138 if (!OPERATION.equals(job.getOperation())) {
139 throw new ServiceRegistryException(String.format("This service can't handle operations of type '%s'",
140 job.getOperation()));
141 }
142
143
144 List<String> arguments = job.getArguments();
145 List<String> command = gson.fromJson(arguments.get(0), new TypeToken<List<String>>() { }.getType());
146 List<Track> tracks = new ArrayList<>();
147 for (int i = 1; i < arguments.size(); i++) {
148 tracks.add(i - 1, (Track) MediaPackageElementParser.getFromXml(arguments.get(i)));
149 }
150
151 String outputDirPath = String.format("%s/videogrid/%d/", workspace.rootDirectory(), job.getId());
152 FileUtils.forceMkdir(new File(outputDirPath));
153
154
155 for (int i = 0; i < command.size(); i++) {
156 String[] trackIds = StringUtils.substringsBetween(command.get(i), "#{","}");
157 if (trackIds != null) {
158 for (String trackId: trackIds) {
159 Track replaceTrack = tracks.stream()
160 .filter(track -> track.getIdentifier().equals(trackId))
161 .findAny()
162 .orElse(null);
163 if (replaceTrack == null) {
164 throw new VideoGridServiceException(String.format("Track with id %s could not be found!", trackId));
165 }
166 command.set(i, command.get(i).replaceAll("#\\{" + trackId + "\\}", getTrackPath(replaceTrack)));
167 }
168 }
169 }
170
171
172 String outputFile = outputDirPath + "videogrid_for_Job_" + job.getId() + ".mp4";
173 command.add(outputFile);
174
175 logger.info("Running command: {}", command);
176
177
178 ProcessBuilder pb = new ProcessBuilder(command);
179 pb.redirectErrorStream(true);
180 Process ffmpegProcess = null;
181 int exitCode = 1;
182 BufferedReader errStream = null;
183 try {
184 ffmpegProcess = pb.start();
185
186 errStream = new BufferedReader(new InputStreamReader(ffmpegProcess.getInputStream()));
187 String line = errStream.readLine();
188 while (line != null) {
189 logger.info(line);
190 line = errStream.readLine();
191 }
192
193 exitCode = ffmpegProcess.waitFor();
194 } catch (IOException ex) {
195 throw new VideoGridServiceException("Start ffmpeg process failed", ex);
196 } catch (InterruptedException ex) {
197 throw new VideoGridServiceException("Waiting for encoder process exited was interrupted unexpectedly", ex);
198 } finally {
199 IoSupport.closeQuietly(ffmpegProcess);
200 IoSupport.closeQuietly(errStream);
201 if (exitCode != 0) {
202 try {
203 logger.warn("FFMPEG process exited with errorcode: " + exitCode);
204 FileUtils.forceDelete(new File(outputDirPath));
205 } catch (IOException e) {
206
207 }
208 }
209 }
210
211 if (exitCode != 0) {
212 throw new Exception(String.format("The encoder process exited abnormally with exit code %s "
213 + "using command\n%s", exitCode, String.join(" ", command)));
214 }
215
216
217 FileInputStream outputFileInputStream = null;
218 URI videoFileUri;
219 try {
220 outputFileInputStream = new FileInputStream(outputFile);
221 videoFileUri = workspace.putInCollection("videogrid",
222 FilenameUtils.getName(outputFile), outputFileInputStream);
223 logger.info("Copied the created video to the workspace {}", videoFileUri);
224 } catch (FileNotFoundException ex) {
225 throw new VideoGridServiceException(String.format("Video file '%s' not found", outputFile), ex);
226 } catch (IOException ex) {
227 throw new VideoGridServiceException(String.format(
228 "Can't write video file '%s' to workspace", outputFile), ex);
229 } catch (IllegalArgumentException ex) {
230 throw new VideoGridServiceException(ex);
231 } finally {
232 IoSupport.closeQuietly(outputFileInputStream);
233 }
234
235 FileUtils.deleteQuietly(new File(workspace.rootDirectory(), String.format("videogrid/%d", job.getId())));
236
237
238 return gson.toJson(videoFileUri);
239 }
240
241 @Override
242 public Job createPartialTrack(List<String> command, Track... tracks)
243 throws VideoGridServiceException, MediaPackageException {
244 List<String> jobArguments = new ArrayList<>(Arrays.asList(gson.toJson(command)));
245 for (int i = 0; i < tracks.length; i++) {
246 jobArguments.add(i + 1, MediaPackageElementParser.getAsXml(tracks[i]));
247 }
248 try {
249 logger.debug("Create videogrid service job");
250 return serviceRegistry.createJob(JOB_TYPE, OPERATION, jobArguments, jobLoad);
251 } catch (ServiceRegistryException e) {
252 throw new VideoGridServiceException(e);
253 }
254 }
255
256
257
258
259
260
261
262
263
264 private String getTrackPath(Track track) throws VideoGridServiceException {
265 File mediaFile;
266 try {
267 mediaFile = workspace.get(track.getURI());
268 } catch (NotFoundException e) {
269 throw new VideoGridServiceException(
270 "Error finding the media file in the workspace", e);
271 } catch (IOException e) {
272 throw new VideoGridServiceException(
273 "Error reading the media file in the workspace", e);
274 }
275 return mediaFile.getAbsolutePath();
276 }
277
278 @Override
279 protected ServiceRegistry getServiceRegistry() {
280 return serviceRegistry;
281 }
282
283 @Override
284 protected SecurityService getSecurityService() {
285 return securityService;
286 }
287
288 @Override
289 protected UserDirectoryService getUserDirectoryService() {
290 return userDirectoryService;
291 }
292
293 @Override
294 protected OrganizationDirectoryService getOrganizationDirectoryService() {
295 return organizationDirectoryService;
296 }
297
298 @Reference
299 public void setWorkspace(Workspace workspace) {
300 this.workspace = workspace;
301 }
302
303 @Reference
304 public void setServiceRegistry(ServiceRegistry jobManager) {
305 this.serviceRegistry = jobManager;
306 }
307
308 @Reference
309 public void setSecurityService(SecurityService securityService) {
310 this.securityService = securityService;
311 }
312
313 @Reference
314 public void setUserDirectoryService(UserDirectoryService userDirectoryService) {
315 this.userDirectoryService = userDirectoryService;
316 }
317
318 @Reference
319 public void setOrganizationDirectoryService(OrganizationDirectoryService organizationDirectoryService) {
320 this.organizationDirectoryService = organizationDirectoryService;
321 }
322 }