1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 package org.opencastproject.execute.operation.handler;
23
24 import org.opencastproject.execute.api.ExecuteException;
25 import org.opencastproject.execute.api.ExecuteService;
26 import org.opencastproject.inspection.api.MediaInspectionException;
27 import org.opencastproject.inspection.api.MediaInspectionService;
28 import org.opencastproject.job.api.Job;
29 import org.opencastproject.job.api.JobContext;
30 import org.opencastproject.mediapackage.MediaPackage;
31 import org.opencastproject.mediapackage.MediaPackageElement;
32 import org.opencastproject.mediapackage.MediaPackageElementFlavor;
33 import org.opencastproject.mediapackage.MediaPackageElementParser;
34 import org.opencastproject.mediapackage.MediaPackageException;
35 import org.opencastproject.mediapackage.Track;
36 import org.opencastproject.mediapackage.selector.SimpleElementSelector;
37 import org.opencastproject.serviceregistry.api.ServiceRegistry;
38 import org.opencastproject.util.NotFoundException;
39 import org.opencastproject.workflow.api.AbstractWorkflowOperationHandler;
40 import org.opencastproject.workflow.api.ConfiguredTagsAndFlavors;
41 import org.opencastproject.workflow.api.WorkflowInstance;
42 import org.opencastproject.workflow.api.WorkflowOperationException;
43 import org.opencastproject.workflow.api.WorkflowOperationHandler;
44 import org.opencastproject.workflow.api.WorkflowOperationInstance;
45 import org.opencastproject.workflow.api.WorkflowOperationResult;
46 import org.opencastproject.workflow.api.WorkflowOperationResult.Action;
47 import org.opencastproject.workflow.api.WorkflowOperationResultImpl;
48 import org.opencastproject.workspace.api.Workspace;
49
50 import org.apache.commons.lang3.StringUtils;
51 import org.osgi.service.component.annotations.Component;
52 import org.osgi.service.component.annotations.Reference;
53 import org.slf4j.Logger;
54 import org.slf4j.LoggerFactory;
55
56 import java.io.File;
57 import java.io.FileInputStream;
58 import java.io.IOException;
59 import java.io.InputStreamReader;
60 import java.net.URI;
61 import java.nio.charset.StandardCharsets;
62 import java.util.HashMap;
63 import java.util.HashSet;
64 import java.util.List;
65 import java.util.Map;
66 import java.util.Map.Entry;
67 import java.util.Properties;
68 import java.util.Set;
69
70
71
72
73 @Component(
74 immediate = true,
75 service = WorkflowOperationHandler.class,
76 property = {
77 "service.description=Execute Many Workflow Operation Handler",
78 "workflow.operation=execute-many"
79 }
80 )
81 public class ExecuteManyWorkflowOperationHandler extends AbstractWorkflowOperationHandler {
82
83
84 private static final Logger logger = LoggerFactory.getLogger(ExecuteManyWorkflowOperationHandler.class);
85
86
87 public static final String EXEC_PROPERTY = "exec";
88
89
90 public static final String PARAMS_PROPERTY = "params";
91
92
93 public static final String LOAD_PROPERTY = "load";
94
95
96 public static final String SOURCE_FLAVOR_PROPERTY = "source-flavor";
97
98
99 public static final String OUTPUT_FILENAME_PROPERTY = "output-filename";
100
101
102 public static final String EXPECTED_TYPE_PROPERTY = "expected-type";
103
104
105
106
107
108 public static final String SOURCE_TAGS_PROPERTY = "source-tags";
109
110
111 public static final String SOURCE_AUDIO_PROPERTY = "source-audio";
112
113
114 public static final String SOURCE_VIDEO_PROPERTY = "source-video";
115
116
117 public static final String SOURCE_SUBTITLE_PROPERTY = "source-subtitle";
118
119
120 public static final String TARGET_FLAVOR_PROPERTY = "target-flavor";
121
122
123 public static final String TARGET_TAGS_PROPERTY = "target-tags";
124
125
126 public static final String SET_WF_PROPS_PROPERTY = "set-workflow-properties";
127
128
129 protected ExecuteService executeService;
130
131
132 private MediaInspectionService inspectionService = null;
133
134
135 protected Workspace workspace;
136
137
138
139
140
141
142
143 @Override
144 public WorkflowOperationResult start(WorkflowInstance workflowInstance, JobContext context)
145 throws WorkflowOperationException {
146
147 MediaPackage mediaPackage = workflowInstance.getMediaPackage();
148 WorkflowOperationInstance operation = workflowInstance.getCurrentOperation();
149
150 logger.debug("Running execute workflow operation with ID {}", operation.getId());
151
152
153 String exec = StringUtils.trimToNull(operation.getConfiguration(EXEC_PROPERTY));
154 String params = StringUtils.trimToNull(operation.getConfiguration(PARAMS_PROPERTY));
155 float load = 1.0f;
156 String loadPropertyStr = StringUtils.trimToEmpty(operation.getConfiguration(LOAD_PROPERTY));
157 if (StringUtils.isNotBlank(loadPropertyStr)) {
158 try {
159 load = Float.parseFloat(loadPropertyStr);
160 } catch (NumberFormatException e) {
161 String description = StringUtils.trimToEmpty(operation.getDescription());
162 logger.warn("Ignoring invalid load value '{}' on execute operation with description '{}'", loadPropertyStr,
163 description);
164 }
165 }
166 ConfiguredTagsAndFlavors tagsAndFlavors = getTagsAndFlavors(workflowInstance,
167 Configuration.many, Configuration.many, Configuration.many, Configuration.many);
168 List<MediaPackageElementFlavor> sourceFlavor = tagsAndFlavors.getSrcFlavors();
169 List<String> sourceTagList = tagsAndFlavors.getSrcTags();
170 String sourceAudio = StringUtils.trimToNull(operation.getConfiguration(SOURCE_AUDIO_PROPERTY));
171 String sourceVideo = StringUtils.trimToNull(operation.getConfiguration(SOURCE_VIDEO_PROPERTY));
172 String sourceSubtitle = StringUtils.trimToNull(operation.getConfiguration(SOURCE_SUBTITLE_PROPERTY));
173 List<MediaPackageElementFlavor> targetFlavorList = tagsAndFlavors.getTargetFlavors();
174 ConfiguredTagsAndFlavors.TargetTags targetTags = tagsAndFlavors.getTargetTags();
175 String outputFilename = StringUtils.trimToNull(operation.getConfiguration(OUTPUT_FILENAME_PROPERTY));
176 String expectedTypeStr = StringUtils.trimToNull(operation.getConfiguration(EXPECTED_TYPE_PROPERTY));
177
178 boolean setWfProps = Boolean.valueOf(StringUtils.trimToNull(operation.getConfiguration(SET_WF_PROPS_PROPERTY)));
179
180
181 MediaPackageElementFlavor targetFlavor = null;
182 if (!targetFlavorList.isEmpty())
183 targetFlavor = targetFlavorList.get(0);
184
185
186 MediaPackageElement.Type expectedType = null;
187 if (expectedTypeStr != null) {
188 for (MediaPackageElement.Type type : MediaPackageElement.Type.values())
189 if (type.toString().equalsIgnoreCase(expectedTypeStr)) {
190 expectedType = type;
191 break;
192 }
193
194 if (expectedType == null)
195 throw new WorkflowOperationException("'" + expectedTypeStr + "' is not a valid element type");
196 }
197
198
199 Set<MediaPackageElement> inputSet = new HashSet<>();
200
201 SimpleElementSelector elementSelector = new SimpleElementSelector();
202 for (MediaPackageElementFlavor flavor : sourceFlavor) {
203 elementSelector.addFlavor(flavor);
204 }
205 for (String tag : sourceTagList) {
206 elementSelector.addTag(tag);
207 }
208
209 for (MediaPackageElement element : elementSelector.select(mediaPackage, true)) {
210
211 if ((element instanceof Track) && (sourceAudio != null)
212 && (Boolean.parseBoolean(sourceAudio) != ((Track) element).hasAudio())) {
213 continue;
214 }
215
216 if ((element instanceof Track) && (sourceVideo != null)
217 && (Boolean.parseBoolean(sourceVideo) != ((Track) element).hasVideo())) {
218 continue;
219 }
220
221 if ((element instanceof Track) && (sourceSubtitle != null)
222 && (Boolean.parseBoolean(sourceSubtitle) != ((Track) element).hasSubtitle())) {
223 continue;
224 }
225
226 inputSet.add(element);
227 }
228
229 if (inputSet.size() == 0) {
230 logger.warn("Mediapackage {} has no suitable elements to execute the command {} based on tags {}, flavor {}, sourceAudio {}, sourceVideo {}, sourceSubtitle {}",
231 mediaPackage, exec, sourceTagList, sourceFlavor, sourceAudio, sourceVideo, sourceSubtitle);
232 return createResult(mediaPackage, Action.CONTINUE);
233 }
234
235 MediaPackageElement[] inputElements = inputSet.toArray(new MediaPackageElement[inputSet.size()]);
236
237 Map<String, String> wfProps = new HashMap<>();
238
239 try {
240 Job[] jobs = new Job[inputElements.length];
241 MediaPackageElement[] resultElements = new MediaPackageElement[inputElements.length];
242 long totalTimeInQueue = 0;
243
244 for (int i = 0; i < inputElements.length; i++)
245 jobs[i] = executeService.execute(exec, params, inputElements[i], outputFilename, expectedType, load);
246
247
248 if (!waitForStatus(jobs).isSuccess())
249 throw new WorkflowOperationException("Execute operation failed");
250
251
252 HashMap<Integer, Job> jobMap = new HashMap<>();
253 for (int i = 0; i < jobs.length; i++) {
254
255 totalTimeInQueue += jobs[i].getQueueTime();
256 if (StringUtils.trimToNull(jobs[i].getPayload()) != null) {
257 resultElements[i] = MediaPackageElementParser.getFromXml(jobs[i].getPayload());
258 if (resultElements[i].getElementType() == MediaPackageElement.Type.Track) {
259 jobMap.put(i, inspectionService.inspect(resultElements[i].getURI()));
260 }
261 } else
262 resultElements[i] = inputElements[i];
263 }
264
265 if (jobMap.size() > 0) {
266 if (!waitForStatus(jobMap.values().toArray(new Job[jobMap.size()])).isSuccess())
267 throw new WorkflowOperationException("Execute operation failed in track inspection");
268
269 for (Entry<Integer, Job> entry : jobMap.entrySet()) {
270
271 totalTimeInQueue += entry.getValue().getQueueTime();
272 resultElements[entry.getKey()] = MediaPackageElementParser.getFromXml(entry.getValue().getPayload());
273 }
274 }
275
276 for (int i = 0; i < resultElements.length; i++) {
277 if (resultElements[i] != inputElements[i]) {
278
279 if (setWfProps) {
280
281 final Properties properties = new Properties();
282 File propertiesFile = workspace.get(resultElements[i].getURI());
283 try (InputStreamReader reader = new InputStreamReader(new FileInputStream(propertiesFile), StandardCharsets.UTF_8)) {
284 properties.load(reader);
285 }
286 logger.debug("Loaded {} properties from {}", properties.size(), propertiesFile);
287 workspace.deleteFromCollection(ExecuteService.COLLECTION, propertiesFile.getName());
288
289
290 wfProps.putAll((Map) properties);
291
292 } else {
293
294
295 mediaPackage.addDerived(resultElements[i], inputElements[i]);
296
297 URI uri = workspace.moveTo(resultElements[i].getURI(), mediaPackage.getIdentifier().toString(),
298 resultElements[i].getIdentifier(), outputFilename);
299
300 resultElements[i].setURI(uri);
301
302
303 if (targetFlavor != null) {
304 String targetFlavorType = targetFlavor.getType();
305 String targetFlavorSubtype = targetFlavor.getSubtype();
306
307 if (MediaPackageElementFlavor.WILDCARD.equals(targetFlavorType)) {
308 targetFlavorType = inputElements[i].getFlavor().getType();
309 }
310
311 if (MediaPackageElementFlavor.WILDCARD.equals(targetFlavorSubtype)) {
312 targetFlavorSubtype = inputElements[i].getFlavor().getSubtype();
313 }
314
315 String resolvedTargetFlavorStr =
316 targetFlavorType + MediaPackageElementFlavor.SEPARATOR + targetFlavorSubtype;
317 resultElements[i].setFlavor(MediaPackageElementFlavor.parseFlavor(resolvedTargetFlavorStr));
318 }
319 }
320 }
321
322
323 applyTargetTagsToElement(targetTags, resultElements[i]);
324 }
325
326 WorkflowOperationResult result = createResult(mediaPackage, wfProps, Action.CONTINUE, totalTimeInQueue);
327 logger.debug("Execute operation {} completed", operation.getId());
328
329 return result;
330
331 } catch (ExecuteException e) {
332 throw new WorkflowOperationException(e);
333 } catch (MediaPackageException e) {
334 throw new WorkflowOperationException("Some result element couldn't be serialized", e);
335 } catch (NotFoundException e) {
336 throw new WorkflowOperationException("Could not find mediapackage", e);
337 } catch (IOException e) {
338 throw new WorkflowOperationException("Error unmarshalling a result mediapackage element", e);
339 } catch (MediaInspectionException e) {
340 throw new WorkflowOperationException("Error inspecting one of the created tracks", e);
341 }
342
343 }
344
345
346
347
348
349
350
351 @Override
352 public WorkflowOperationResult skip(WorkflowInstance workflowInstance, JobContext context)
353 throws WorkflowOperationException {
354 return new WorkflowOperationResultImpl(workflowInstance.getMediaPackage(), null, Action.SKIP, 0);
355 }
356
357 @Override
358 public String getId() {
359 return "execute";
360 }
361
362 @Override
363 public String getDescription() {
364 return "Executes command line workflow operations in workers";
365 }
366
367 @Override
368 public void destroy(WorkflowInstance workflowInstance, JobContext context) throws WorkflowOperationException {
369
370 }
371
372
373
374
375
376
377 @Reference
378 public void setExecuteService(ExecuteService service) {
379 executeService = service;
380 }
381
382
383
384
385
386
387 @Reference
388 public void setWorkspace(Workspace workspace) {
389 this.workspace = workspace;
390 }
391
392
393
394
395
396
397
398 @Reference
399 protected void setMediaInspectionService(MediaInspectionService mediaInspectionService) {
400 inspectionService = mediaInspectionService;
401 }
402
403 @Reference
404 @Override
405 public void setServiceRegistry(ServiceRegistry serviceRegistry) {
406 super.setServiceRegistry(serviceRegistry);
407 }
408
409 }