View Javadoc
1   /*
2    * Licensed to The Apereo Foundation under one or more contributor license
3    * agreements. See the NOTICE file distributed with this work for additional
4    * information regarding copyright ownership.
5    *
6    *
7    * The Apereo Foundation licenses this file to you under the Educational
8    * Community License, Version 2.0 (the "License"); you may not use this file
9    * except in compliance with the License. You may obtain a copy of the License
10   * at:
11   *
12   *   http://opensource.org/licenses/ecl2.txt
13   *
14   * Unless required by applicable law or agreed to in writing, software
15   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
16   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the
17   * License for the specific language governing permissions and limitations under
18   * the License.
19   *
20   */
21  
22  package org.opencastproject.workflow.impl;
23  
24  import static java.lang.String.format;
25  
26  import org.opencastproject.job.api.Incident.Severity;
27  import org.opencastproject.security.api.UnauthorizedException;
28  import org.opencastproject.util.JobCanceledException;
29  import org.opencastproject.workflow.api.ResumableWorkflowOperationHandler;
30  import org.opencastproject.workflow.api.WorkflowException;
31  import org.opencastproject.workflow.api.WorkflowInstance;
32  import org.opencastproject.workflow.api.WorkflowOperationAbortedException;
33  import org.opencastproject.workflow.api.WorkflowOperationException;
34  import org.opencastproject.workflow.api.WorkflowOperationHandler;
35  import org.opencastproject.workflow.api.WorkflowOperationInstance;
36  import org.opencastproject.workflow.api.WorkflowOperationInstance.OperationState;
37  import org.opencastproject.workflow.api.WorkflowOperationResult;
38  import org.opencastproject.workflow.api.WorkflowOperationResult.Action;
39  import org.opencastproject.workflow.conditionparser.WorkflowConditionInterpreter;
40  
41  import org.slf4j.Logger;
42  import org.slf4j.LoggerFactory;
43  
44  import java.util.Map;
45  import java.util.function.Function;
46  
47  /**
48   * Handles execution of a workflow operation.
49   */
50  final class WorkflowOperationWorker {
51    private static final Logger logger = LoggerFactory.getLogger(WorkflowOperationWorker.class);
52  
53    private WorkflowOperationHandler handler;
54    private WorkflowInstance workflow;
55    private final WorkflowServiceImpl service;
56    private Map<String, String> properties = null;
57  
58    /**
59     * Creates a worker that will execute the given handler and thereby the current operation of the workflow instance.
60     * When the worker is finished, a callback will be made to the workflow service reporting either success or failure of
61     * the current workflow operation.
62     *
63     * @param handler
64     *          the workflow operation handler
65     * @param workflow
66     *          the workflow instance
67     * @param service
68     *          the workflow service.
69     */
70    private WorkflowOperationWorker(WorkflowOperationHandler handler, WorkflowInstance workflow,
71            WorkflowServiceImpl service) {
72      this.handler = handler;
73      this.workflow = workflow;
74      this.service = service;
75    }
76  
77    /**
78     * Creates a worker that will execute the given handler and thereby the current operation of the workflow instance.
79     * When the worker is finished, a callback will be made to the workflow service reporting either success or failure of
80     * the current workflow operation.
81     *
82     * @param handler
83     *          the workflow operation handler
84     * @param workflow
85     *          the workflow instance
86     * @param properties
87     *          the properties used to execute the operation
88     * @param service
89     *          the workflow service.
90     */
91    WorkflowOperationWorker(WorkflowOperationHandler handler, WorkflowInstance workflow, Map<String, String> properties,
92            WorkflowServiceImpl service) {
93      this(handler, workflow, service);
94      this.properties = properties;
95    }
96  
97    /**
98     * Sets the workflow operation handler to use.
99     *
100    * @param operationHandler
101    *          the handler
102    */
103   public void setHandler(WorkflowOperationHandler operationHandler) {
104     handler = operationHandler;
105   }
106 
107   /**
108    * Executes the workflow operation logic.
109    */
110   public WorkflowInstance execute() {
111     WorkflowOperationInstance operation = workflow.getCurrentOperation();
112     try {
113       WorkflowOperationResult result;
114       switch (operation.getState()) {
115         case INSTANTIATED:
116         case RETRY:
117           result = start();
118           break;
119         case PAUSED:
120           result = resume();
121           break;
122         default:
123           throw new IllegalStateException(
124                   "Workflow operation '" + operation + "' is in unexpected state '" + operation.getState() + "'");
125       }
126       if (result == null || Action.CONTINUE.equals(result.getAction()) || Action.SKIP.equals(result.getAction())) {
127         if (handler != null) {
128           handler.destroy(workflow, null);
129         }
130       }
131       workflow = service.handleOperationResult(workflow, result);
132       return workflow;
133     } catch (JobCanceledException e) {
134       logger.info("Workflow {} operation {} job cancelled: {}", workflow.getId(), operation, e.getMessage());
135       return workflow;
136     } catch (WorkflowOperationAbortedException e) {
137       // Don't log it as error because it was aborted by the user
138       logger.info("Workflow operation '" + operation + "' aborted by user");
139     } catch (Exception e) {
140       logger.error("Workflow operation '" + operation + "' failed", e);
141       // the associated job shares operation's id
142       service.getServiceRegistry().incident().unhandledException(operation.getId(), Severity.FAILURE, e);
143     }
144     try {
145       workflow = service.handleOperationException(workflow, operation);
146     } catch (Exception e) {
147       logger.error("Error handling workflow operation '{}'", operation, e);
148     }
149     return workflow;
150   }
151 
152   /**
153    * Starts executing the workflow operation.
154    *
155    * @return the workflow operation result
156    * @throws WorkflowOperationException
157    *           if executing the workflow operation handler fails
158    * @throws WorkflowException
159    *           if there is a problem processing the workflow
160    */
161   public WorkflowOperationResult start() throws WorkflowOperationException, WorkflowException, UnauthorizedException {
162     final WorkflowOperationInstance operation = workflow.getCurrentOperation();
163 
164     // Update execution condition and metadata
165     final var organization = service.securityService.getOrganization();
166     final Function<String, String> variables = key -> {
167       if (properties != null && properties.containsKey(key)) {
168         return properties.get(key);
169       }
170       if (workflow.getConfigurations().containsKey(key)) {
171         return workflow.getConfiguration(key);
172       }
173       if (key.startsWith("org_")) {
174         return organization.getProperties().get(key.substring(4));
175       }
176       return null;
177     };
178     final String executionCondition = WorkflowConditionInterpreter.replaceVariables(
179         operation.getExecutionCondition(), variables, null, false);
180     operation.setExecutionCondition(executionCondition);
181     operation.setDescription(WorkflowConditionInterpreter.replaceVariables(
182         operation.getDescription(), variables, null, false));
183     for (var cfg: operation.getConfigurations().entrySet()) {
184       var value = WorkflowConditionInterpreter.replaceVariables(
185           cfg.getValue(), variables, null, false);
186       operation.setConfiguration(cfg.getKey(), value);
187     }
188 
189     // Do we need to execute the operation?
190     boolean execute = true;
191     if (executionCondition != null) {
192       try {
193         execute = WorkflowConditionInterpreter.interpret(executionCondition);
194       } catch (IllegalArgumentException e) {
195         operation.setState(OperationState.FAILED);
196         throw new WorkflowOperationException(
197                 format("Unable to parse execution condition '%s'", executionCondition), e);
198       }
199     }
200 
201     operation.setState(OperationState.RUNNING);
202     service.update(workflow);
203 
204     try {
205       WorkflowOperationResult result = null;
206       if (execute) {
207         if (handler == null) {
208           // If there is no handler for the operation, yet we are supposed to run it, we must fail
209           logger.warn("No handler available to execute operation '{}'", operation.getTemplate());
210           throw new IllegalStateException("Unable to find a workflow handler for '" + operation.getTemplate() + "'");
211         }
212         result = handler.start(workflow, null);
213       } else {
214         // Allow for null handlers when we are skipping an operation
215         if (handler != null) {
216           result = handler.skip(workflow, null);
217           result.setAction(Action.SKIP);
218         }
219       }
220       return result;
221     } catch (Exception e) {
222       operation.setState(OperationState.FAILED);
223       if (e instanceof WorkflowOperationException) {
224         throw (WorkflowOperationException) e;
225       }
226       throw new WorkflowOperationException(e);
227     }
228   }
229 
230   /**
231    * Resumes a previously suspended workflow operation. Note that only workflow operation handlers that implement
232    * {@link ResumableWorkflowOperationHandler} can be resumed.
233    *
234    * @return the workflow operation result
235    * @throws WorkflowOperationException
236    *           if executing the workflow operation handler fails
237    * @throws WorkflowException
238    *           if there is a problem processing the workflow
239    * @throws IllegalStateException
240    *           if the workflow operation cannot be resumed
241    */
242   public WorkflowOperationResult resume()
243           throws WorkflowOperationException, WorkflowException, IllegalStateException, UnauthorizedException {
244     WorkflowOperationInstance operation = workflow.getCurrentOperation();
245 
246     // Make sure we have a (suitable) handler
247     if (handler == null) {
248       // If there is no handler for the operation, yet we are supposed to run it, we must fail
249       logger.warn("No handler available to resume operation '{}'", operation.getTemplate());
250       throw new IllegalStateException("Unable to find a workflow handler for '" + operation.getTemplate() + "'");
251     } else if (!(handler instanceof ResumableWorkflowOperationHandler)) {
252       throw new IllegalStateException("An attempt was made to resume a non-resumable operation");
253     }
254 
255     ResumableWorkflowOperationHandler resumableHandler = (ResumableWorkflowOperationHandler) handler;
256     operation.setState(OperationState.RUNNING);
257     service.update(workflow);
258 
259     try {
260       return resumableHandler.resume(workflow, null, properties);
261     } catch (Exception e) {
262       operation.setState(OperationState.FAILED);
263       if (e instanceof WorkflowOperationException) {
264         throw (WorkflowOperationException) e;
265       }
266       throw new WorkflowOperationException(e);
267     }
268   }
269 }