1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 package org.opencastproject.execute.operation.handler;
23
24 import org.opencastproject.execute.api.ExecuteException;
25 import org.opencastproject.execute.api.ExecuteService;
26 import org.opencastproject.inspection.api.MediaInspectionException;
27 import org.opencastproject.inspection.api.MediaInspectionService;
28 import org.opencastproject.job.api.Job;
29 import org.opencastproject.job.api.JobContext;
30 import org.opencastproject.mediapackage.MediaPackage;
31 import org.opencastproject.mediapackage.MediaPackageElement;
32 import org.opencastproject.mediapackage.MediaPackageElementFlavor;
33 import org.opencastproject.mediapackage.MediaPackageElementParser;
34 import org.opencastproject.mediapackage.MediaPackageException;
35 import org.opencastproject.serviceregistry.api.ServiceRegistry;
36 import org.opencastproject.util.NotFoundException;
37 import org.opencastproject.workflow.api.AbstractWorkflowOperationHandler;
38 import org.opencastproject.workflow.api.ConfiguredTagsAndFlavors;
39 import org.opencastproject.workflow.api.WorkflowInstance;
40 import org.opencastproject.workflow.api.WorkflowOperationException;
41 import org.opencastproject.workflow.api.WorkflowOperationHandler;
42 import org.opencastproject.workflow.api.WorkflowOperationInstance;
43 import org.opencastproject.workflow.api.WorkflowOperationResult;
44 import org.opencastproject.workflow.api.WorkflowOperationResult.Action;
45 import org.opencastproject.workflow.api.WorkflowOperationResultImpl;
46 import org.opencastproject.workspace.api.Workspace;
47
48 import org.apache.commons.lang3.StringUtils;
49 import org.osgi.service.component.annotations.Component;
50 import org.osgi.service.component.annotations.Reference;
51 import org.slf4j.Logger;
52 import org.slf4j.LoggerFactory;
53
54 import java.io.File;
55 import java.io.FileInputStream;
56 import java.io.IOException;
57 import java.io.InputStreamReader;
58 import java.net.URI;
59 import java.nio.charset.StandardCharsets;
60 import java.util.HashMap;
61 import java.util.List;
62 import java.util.Map;
63 import java.util.Properties;
64
65
66
67
68 @Component(
69 immediate = true,
70 service = WorkflowOperationHandler.class,
71 property = {
72 "service.description=Execute Once Workflow Operation Handler",
73 "workflow.operation=execute-once"
74 }
75 )
76 public class ExecuteOnceWorkflowOperationHandler extends AbstractWorkflowOperationHandler {
77
78
79 private static final Logger logger = LoggerFactory.getLogger(ExecuteOnceWorkflowOperationHandler.class);
80
81
82 public static final String EXEC_PROPERTY = "exec";
83
84
85 public static final String PARAMS_PROPERTY = "params";
86
87
88 public static final String LOAD_PROPERTY = "load";
89
90
91 public static final String OUTPUT_FILENAME_PROPERTY = "output-filename";
92
93
94 public static final String EXPECTED_TYPE_PROPERTY = "expected-type";
95
96
97 public static final String TARGET_FLAVOR_PROPERTY = "target-flavor";
98
99
100 public static final String TARGET_FLAVORS_PROPERTY = "target-flavors";
101
102
103 public static final String TARGET_TAGS_PROPERTY = "target-tags";
104
105
106 public static final String TARGET_TAG_PROPERTY = "target-tag";
107
108
109 public static final String SET_WF_PROPS_PROPERTY = "set-workflow-properties";
110
111
112 protected ExecuteService executeService;
113
114
115 private MediaInspectionService inspectionService = null;
116
117
118 protected Workspace workspace;
119
120
121
122
123
124
125 @Override
126 public WorkflowOperationResult start(WorkflowInstance workflowInstance, JobContext context) throws WorkflowOperationException {
127
128 MediaPackage mediaPackage = workflowInstance.getMediaPackage();
129 WorkflowOperationInstance operation = workflowInstance.getCurrentOperation();
130
131 logger.debug("Running execute workflow operation with ID {}", operation.getId());
132
133
134 String exec = StringUtils.trimToNull(operation.getConfiguration(EXEC_PROPERTY));
135 String params = StringUtils.trimToNull(operation.getConfiguration(PARAMS_PROPERTY));
136 float load = 1.0f;
137 String loadPropertyStr = StringUtils.trimToEmpty(operation.getConfiguration(LOAD_PROPERTY));
138 if (StringUtils.isNotBlank(loadPropertyStr)) {
139 try {
140 load = Float.parseFloat(loadPropertyStr);
141 } catch (NumberFormatException e) {
142 String description = StringUtils.trimToEmpty(operation.getDescription());
143 logger.warn("Ignoring invalid load value '{}' on execute operation with description '{}'", loadPropertyStr, description);
144 }
145 }
146 ConfiguredTagsAndFlavors tagsAndFlavors = getTagsAndFlavors(workflowInstance,
147 Configuration.none, Configuration.none, Configuration.many, Configuration.many);
148 List<MediaPackageElementFlavor> targetFlavorStr = tagsAndFlavors.getTargetFlavors();
149 ConfiguredTagsAndFlavors.TargetTags targetTags = tagsAndFlavors.getTargetTags();
150 String outputFilename = StringUtils.trimToNull(operation.getConfiguration(OUTPUT_FILENAME_PROPERTY));
151 String expectedTypeStr = StringUtils.trimToNull(operation.getConfiguration(EXPECTED_TYPE_PROPERTY));
152
153 boolean setWfProps = Boolean.valueOf(StringUtils.trimToNull(operation.getConfiguration(SET_WF_PROPS_PROPERTY)));
154
155
156 MediaPackageElementFlavor targetFlavor = null;
157 if (!targetFlavorStr.isEmpty())
158 targetFlavor = targetFlavorStr.get(0);
159
160
161 MediaPackageElement.Type expectedType = null;
162 if (expectedTypeStr != null) {
163 for (MediaPackageElement.Type type : MediaPackageElement.Type.values())
164 if (type.toString().equalsIgnoreCase(expectedTypeStr)) {
165 expectedType = type;
166 break;
167 }
168
169 if (expectedType == null)
170 throw new WorkflowOperationException("'" + expectedTypeStr + "' is not a valid element type");
171 }
172
173
174 MediaPackageElement resultElement = null;
175
176 try {
177 Job job = executeService.execute(exec, params, mediaPackage, outputFilename, expectedType, load);
178
179 WorkflowOperationResult result = null;
180
181
182 if (!waitForStatus(job).isSuccess())
183 throw new WorkflowOperationException("Execute operation failed");
184
185 if (StringUtils.isNotBlank(job.getPayload())) {
186
187 if (setWfProps) {
188
189 resultElement = MediaPackageElementParser.getFromXml(job.getPayload());
190
191 final Properties properties = new Properties();
192 File propertiesFile = workspace.get(resultElement.getURI());
193 try (InputStreamReader reader = new InputStreamReader(new FileInputStream(propertiesFile), StandardCharsets.UTF_8)) {
194 properties.load(reader);
195 }
196 logger.debug("Loaded {} properties from {}", properties.size(), propertiesFile);
197 workspace.deleteFromCollection(ExecuteService.COLLECTION, propertiesFile.getName());
198
199 Map<String, String> wfProps = new HashMap<String, String>((Map) properties);
200
201 result = createResult(mediaPackage, wfProps, Action.CONTINUE, job.getQueueTime());
202 } else {
203
204 resultElement = MediaPackageElementParser.getFromXml(job.getPayload());
205
206 if (resultElement.getElementType() == MediaPackageElement.Type.Track) {
207
208 Job inspectionJob = null;
209 inspectionJob = inspectionService.inspect(resultElement.getURI());
210
211 if (!waitForStatus(inspectionJob).isSuccess()) {
212 throw new ExecuteException("Media inspection of " + resultElement.getURI() + " failed");
213 }
214
215 resultElement = MediaPackageElementParser.getFromXml(inspectionJob.getPayload());
216 }
217
218
219 mediaPackage.add(resultElement);
220 URI uri = workspace.moveTo(resultElement.getURI(), mediaPackage.getIdentifier().toString(),
221 resultElement.getIdentifier(), outputFilename);
222 resultElement.setURI(uri);
223
224
225 if (targetFlavor != null)
226 resultElement.setFlavor(targetFlavor);
227
228
229 applyTargetTagsToElement(targetTags, resultElement);
230
231 result = createResult(mediaPackage, Action.CONTINUE, job.getQueueTime());
232 }
233 } else {
234
235 result = createResult(mediaPackage, Action.CONTINUE, job.getQueueTime());
236 }
237
238 logger.debug("Execute operation {} completed", operation.getId());
239
240 return result;
241
242 } catch (ExecuteException e) {
243 throw new WorkflowOperationException(e);
244 } catch (MediaPackageException e) {
245 throw new WorkflowOperationException("Some result element couldn't be serialized", e);
246 } catch (NotFoundException e) {
247 throw new WorkflowOperationException("Could not find mediapackage", e);
248 } catch (IOException e) {
249 throw new WorkflowOperationException("Error unmarshalling a result mediapackage element", e);
250 } catch (MediaInspectionException e) {
251 throw new WorkflowOperationException("Media inspection of " + resultElement.getURI() + " failed", e);
252 }
253
254 }
255
256
257
258
259
260
261 @Override
262 public WorkflowOperationResult skip(WorkflowInstance workflowInstance, JobContext context) throws WorkflowOperationException {
263 return new WorkflowOperationResultImpl(workflowInstance.getMediaPackage(), null, Action.SKIP, 0);
264 }
265
266 @Override
267 public String getId() {
268 return "execute";
269 }
270
271 @Override
272 public String getDescription() {
273 return "Executes command line workflow operations in workers";
274 }
275
276 @Override
277 public void destroy(WorkflowInstance workflowInstance, JobContext context) throws WorkflowOperationException {
278
279 }
280
281
282
283
284
285
286 @Reference
287 public void setExecuteService(ExecuteService service) {
288 this.executeService = service;
289 }
290
291
292
293
294
295
296 @Reference
297 public void setWorkspace(Workspace workspace) {
298 this.workspace = workspace;
299 }
300
301
302
303
304
305
306
307 @Reference
308 protected void setMediaInspectionService(MediaInspectionService mediaInspectionService) {
309 this.inspectionService = mediaInspectionService;
310 }
311
312 @Reference
313 @Override
314 public void setServiceRegistry(ServiceRegistry serviceRegistry) {
315 super.setServiceRegistry(serviceRegistry);
316 }
317 }