1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 package org.opencastproject.workflow.impl;
23
24 import static java.lang.String.format;
25
26 import org.opencastproject.job.api.Incident.Severity;
27 import org.opencastproject.security.api.UnauthorizedException;
28 import org.opencastproject.util.JobCanceledException;
29 import org.opencastproject.workflow.api.ResumableWorkflowOperationHandler;
30 import org.opencastproject.workflow.api.WorkflowException;
31 import org.opencastproject.workflow.api.WorkflowInstance;
32 import org.opencastproject.workflow.api.WorkflowOperationAbortedException;
33 import org.opencastproject.workflow.api.WorkflowOperationException;
34 import org.opencastproject.workflow.api.WorkflowOperationHandler;
35 import org.opencastproject.workflow.api.WorkflowOperationInstance;
36 import org.opencastproject.workflow.api.WorkflowOperationInstance.OperationState;
37 import org.opencastproject.workflow.api.WorkflowOperationResult;
38 import org.opencastproject.workflow.api.WorkflowOperationResult.Action;
39 import org.opencastproject.workflow.conditionparser.WorkflowConditionInterpreter;
40
41 import org.slf4j.Logger;
42 import org.slf4j.LoggerFactory;
43
44 import java.util.Map;
45 import java.util.function.Function;
46
47
48
49
50 final class WorkflowOperationWorker {
51 private static final Logger logger = LoggerFactory.getLogger(WorkflowOperationWorker.class);
52
53 private WorkflowOperationHandler handler;
54 private WorkflowInstance workflow;
55 private final WorkflowServiceImpl service;
56 private Map<String, String> properties = null;
57
58
59
60
61
62
63
64
65
66
67
68
69
70 private WorkflowOperationWorker(WorkflowOperationHandler handler, WorkflowInstance workflow,
71 WorkflowServiceImpl service) {
72 this.handler = handler;
73 this.workflow = workflow;
74 this.service = service;
75 }
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91 WorkflowOperationWorker(WorkflowOperationHandler handler, WorkflowInstance workflow, Map<String, String> properties,
92 WorkflowServiceImpl service) {
93 this(handler, workflow, service);
94 this.properties = properties;
95 }
96
97
98
99
100
101
102
103 public void setHandler(WorkflowOperationHandler operationHandler) {
104 handler = operationHandler;
105 }
106
107
108
109
110 public WorkflowInstance execute() {
111 WorkflowOperationInstance operation = workflow.getCurrentOperation();
112 try {
113 WorkflowOperationResult result;
114 switch (operation.getState()) {
115 case INSTANTIATED:
116 case RETRY:
117 result = start();
118 break;
119 case PAUSED:
120 result = resume();
121 break;
122 default:
123 throw new IllegalStateException(
124 "Workflow operation '" + operation + "' is in unexpected state '" + operation.getState() + "'");
125 }
126 if (result == null || Action.CONTINUE.equals(result.getAction()) || Action.SKIP.equals(result.getAction())) {
127 if (handler != null) {
128 handler.destroy(workflow, null);
129 }
130 }
131 workflow = service.handleOperationResult(workflow, result);
132 return workflow;
133 } catch (JobCanceledException e) {
134 logger.info("Workflow {} operation {} job cancelled: {}", workflow.getId(), operation, e.getMessage());
135 return workflow;
136 } catch (WorkflowOperationAbortedException e) {
137
138 logger.info("Workflow operation '" + operation + "' aborted by user");
139 } catch (Exception e) {
140 logger.error("Workflow operation '" + operation + "' failed", e);
141
142 service.getServiceRegistry().incident().unhandledException(operation.getId(), Severity.FAILURE, e);
143 }
144 try {
145 workflow = service.handleOperationException(workflow, operation);
146 } catch (Exception e) {
147 logger.error("Error handling workflow operation '{}'", operation, e);
148 }
149 return workflow;
150 }
151
152
153
154
155
156
157
158
159
160
161 public WorkflowOperationResult start() throws WorkflowOperationException, WorkflowException, UnauthorizedException {
162 final WorkflowOperationInstance operation = workflow.getCurrentOperation();
163
164
165 final var organization = service.securityService.getOrganization();
166 final Function<String, String> variables = key -> {
167 if (properties != null && properties.containsKey(key)) {
168 return properties.get(key);
169 }
170 if (workflow.getConfigurations().containsKey(key)) {
171 return workflow.getConfiguration(key);
172 }
173 if (key.startsWith("org_")) {
174 return organization.getProperties().get(key.substring(4));
175 }
176 return null;
177 };
178 final String executionCondition = WorkflowConditionInterpreter.replaceVariables(
179 operation.getExecutionCondition(), variables, null, false);
180 operation.setExecutionCondition(executionCondition);
181 operation.setDescription(WorkflowConditionInterpreter.replaceVariables(
182 operation.getDescription(), variables, null, false));
183 for (var cfg: operation.getConfigurations().entrySet()) {
184 var value = WorkflowConditionInterpreter.replaceVariables(
185 cfg.getValue(), variables, null, false);
186 operation.setConfiguration(cfg.getKey(), value);
187 }
188
189
190 boolean execute = true;
191 if (executionCondition != null) {
192 try {
193 execute = WorkflowConditionInterpreter.interpret(executionCondition);
194 } catch (IllegalArgumentException e) {
195 operation.setState(OperationState.FAILED);
196 throw new WorkflowOperationException(
197 format("Unable to parse execution condition '%s'", executionCondition), e);
198 }
199 }
200
201 operation.setState(OperationState.RUNNING);
202 service.update(workflow);
203
204 try {
205 WorkflowOperationResult result = null;
206 if (execute) {
207 if (handler == null) {
208
209 logger.warn("No handler available to execute operation '{}'", operation.getTemplate());
210 throw new IllegalStateException("Unable to find a workflow handler for '" + operation.getTemplate() + "'");
211 }
212 result = handler.start(workflow, null);
213 } else {
214
215 if (handler != null) {
216 result = handler.skip(workflow, null);
217 result.setAction(Action.SKIP);
218 }
219 }
220 return result;
221 } catch (Exception e) {
222 operation.setState(OperationState.FAILED);
223 if (e instanceof WorkflowOperationException) {
224 throw (WorkflowOperationException) e;
225 }
226 throw new WorkflowOperationException(e);
227 }
228 }
229
230
231
232
233
234
235
236
237
238
239
240
241
242 public WorkflowOperationResult resume()
243 throws WorkflowOperationException, WorkflowException, IllegalStateException, UnauthorizedException {
244 WorkflowOperationInstance operation = workflow.getCurrentOperation();
245
246
247 if (handler == null) {
248
249 logger.warn("No handler available to resume operation '{}'", operation.getTemplate());
250 throw new IllegalStateException("Unable to find a workflow handler for '" + operation.getTemplate() + "'");
251 } else if (!(handler instanceof ResumableWorkflowOperationHandler)) {
252 throw new IllegalStateException("An attempt was made to resume a non-resumable operation");
253 }
254
255 ResumableWorkflowOperationHandler resumableHandler = (ResumableWorkflowOperationHandler) handler;
256 operation.setState(OperationState.RUNNING);
257 service.update(workflow);
258
259 try {
260 return resumableHandler.resume(workflow, null, properties);
261 } catch (Exception e) {
262 operation.setState(OperationState.FAILED);
263 if (e instanceof WorkflowOperationException) {
264 throw (WorkflowOperationException) e;
265 }
266 throw new WorkflowOperationException(e);
267 }
268 }
269 }