1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 package org.opencastproject.execute.operation.handler;
23
24 import org.opencastproject.execute.api.ExecuteException;
25 import org.opencastproject.execute.api.ExecuteService;
26 import org.opencastproject.inspection.api.MediaInspectionException;
27 import org.opencastproject.inspection.api.MediaInspectionService;
28 import org.opencastproject.job.api.Job;
29 import org.opencastproject.job.api.JobContext;
30 import org.opencastproject.mediapackage.MediaPackage;
31 import org.opencastproject.mediapackage.MediaPackageElement;
32 import org.opencastproject.mediapackage.MediaPackageElementFlavor;
33 import org.opencastproject.mediapackage.MediaPackageElementParser;
34 import org.opencastproject.mediapackage.MediaPackageException;
35 import org.opencastproject.mediapackage.Track;
36 import org.opencastproject.mediapackage.selector.SimpleElementSelector;
37 import org.opencastproject.serviceregistry.api.ServiceRegistry;
38 import org.opencastproject.util.NotFoundException;
39 import org.opencastproject.workflow.api.AbstractWorkflowOperationHandler;
40 import org.opencastproject.workflow.api.ConfiguredTagsAndFlavors;
41 import org.opencastproject.workflow.api.WorkflowInstance;
42 import org.opencastproject.workflow.api.WorkflowOperationException;
43 import org.opencastproject.workflow.api.WorkflowOperationHandler;
44 import org.opencastproject.workflow.api.WorkflowOperationInstance;
45 import org.opencastproject.workflow.api.WorkflowOperationResult;
46 import org.opencastproject.workflow.api.WorkflowOperationResult.Action;
47 import org.opencastproject.workflow.api.WorkflowOperationResultImpl;
48 import org.opencastproject.workspace.api.Workspace;
49
50 import org.apache.commons.lang3.StringUtils;
51 import org.osgi.service.component.annotations.Component;
52 import org.osgi.service.component.annotations.Reference;
53 import org.slf4j.Logger;
54 import org.slf4j.LoggerFactory;
55
56 import java.io.File;
57 import java.io.FileInputStream;
58 import java.io.IOException;
59 import java.io.InputStreamReader;
60 import java.net.URI;
61 import java.nio.charset.StandardCharsets;
62 import java.util.HashMap;
63 import java.util.HashSet;
64 import java.util.List;
65 import java.util.Map;
66 import java.util.Map.Entry;
67 import java.util.Properties;
68 import java.util.Set;
69
70
71
72
73 @Component(
74 immediate = true,
75 service = WorkflowOperationHandler.class,
76 property = {
77 "service.description=Execute Many Workflow Operation Handler",
78 "workflow.operation=execute-many"
79 }
80 )
81 public class ExecuteManyWorkflowOperationHandler extends AbstractWorkflowOperationHandler {
82
83
84 private static final Logger logger = LoggerFactory.getLogger(ExecuteManyWorkflowOperationHandler.class);
85
86
87 public static final String EXEC_PROPERTY = "exec";
88
89
90 public static final String PARAMS_PROPERTY = "params";
91
92
93 public static final String LOAD_PROPERTY = "load";
94
95
96 public static final String SOURCE_FLAVOR_PROPERTY = "source-flavor";
97
98
99 public static final String OUTPUT_FILENAME_PROPERTY = "output-filename";
100
101
102 public static final String EXPECTED_TYPE_PROPERTY = "expected-type";
103
104
105
106
107
108 public static final String SOURCE_TAGS_PROPERTY = "source-tags";
109
110
111 public static final String SOURCE_AUDIO_PROPERTY = "source-audio";
112
113
114 public static final String SOURCE_VIDEO_PROPERTY = "source-video";
115
116
117 public static final String SOURCE_SUBTITLE_PROPERTY = "source-subtitle";
118
119
120 public static final String TARGET_FLAVOR_PROPERTY = "target-flavor";
121
122
123 public static final String TARGET_TAGS_PROPERTY = "target-tags";
124
125
126 public static final String SET_WF_PROPS_PROPERTY = "set-workflow-properties";
127
128
129 protected ExecuteService executeService;
130
131
132 private MediaInspectionService inspectionService = null;
133
134
135 protected Workspace workspace;
136
137
138
139
140
141
142
143 @Override
144 public WorkflowOperationResult start(WorkflowInstance workflowInstance, JobContext context)
145 throws WorkflowOperationException {
146
147 MediaPackage mediaPackage = workflowInstance.getMediaPackage();
148 WorkflowOperationInstance operation = workflowInstance.getCurrentOperation();
149
150 logger.debug("Running execute workflow operation with ID {}", operation.getId());
151
152
153 String exec = StringUtils.trimToNull(operation.getConfiguration(EXEC_PROPERTY));
154 String params = StringUtils.trimToNull(operation.getConfiguration(PARAMS_PROPERTY));
155 float load = 1.0f;
156 String loadPropertyStr = StringUtils.trimToEmpty(operation.getConfiguration(LOAD_PROPERTY));
157 if (StringUtils.isNotBlank(loadPropertyStr)) {
158 try {
159 load = Float.parseFloat(loadPropertyStr);
160 } catch (NumberFormatException e) {
161 String description = StringUtils.trimToEmpty(operation.getDescription());
162 logger.warn("Ignoring invalid load value '{}' on execute operation with description '{}'", loadPropertyStr,
163 description);
164 }
165 }
166 ConfiguredTagsAndFlavors tagsAndFlavors = getTagsAndFlavors(workflowInstance,
167 Configuration.many, Configuration.many, Configuration.many, Configuration.many);
168 List<MediaPackageElementFlavor> sourceFlavor = tagsAndFlavors.getSrcFlavors();
169 List<String> sourceTagList = tagsAndFlavors.getSrcTags();
170 String sourceAudio = StringUtils.trimToNull(operation.getConfiguration(SOURCE_AUDIO_PROPERTY));
171 String sourceVideo = StringUtils.trimToNull(operation.getConfiguration(SOURCE_VIDEO_PROPERTY));
172 String sourceSubtitle = StringUtils.trimToNull(operation.getConfiguration(SOURCE_SUBTITLE_PROPERTY));
173 List<MediaPackageElementFlavor> targetFlavorList = tagsAndFlavors.getTargetFlavors();
174 ConfiguredTagsAndFlavors.TargetTags targetTags = tagsAndFlavors.getTargetTags();
175 String outputFilename = StringUtils.trimToNull(operation.getConfiguration(OUTPUT_FILENAME_PROPERTY));
176 String expectedTypeStr = StringUtils.trimToNull(operation.getConfiguration(EXPECTED_TYPE_PROPERTY));
177
178 boolean setWfProps = Boolean.valueOf(StringUtils.trimToNull(operation.getConfiguration(SET_WF_PROPS_PROPERTY)));
179
180
181 MediaPackageElementFlavor targetFlavor = null;
182 if (!targetFlavorList.isEmpty()) {
183 targetFlavor = targetFlavorList.get(0);
184 }
185
186
187 MediaPackageElement.Type expectedType = null;
188 if (expectedTypeStr != null) {
189 for (MediaPackageElement.Type type : MediaPackageElement.Type.values()) {
190 if (type.toString().equalsIgnoreCase(expectedTypeStr)) {
191 expectedType = type;
192 break;
193 }
194 }
195 if (expectedType == null) {
196 throw new WorkflowOperationException("'" + expectedTypeStr + "' is not a valid element type");
197 }
198 }
199
200
201 Set<MediaPackageElement> inputSet = new HashSet<>();
202
203 SimpleElementSelector elementSelector = new SimpleElementSelector();
204 for (MediaPackageElementFlavor flavor : sourceFlavor) {
205 elementSelector.addFlavor(flavor);
206 }
207 for (String tag : sourceTagList) {
208 elementSelector.addTag(tag);
209 }
210
211 for (MediaPackageElement element : elementSelector.select(mediaPackage, true)) {
212
213 if ((element instanceof Track) && (sourceAudio != null)
214 && (Boolean.parseBoolean(sourceAudio) != ((Track) element).hasAudio())) {
215 continue;
216 }
217
218 if ((element instanceof Track) && (sourceVideo != null)
219 && (Boolean.parseBoolean(sourceVideo) != ((Track) element).hasVideo())) {
220 continue;
221 }
222
223 if ((element instanceof Track) && (sourceSubtitle != null)
224 && (Boolean.parseBoolean(sourceSubtitle) != ((Track) element).hasSubtitle())) {
225 continue;
226 }
227
228 inputSet.add(element);
229 }
230
231 if (inputSet.size() == 0) {
232 logger.warn("Mediapackage {} has no suitable elements to execute the command {} based on tags {}, flavor {}, "
233 + "sourceAudio {}, sourceVideo {}, sourceSubtitle {}",
234 mediaPackage, exec, sourceTagList, sourceFlavor, sourceAudio, sourceVideo, sourceSubtitle);
235 return createResult(mediaPackage, Action.CONTINUE);
236 }
237
238 MediaPackageElement[] inputElements = inputSet.toArray(new MediaPackageElement[inputSet.size()]);
239
240 Map<String, String> wfProps = new HashMap<>();
241
242 try {
243 Job[] jobs = new Job[inputElements.length];
244 MediaPackageElement[] resultElements = new MediaPackageElement[inputElements.length];
245 long totalTimeInQueue = 0;
246
247 for (int i = 0; i < inputElements.length; i++) {
248 jobs[i] = executeService.execute(exec, params, inputElements[i], outputFilename, expectedType, load);
249 }
250
251
252 if (!waitForStatus(jobs).isSuccess()) {
253 throw new WorkflowOperationException("Execute operation failed");
254 }
255
256
257 HashMap<Integer, Job> jobMap = new HashMap<>();
258 for (int i = 0; i < jobs.length; i++) {
259
260 totalTimeInQueue += jobs[i].getQueueTime();
261 if (StringUtils.trimToNull(jobs[i].getPayload()) != null) {
262 resultElements[i] = MediaPackageElementParser.getFromXml(jobs[i].getPayload());
263 if (resultElements[i].getElementType() == MediaPackageElement.Type.Track) {
264 jobMap.put(i, inspectionService.inspect(resultElements[i].getURI()));
265 }
266 } else {
267 resultElements[i] = inputElements[i];
268 }
269 }
270
271 if (jobMap.size() > 0) {
272 if (!waitForStatus(jobMap.values().toArray(new Job[jobMap.size()])).isSuccess()) {
273 throw new WorkflowOperationException("Execute operation failed in track inspection");
274 }
275
276 for (Entry<Integer, Job> entry : jobMap.entrySet()) {
277
278 totalTimeInQueue += entry.getValue().getQueueTime();
279 resultElements[entry.getKey()] = MediaPackageElementParser.getFromXml(entry.getValue().getPayload());
280 }
281 }
282
283 for (int i = 0; i < resultElements.length; i++) {
284 if (resultElements[i] != inputElements[i]) {
285
286 if (setWfProps) {
287
288 final Properties properties = new Properties();
289 File propertiesFile = workspace.get(resultElements[i].getURI());
290 try (InputStreamReader reader = new InputStreamReader(
291 new FileInputStream(propertiesFile),
292 StandardCharsets.UTF_8
293 )) {
294 properties.load(reader);
295 }
296 logger.debug("Loaded {} properties from {}", properties.size(), propertiesFile);
297 workspace.deleteFromCollection(ExecuteService.COLLECTION, propertiesFile.getName());
298
299
300 wfProps.putAll((Map) properties);
301
302 } else {
303
304
305 mediaPackage.addDerived(resultElements[i], inputElements[i]);
306
307 URI uri = workspace.moveTo(resultElements[i].getURI(), mediaPackage.getIdentifier().toString(),
308 resultElements[i].getIdentifier(), outputFilename);
309
310 resultElements[i].setURI(uri);
311
312
313 if (targetFlavor != null) {
314 String targetFlavorType = targetFlavor.getType();
315 String targetFlavorSubtype = targetFlavor.getSubtype();
316
317 if (MediaPackageElementFlavor.WILDCARD.equals(targetFlavorType)) {
318 targetFlavorType = inputElements[i].getFlavor().getType();
319 }
320
321 if (MediaPackageElementFlavor.WILDCARD.equals(targetFlavorSubtype)) {
322 targetFlavorSubtype = inputElements[i].getFlavor().getSubtype();
323 }
324
325 String resolvedTargetFlavorStr =
326 targetFlavorType + MediaPackageElementFlavor.SEPARATOR + targetFlavorSubtype;
327 resultElements[i].setFlavor(MediaPackageElementFlavor.parseFlavor(resolvedTargetFlavorStr));
328 }
329 }
330 }
331
332
333 applyTargetTagsToElement(targetTags, resultElements[i]);
334 }
335
336 WorkflowOperationResult result = createResult(mediaPackage, wfProps, Action.CONTINUE, totalTimeInQueue);
337 logger.debug("Execute operation {} completed", operation.getId());
338
339 return result;
340
341 } catch (ExecuteException e) {
342 throw new WorkflowOperationException(e);
343 } catch (MediaPackageException e) {
344 throw new WorkflowOperationException("Some result element couldn't be serialized", e);
345 } catch (NotFoundException e) {
346 throw new WorkflowOperationException("Could not find mediapackage", e);
347 } catch (IOException e) {
348 throw new WorkflowOperationException("Error unmarshalling a result mediapackage element", e);
349 } catch (MediaInspectionException e) {
350 throw new WorkflowOperationException("Error inspecting one of the created tracks", e);
351 }
352
353 }
354
355
356
357
358
359
360
361 @Override
362 public WorkflowOperationResult skip(WorkflowInstance workflowInstance, JobContext context)
363 throws WorkflowOperationException {
364 return new WorkflowOperationResultImpl(workflowInstance.getMediaPackage(), null, Action.SKIP, 0);
365 }
366
367 @Override
368 public String getId() {
369 return "execute";
370 }
371
372 @Override
373 public String getDescription() {
374 return "Executes command line workflow operations in workers";
375 }
376
377 @Override
378 public void destroy(WorkflowInstance workflowInstance, JobContext context) throws WorkflowOperationException {
379
380 }
381
382
383
384
385
386
387 @Reference
388 public void setExecuteService(ExecuteService service) {
389 executeService = service;
390 }
391
392
393
394
395
396
397 @Reference
398 public void setWorkspace(Workspace workspace) {
399 this.workspace = workspace;
400 }
401
402
403
404
405
406
407
408 @Reference
409 protected void setMediaInspectionService(MediaInspectionService mediaInspectionService) {
410 inspectionService = mediaInspectionService;
411 }
412
413 @Reference
414 @Override
415 public void setServiceRegistry(ServiceRegistry serviceRegistry) {
416 super.setServiceRegistry(serviceRegistry);
417 }
418
419 }