View Javadoc
1   /*
2    * Licensed to The Apereo Foundation under one or more contributor license
3    * agreements. See the NOTICE file distributed with this work for additional
4    * information regarding copyright ownership.
5    *
6    *
7    * The Apereo Foundation licenses this file to you under the Educational
8    * Community License, Version 2.0 (the "License"); you may not use this file
9    * except in compliance with the License. You may obtain a copy of the License
10   * at:
11   *
12   *   http://opensource.org/licenses/ecl2.txt
13   *
14   * Unless required by applicable law or agreed to in writing, software
15   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
16   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the
17   * License for the specific language governing permissions and limitations under
18   * the License.
19   *
20   */
21  
22  package org.opencastproject.workflow.impl;
23  
24  import static org.opencastproject.security.api.SecurityConstants.GLOBAL_ADMIN_ROLE;
25  import static org.opencastproject.workflow.api.WorkflowInstance.WorkflowState.FAILED;
26  import static org.opencastproject.workflow.api.WorkflowInstance.WorkflowState.FAILING;
27  import static org.opencastproject.workflow.api.WorkflowInstance.WorkflowState.INSTANTIATED;
28  import static org.opencastproject.workflow.api.WorkflowInstance.WorkflowState.PAUSED;
29  import static org.opencastproject.workflow.api.WorkflowInstance.WorkflowState.RUNNING;
30  import static org.opencastproject.workflow.api.WorkflowInstance.WorkflowState.STOPPED;
31  import static org.opencastproject.workflow.api.WorkflowInstance.WorkflowState.SUCCEEDED;
32  
33  import org.opencastproject.assetmanager.api.AssetManager;
34  import org.opencastproject.assetmanager.api.Snapshot;
35  import org.opencastproject.assetmanager.util.WorkflowPropertiesUtil;
36  import org.opencastproject.elasticsearch.api.SearchIndexException;
37  import org.opencastproject.elasticsearch.api.SearchResult;
38  import org.opencastproject.elasticsearch.api.SearchResultItem;
39  import org.opencastproject.elasticsearch.index.ElasticsearchIndex;
40  import org.opencastproject.elasticsearch.index.objects.event.Event;
41  import org.opencastproject.elasticsearch.index.objects.event.EventSearchQuery;
42  import org.opencastproject.elasticsearch.index.rebuild.AbstractIndexProducer;
43  import org.opencastproject.elasticsearch.index.rebuild.IndexProducer;
44  import org.opencastproject.elasticsearch.index.rebuild.IndexRebuildException;
45  import org.opencastproject.elasticsearch.index.rebuild.IndexRebuildService;
46  import org.opencastproject.job.api.Job;
47  import org.opencastproject.job.api.Job.Status;
48  import org.opencastproject.job.api.JobProducer;
49  import org.opencastproject.mediapackage.MediaPackage;
50  import org.opencastproject.mediapackage.MediaPackageElement;
51  import org.opencastproject.mediapackage.MediaPackageParser;
52  import org.opencastproject.mediapackage.MediaPackageSupport;
53  import org.opencastproject.mediapackage.Publication;
54  import org.opencastproject.metadata.api.MediaPackageMetadata;
55  import org.opencastproject.metadata.api.MediaPackageMetadataService;
56  import org.opencastproject.metadata.api.MetadataService;
57  import org.opencastproject.metadata.api.util.MediaPackageMetadataSupport;
58  import org.opencastproject.security.api.AccessControlList;
59  import org.opencastproject.security.api.AccessControlUtil;
60  import org.opencastproject.security.api.AclScope;
61  import org.opencastproject.security.api.AuthorizationService;
62  import org.opencastproject.security.api.Organization;
63  import org.opencastproject.security.api.OrganizationDirectoryService;
64  import org.opencastproject.security.api.Permissions;
65  import org.opencastproject.security.api.SecurityService;
66  import org.opencastproject.security.api.UnauthorizedException;
67  import org.opencastproject.security.api.User;
68  import org.opencastproject.security.api.UserDirectoryService;
69  import org.opencastproject.series.api.SeriesException;
70  import org.opencastproject.series.api.SeriesService;
71  import org.opencastproject.serviceregistry.api.ServiceRegistry;
72  import org.opencastproject.serviceregistry.api.ServiceRegistryException;
73  import org.opencastproject.serviceregistry.api.UndispatchableJobException;
74  import org.opencastproject.util.NotFoundException;
75  import org.opencastproject.util.ReadinessIndicator;
76  import org.opencastproject.util.data.Tuple;
77  import org.opencastproject.workflow.api.ResumableWorkflowOperationHandler;
78  import org.opencastproject.workflow.api.RetryStrategy;
79  import org.opencastproject.workflow.api.WorkflowDatabaseException;
80  import org.opencastproject.workflow.api.WorkflowDefinition;
81  import org.opencastproject.workflow.api.WorkflowException;
82  import org.opencastproject.workflow.api.WorkflowIdentifier;
83  import org.opencastproject.workflow.api.WorkflowIndexData;
84  import org.opencastproject.workflow.api.WorkflowInstance;
85  import org.opencastproject.workflow.api.WorkflowInstance.WorkflowState;
86  import org.opencastproject.workflow.api.WorkflowListener;
87  import org.opencastproject.workflow.api.WorkflowOperationDefinition;
88  import org.opencastproject.workflow.api.WorkflowOperationDefinitionImpl;
89  import org.opencastproject.workflow.api.WorkflowOperationHandler;
90  import org.opencastproject.workflow.api.WorkflowOperationInstance;
91  import org.opencastproject.workflow.api.WorkflowOperationInstance.OperationState;
92  import org.opencastproject.workflow.api.WorkflowOperationResult;
93  import org.opencastproject.workflow.api.WorkflowOperationResult.Action;
94  import org.opencastproject.workflow.api.WorkflowOperationResultImpl;
95  import org.opencastproject.workflow.api.WorkflowParsingException;
96  import org.opencastproject.workflow.api.WorkflowService;
97  import org.opencastproject.workflow.api.WorkflowServiceDatabase;
98  import org.opencastproject.workflow.api.WorkflowStateException;
99  import org.opencastproject.workflow.api.WorkflowStateMapping;
100 import org.opencastproject.workflow.api.WorkflowUtil;
101 import org.opencastproject.workflow.api.XmlWorkflowParser;
102 import org.opencastproject.workspace.api.Workspace;
103 
104 import com.google.common.util.concurrent.Striped;
105 
106 import org.apache.commons.io.IOUtils;
107 import org.apache.commons.lang3.StringUtils;
108 import org.apache.commons.lang3.time.DateUtils;
109 import org.osgi.framework.InvalidSyntaxException;
110 import org.osgi.framework.ServiceReference;
111 import org.osgi.service.component.ComponentContext;
112 import org.osgi.service.component.annotations.Activate;
113 import org.osgi.service.component.annotations.Component;
114 import org.osgi.service.component.annotations.Reference;
115 import org.osgi.service.component.annotations.ReferenceCardinality;
116 import org.osgi.service.component.annotations.ReferencePolicy;
117 import org.slf4j.Logger;
118 import org.slf4j.LoggerFactory;
119 
120 import java.io.ByteArrayOutputStream;
121 import java.io.IOException;
122 import java.nio.charset.StandardCharsets;
123 import java.util.ArrayList;
124 import java.util.Collections;
125 import java.util.Comparator;
126 import java.util.Date;
127 import java.util.HashMap;
128 import java.util.HashSet;
129 import java.util.List;
130 import java.util.Map;
131 import java.util.Map.Entry;
132 import java.util.Optional;
133 import java.util.Properties;
134 import java.util.Set;
135 import java.util.SortedSet;
136 import java.util.TreeSet;
137 import java.util.concurrent.Callable;
138 import java.util.concurrent.CopyOnWriteArrayList;
139 import java.util.concurrent.Executors;
140 import java.util.concurrent.ThreadPoolExecutor;
141 import java.util.concurrent.locks.Lock;
142 import java.util.function.Function;
143 import java.util.stream.Collectors;
144 
145 /**
146  * Implements WorkflowService with in-memory data structures to hold WorkflowOperations and WorkflowInstances.
147  * WorkflowOperationHandlers are looked up in the OSGi service registry based on the "workflow.operation" property. If
148  * the WorkflowOperationHandler's "workflow.operation" service registration property matches
149  * WorkflowOperation.getName(), then the factory returns a WorkflowOperationRunner to handle that operation. This allows
150  * for custom runners to be added or modified without affecting the workflow service itself.
151  */
152 @Component(
153     property = {
154       "service.description=Workflow Service",
155       "service.pid=org.opencastproject.workflow.impl.WorkflowServiceImpl"
156     },
157     immediate = true,
158     service = { WorkflowService.class, WorkflowServiceImpl.class, IndexProducer.class }
159 )
160 public class WorkflowServiceImpl extends AbstractIndexProducer implements WorkflowService, JobProducer {
161 
  /** Retry strategy property name (NOTE(review): the code reading this key is not part of this excerpt) */
  private static final String RETRY_STRATEGY = "retryStrategy";

  /** Logging facility shared by all instances of this service */
  private static final Logger logger = LoggerFactory.getLogger(WorkflowServiceImpl.class);
167 
168   /** List of available operations on jobs */
169   enum Operation {
170     START_WORKFLOW, RESUME, START_OPERATION
171   }
172 
  /**
   * Constant value indicating a <code>null</code> parent id. It is written into the job arguments in place of the
   * parent workflow id when a workflow is started without a parent but with properties.
   */
  private static final String NULL_PARENT_ID = "-";

  /** The load imposed on the system by a workflow job.
   *  We are keeping this hardcoded because otherwise bad things will likely happen,
   *  like an inability to process a workflow past a certain point in high-load conditions
   */
  private static final float WORKFLOW_JOB_LOAD = 0.0f;

  /** Error resolution handler id constant */
  public static final String ERROR_RESOLUTION_HANDLER_ID = "error-resolution";

  /**
   * The OSGi component context, stored on activation and used to look up operation handlers.
   * Remove references to the component context once felix scr 1.2 becomes available.
   */
  protected ComponentContext componentContext = null;

  /** The metadata services, kept in a set that is sorted by service priority (see the constructor) */
  private SortedSet<MediaPackageMetadataService> metadataServices;

  /** Persistent storage */
  protected WorkflowServiceDatabase persistence;

  /** The list of workflow listeners (copy-on-write, so it can be iterated while listeners are added or removed) */
  private final List<WorkflowListener> listeners = new CopyOnWriteArrayList<WorkflowListener>();

  /** The thread pool to use for firing listeners and handling dispatched jobs; created on activation */
  protected ThreadPoolExecutor executorService;

  /** The workspace */
  protected Workspace workspace = null;

  /** The service registry */
  protected ServiceRegistry serviceRegistry = null;

  /** The security service */
  protected SecurityService securityService = null;

  /** The authorization service */
  protected AuthorizationService authorizationService = null;

  /** The user directory service */
  protected UserDirectoryService userDirectoryService = null;

  /** The organization directory service */
  protected OrganizationDirectoryService organizationDirectoryService = null;

  /** The series service */
  protected SeriesService seriesService;

  /** The asset manager */
  protected AssetManager assetManager = null;

  /** The workflow definition scanner */
  private WorkflowDefinitionScanner workflowDefinitionScanner;

  /** List of initially delayed workflows */
  private final List<Long> delayedWorkflows = new ArrayList<Long>();

  /**
   * Striped locks for synchronization. <code>mediaPackageLocks</code> is keyed by media package id and serializes
   * workflow starts per media package. NOTE(review): the keys used with <code>lock</code> and
   * <code>updateLock</code> are not visible in this part of the file - confirm before relying on them.
   */
  private final Striped<Lock> lock = Striped.lazyWeakLock(1024);
  private final Striped<Lock> updateLock = Striped.lazyWeakLock(1024);
  private final Striped<Lock> mediaPackageLocks = Striped.lazyWeakLock(1024);

  /** The Elasticsearch index */
  private ElasticsearchIndex index;
237 
238   /**
239    * Constructs a new workflow service impl, with a priority-sorted map of metadata services
240    */
241   public WorkflowServiceImpl() {
242     metadataServices = new TreeSet<>(Comparator.comparingInt(MetadataService::getPriority));
243   }
244 
245   /**
246    * Activate this service implementation via the OSGI service component runtime.
247    *
248    * @param componentContext
249    *          the component context
250    */
251   @Activate
252   public void activate(ComponentContext componentContext) {
253     this.componentContext = componentContext;
254     executorService = (ThreadPoolExecutor) Executors.newCachedThreadPool();
255     logger.info("Activate Workflow service");
256   }
257 
258   /**
259    * {@inheritDoc}
260    *
261    * @see org.opencastproject.workflow.api.WorkflowService#addWorkflowListener(
262    *      org.opencastproject.workflow.api.WorkflowListener)
263    */
264   @Override
265   public void addWorkflowListener(WorkflowListener listener) {
266     listeners.add(listener);
267   }
268 
269   /**
270    * {@inheritDoc}
271    *
272    * @see org.opencastproject.workflow.api.WorkflowService#removeWorkflowListener(
273    *      org.opencastproject.workflow.api.WorkflowListener)
274    */
275   @Override
276   public void removeWorkflowListener(WorkflowListener listener) {
277     listeners.remove(listener);
278   }
279 
280   /**
281    * Fires the workflow listeners on workflow updates.
282    */
283   protected void fireListeners(final WorkflowInstance oldWorkflowInstance, final WorkflowInstance newWorkflowInstance) {
284     final User currentUser = securityService.getUser();
285     final Organization currentOrganization = securityService.getOrganization();
286     for (final WorkflowListener listener : listeners) {
287       if (oldWorkflowInstance == null || !oldWorkflowInstance.getState().equals(newWorkflowInstance.getState())) {
288         Runnable runnable = () -> {
289           try {
290             securityService.setUser(currentUser);
291             securityService.setOrganization(currentOrganization);
292             listener.stateChanged(newWorkflowInstance);
293           } finally {
294             securityService.setUser(null);
295             securityService.setOrganization(null);
296           }
297         };
298         executorService.execute(runnable);
299       } else {
300         logger.debug("Not notifying {} because the workflow state has not changed", listener);
301       }
302 
303       if (newWorkflowInstance.getCurrentOperation() != null) {
304         if (oldWorkflowInstance == null || oldWorkflowInstance.getCurrentOperation() == null
305                 || !oldWorkflowInstance.getCurrentOperation().equals(newWorkflowInstance.getCurrentOperation())) {
306           Runnable runnable = () -> {
307             try {
308               securityService.setUser(currentUser);
309               securityService.setOrganization(currentOrganization);
310               listener.operationChanged(newWorkflowInstance);
311             } finally {
312               securityService.setUser(null);
313               securityService.setOrganization(null);
314             }
315           };
316           executorService.execute(runnable);
317         }
318       } else {
319         logger.debug("Not notifying {} because the workflow operation has not changed", listener);
320       }
321     }
322   }
323 
324   /**
325    * {@inheritDoc}
326    *
327    * @see org.opencastproject.workflow.api.WorkflowService#listAvailableWorkflowDefinitions()
328    */
329   @Override
330   public List<WorkflowDefinition> listAvailableWorkflowDefinitions() {
331     return workflowDefinitionScanner
332             .getAvailableWorkflowDefinitions(securityService.getOrganization(), securityService.getUser())
333             .sorted()
334             .collect(Collectors.toList());
335   }
336 
337   /**
338    * {@inheritDoc}
339    */
340   public boolean isRunnable(WorkflowDefinition workflowDefinition) {
341     List<String> availableOperations = listAvailableOperationNames();
342     List<WorkflowDefinition> checkedWorkflows = new ArrayList<>();
343     boolean runnable = isRunnable(workflowDefinition, availableOperations, checkedWorkflows);
344     int wfCount = checkedWorkflows.size() - 1;
345     if (runnable) {
346       logger.info("Workflow {}, containing {} derived workflows, is runnable", workflowDefinition, wfCount);
347     } else {
348       logger.warn("Workflow {}, containing {} derived workflows, is not runnable", workflowDefinition, wfCount);
349     }
350     return runnable;
351   }
352 
353   /**
354    * Tests the workflow definition for its runnability. This method is a helper for
355    * {@link #isRunnable(WorkflowDefinition)} that is suited for recursive calling.
356    *
357    * @param workflowDefinition
358    *          the definition to test
359    * @param availableOperations
360    *          list of currently available operation handlers
361    * @param checkedWorkflows
362    *          list of checked workflows, used to avoid circular checking
363    * @return <code>true</code> if all bits and pieces used for executing <code>workflowDefinition</code> are in place
364    */
365   private boolean isRunnable(WorkflowDefinition workflowDefinition, List<String> availableOperations,
366           List<WorkflowDefinition> checkedWorkflows) {
367     if (checkedWorkflows.contains(workflowDefinition)) {
368       return true;
369     }
370 
371     // Test availability of operation handler and catch workflows
372     for (WorkflowOperationDefinition op : workflowDefinition.getOperations()) {
373       if (!availableOperations.contains(op.getId())) {
374         logger.info("{} is not runnable due to missing operation {}", workflowDefinition, op);
375         return false;
376       }
377       String catchWorkflow = op.getExceptionHandlingWorkflow();
378       if (catchWorkflow != null) {
379         WorkflowDefinition catchWorkflowDefinition;
380         try {
381           catchWorkflowDefinition = getWorkflowDefinitionById(catchWorkflow);
382         } catch (NotFoundException e) {
383           logger.info("{} is not runnable due to missing catch workflow {} on operation {}", workflowDefinition,
384                   catchWorkflow, op);
385           return false;
386         }
387         if (!isRunnable(catchWorkflowDefinition, availableOperations, checkedWorkflows)) {
388           return false;
389         }
390       }
391     }
392 
393     // Add the workflow to the list of checked workflows
394     if (!checkedWorkflows.contains(workflowDefinition)) {
395       checkedWorkflows.add(workflowDefinition);
396     }
397     return true;
398   }
399 
400   /**
401    * Gets the currently registered workflow operation handlers.
402    *
403    * @return All currently registered handlers
404    */
405   public Set<HandlerRegistration> getRegisteredHandlers() {
406     Set<HandlerRegistration> set = new HashSet<>();
407     ServiceReference[] refs;
408     try {
409       refs = componentContext.getBundleContext().getServiceReferences(WorkflowOperationHandler.class.getName(), null);
410     } catch (InvalidSyntaxException e) {
411       throw new IllegalStateException(e);
412     }
413     if (refs != null) {
414       for (ServiceReference ref : refs) {
415         WorkflowOperationHandler handler = (WorkflowOperationHandler) componentContext.getBundleContext().getService(
416                 ref);
417         set.add(new HandlerRegistration((String) ref.getProperty(WORKFLOW_OPERATION_PROPERTY), handler));
418       }
419     } else {
420       logger.warn("No registered workflow operation handlers found");
421     }
422     return set;
423   }
424 
425   protected WorkflowOperationHandler getWorkflowOperationHandler(String operationId) {
426     for (HandlerRegistration reg : getRegisteredHandlers()) {
427       if (reg.operationName.equals(operationId)) {
428         return reg.handler;
429       }
430     }
431     return null;
432   }
433 
434   /**
435    * Lists the names of each workflow operation. Operation names are availalbe for use if there is a registered
436    * {@link WorkflowOperationHandler} with an equal {@link WorkflowServiceImpl#WORKFLOW_OPERATION_PROPERTY} property.
437    *
438    * @return The {@link List} of available workflow operation names
439    */
440   protected List<String> listAvailableOperationNames() {
441     return getRegisteredHandlers().parallelStream().map(op -> op.operationName).collect(Collectors.toList());
442   }
443 
444   /**
445    * {@inheritDoc}
446    *
447    * @see org.opencastproject.workflow.api.WorkflowService#getWorkflowById(long)
448    */
449   @Override
450   public WorkflowInstance getWorkflowById(long id) throws NotFoundException,
451           UnauthorizedException {
452     try {
453       WorkflowInstance workflow = persistence.getWorkflow(id);
454       assertPermission(workflow, Permissions.Action.READ.toString(), workflow.getOrganizationId());
455       return workflow;
456     } catch (WorkflowDatabaseException e) {
457       throw new IllegalStateException("Got not get workflow from database with id ");
458     }
459   }
460 
461   /**
462    * {@inheritDoc}
463    *
464    * @see org.opencastproject.workflow.api.WorkflowService#start(org.opencastproject.workflow.api.WorkflowDefinition,
465    *      org.opencastproject.mediapackage.MediaPackage)
466    */
467   @Override
468   public WorkflowInstance start(WorkflowDefinition workflowDefinition, MediaPackage mediaPackage)
469           throws WorkflowDatabaseException, UnauthorizedException, WorkflowParsingException {
470     return start(workflowDefinition, mediaPackage, new HashMap<>());
471   }
472 
473   /**
474    * {@inheritDoc}
475    *
476    * @see org.opencastproject.workflow.api.WorkflowService#start(org.opencastproject.workflow.api.WorkflowDefinition,
477    *      org.opencastproject.mediapackage.MediaPackage)
478    */
479   @Override
480   public WorkflowInstance start(WorkflowDefinition workflowDefinition, MediaPackage mediaPackage,
481           Map<String, String> properties)
482           throws WorkflowDatabaseException, UnauthorizedException, WorkflowParsingException {
483     try {
484       return start(workflowDefinition, mediaPackage, null, properties);
485     } catch (NotFoundException e) {
486       // should never happen
487       throw new IllegalStateException("a null workflow ID caused a NotFoundException.  This is a programming error.");
488     }
489   }
490 
491   /**
492    * {@inheritDoc}
493    *
494    * @see org.opencastproject.workflow.api.WorkflowService#start(org.opencastproject.workflow.api.WorkflowDefinition,
495    *      org.opencastproject.mediapackage.MediaPackage, Long, java.util.Map)
496    */
497   @Override
498   public WorkflowInstance start(WorkflowDefinition workflowDefinition, MediaPackage sourceMediaPackage,
499           Long parentWorkflowId, Map<String, String> originalProperties) throws WorkflowDatabaseException,
500           NotFoundException, UnauthorizedException, WorkflowParsingException, IllegalStateException {
501     final String mediaPackageId = sourceMediaPackage.getIdentifier().toString();
502     Map<String, String> properties = null;
503 
504     // WorkflowPropertiesUtil.storeProperties will take a snapshot if there isn't one
505     // and we want the mp in the snapshot to have all the metadata populated.
506     populateMediaPackageMetadata(sourceMediaPackage);
507 
508     if (originalProperties != null) {
509       WorkflowPropertiesUtil.storeProperties(assetManager, sourceMediaPackage, originalProperties);
510       properties = WorkflowPropertiesUtil.getLatestWorkflowProperties(assetManager, mediaPackageId);
511     }
512 
513     // We have to synchronize per media package to avoid starting multiple simultaneous workflows for one media package.
514     final Lock lock = mediaPackageLocks.get(mediaPackageId);
515     lock.lock();
516 
517     try {
518       if (workflowDefinition == null) {
519         throw new IllegalArgumentException("workflow definition must not be null");
520       }
521       Optional<List<String>> errors = MediaPackageSupport.sanityCheck(sourceMediaPackage);
522       if (errors.isPresent()) {
523         throw new IllegalArgumentException("Insane media package cannot be processed: "
524             + String.join("; ", errors.get()));
525       }
526       if (parentWorkflowId != null) {
527         try {
528           getWorkflowById(parentWorkflowId); // Let NotFoundException bubble up
529         } catch (UnauthorizedException e) {
530           throw new IllegalArgumentException("Parent workflow " + parentWorkflowId + " not visible to this user");
531         }
532       } else {
533         if (persistence.mediaPackageHasActiveWorkflows(mediaPackageId)) {
534           throw new IllegalStateException(String.format(
535                   "Can't start workflow '%s' for media package '%s' because another workflow is currently active.",
536                   workflowDefinition.getTitle(),
537                   sourceMediaPackage.getIdentifier().toString()));
538         }
539       }
540 
541       // Get the current user
542       User currentUser = securityService.getUser();
543       validUserOrThrow(currentUser);
544 
545       // Get the current organization
546       Organization organization = securityService.getOrganization();
547       if (organization == null) {
548         throw new SecurityException("Current organization is unknown");
549       }
550 
551       WorkflowInstance workflowInstance = new WorkflowInstance(workflowDefinition, sourceMediaPackage,
552               currentUser, organization, properties);
553       workflowInstance = updateConfiguration(workflowInstance, properties);
554 
555       // Create and configure the workflow instance
556       try {
557         // Create a new job for this workflow instance
558         String workflowDefinitionXml = XmlWorkflowParser.toXml(workflowDefinition);
559         String mediaPackageXml = MediaPackageParser.getAsXml(sourceMediaPackage);
560 
561         List<String> arguments = new ArrayList<>();
562         arguments.add(workflowDefinitionXml);
563         arguments.add(mediaPackageXml);
564         if (parentWorkflowId != null || properties != null) {
565           String parentWorkflowIdString = (parentWorkflowId != null) ? parentWorkflowId.toString() : NULL_PARENT_ID;
566           arguments.add(parentWorkflowIdString);
567         }
568         if (properties != null) {
569           arguments.add(mapToString(properties));
570         }
571 
572         Job job = serviceRegistry.createJob(JOB_TYPE, Operation.START_WORKFLOW.toString(), arguments,
573                 null, false, null, WORKFLOW_JOB_LOAD);
574 
575         // Have the workflow take on the job's identity
576         workflowInstance.setId(job.getId());
577 
578         // Add the workflow to the search index and have the job enqueued for dispatch.
579         // Update also sets ACL and mediapackage metadata
580         update(workflowInstance);
581 
582         return workflowInstance;
583       } catch (Throwable t) {
584         try {
585           workflowInstance.setState(FAILED);
586           update(workflowInstance);
587         } catch (Exception failureToFail) {
588           logger.warn("Unable to update workflow to failed state", failureToFail);
589         }
590         try {
591           throw t;
592         } catch (ServiceRegistryException e) {
593           throw new WorkflowDatabaseException(e);
594         }
595       }
596     } finally {
597       lock.unlock();
598     }
599   }
600 
601   protected WorkflowInstance updateConfiguration(WorkflowInstance instance, Map<String, String> properties) {
602     if (properties != null) {
603       for (Entry<String, String> entry : properties.entrySet()) {
604         instance.setConfiguration(entry.getKey(), entry.getValue());
605       }
606     }
607     return instance;
608   }
609 
610   /**
611    * Does a lookup of available operation handlers for the given workflow operation.
612    *
613    * @param operation
614    *          the operation definition
615    * @return the handler or <code>null</code>
616    */
617   protected WorkflowOperationHandler selectOperationHandler(WorkflowOperationInstance operation) {
618     List<WorkflowOperationHandler> handlerList = new ArrayList<>();
619     for (HandlerRegistration handlerReg : getRegisteredHandlers()) {
620       if (handlerReg.operationName != null && handlerReg.operationName.equals(operation.getTemplate())) {
621         handlerList.add(handlerReg.handler);
622       }
623     }
624     if (handlerList.size() > 1) {
625       throw new IllegalStateException("Multiple operation handlers found for operation '" + operation.getTemplate()
626               + "'");
627     } else if (handlerList.size() == 1) {
628       return handlerList.get(0);
629     }
630     logger.warn("No workflow operation handlers found for operation '{}'", operation.getTemplate());
631     return null;
632   }
633 
634   /**
635    * Executes the workflow.
636    *
637    * @param workflow
638    *          the workflow instance
639    * @throws WorkflowException
640    *           if there is a problem processing the workflow
641    * @throws UnauthorizedException
642    */
643   protected Job runWorkflow(WorkflowInstance workflow) throws WorkflowException, UnauthorizedException {
644     if (INSTANTIATED != workflow.getState()) {
645 
646       // If the workflow is "running", we need to determine if there is an operation being executed or not.
647       // When a workflow has been restarted, this might not be the case and the status might not have been
648       // updated accordingly.
649       if (RUNNING == workflow.getState()) {
650         WorkflowOperationInstance currentOperation = workflow.getCurrentOperation();
651         if (currentOperation != null) {
652           if (currentOperation.getId() != null) {
653             try {
654               Job operationJob = serviceRegistry.getJob(currentOperation.getId());
655               if (Job.Status.RUNNING.equals(operationJob.getStatus())) {
656                 logger.debug("Not starting workflow {}, it is already in running state", workflow);
657                 return null;
658               } else {
659                 logger.info("Scheduling next operation of workflow {}", workflow);
660                 operationJob.setStatus(Status.QUEUED);
661                 operationJob.setDispatchable(true);
662                 return serviceRegistry.updateJob(operationJob);
663               }
664             } catch (Exception e) {
665               logger.warn("Error determining status of current workflow operation in {}", workflow, e);
666               return null;
667             }
668           }
669         } else {
670           throw new IllegalStateException("Cannot start a workflow '" + workflow + "' with no current operation");
671         }
672       } else {
673         throw new IllegalStateException("Cannot start a workflow in state '" + workflow.getState() + "'");
674       }
675     }
676 
677     // If this is a new workflow, move to the first operation
678     workflow.setState(RUNNING);
679     update(workflow);
680 
681     WorkflowOperationInstance operation = workflow.getCurrentOperation();
682 
683     if (operation == null) {
684       throw new IllegalStateException("Cannot start a workflow without a current operation");
685     }
686 
687     if (!operation.equals(workflow.getOperations().get(0))) {
688       throw new IllegalStateException("Current operation expected to be first");
689     }
690 
691     try {
692       logger.info("Scheduling workflow {} for execution", workflow.getId());
693       Job job = serviceRegistry.createJob(JOB_TYPE, Operation.START_OPERATION.toString(),
694               Collections.singletonList(Long.toString(workflow.getId())), null, false, null, WORKFLOW_JOB_LOAD);
695       operation.setId(job.getId());
696       update(workflow);
697       job.setStatus(Status.QUEUED);
698       job.setDispatchable(true);
699       return serviceRegistry.updateJob(job);
700     } catch (ServiceRegistryException e) {
701       throw new WorkflowDatabaseException(e);
702     } catch (NotFoundException e) {
703       // this should be impossible
704       throw new IllegalStateException("Unable to find a job that was just created");
705     }
706 
707   }
708 
709   /**
710    * Executes the workflow's current operation.
711    *
712    * @param workflow
713    *          the workflow
714    * @param properties
715    *          the properties that are passed in on resume
716    * @return the processed workflow operation
717    * @throws WorkflowException
718    *           if there is a problem processing the workflow
719    * @throws UnauthorizedException
720    */
721   protected WorkflowOperationInstance runWorkflowOperation(WorkflowInstance workflow, Map<String, String> properties)
722           throws WorkflowException, UnauthorizedException {
723     WorkflowOperationInstance processingOperation = workflow.getCurrentOperation();
724     if (processingOperation == null) {
725       throw new IllegalStateException("Workflow '" + workflow + "' has no operation to run");
726     }
727 
728     // Keep the current state for later reference, it might have been changed from the outside
729     WorkflowState initialState = workflow.getState();
730 
731     // Execute the operation handler
732     WorkflowOperationHandler operationHandler = selectOperationHandler(processingOperation);
733     WorkflowOperationWorker worker = new WorkflowOperationWorker(operationHandler, workflow, properties, this);
734     workflow = worker.execute();
735 
736     Long currentOperationJobId = processingOperation.getId();
737     try {
738       updateOperationJob(currentOperationJobId, processingOperation.getState());
739     } catch (NotFoundException e) {
740       throw new IllegalStateException("Unable to find a job that has already been running");
741     } catch (ServiceRegistryException e) {
742       throw new WorkflowDatabaseException(e);
743     }
744 
745     // Move on to the next workflow operation
746     WorkflowOperationInstance currentOperation = workflow.getCurrentOperation();
747 
748     // Is the workflow done?
749     if (currentOperation == null) {
750 
751       // If we are in failing mode, we were simply working off an error handling workflow
752       if (FAILING.equals(workflow.getState())) {
753         workflow.setState(FAILED);
754       }
755 
756       // Otherwise, let's make sure we didn't miss any failed operation, since the workflow state could have been
757       // switched to paused while processing the error handling workflow extension
758       else if (!FAILED.equals(workflow.getState())) {
759         workflow.setState(SUCCEEDED);
760         for (WorkflowOperationInstance op : workflow.getOperations()) {
761           if (op.getState().equals(WorkflowOperationInstance.OperationState.FAILED)) {
762             if (op.isFailOnError()) {
763               workflow.setState(FAILED);
764               break;
765             }
766           }
767         }
768       }
769 
770       // Save the updated workflow to the database
771       logger.debug("{} has {}", workflow, workflow.getState());
772       update(workflow);
773 
774     } else {
775 
776       // Somebody might have set the workflow to "paused" from the outside, so take a look a the database first
777       WorkflowState dbWorkflowState;
778       try {
779         dbWorkflowState = getWorkflowById(workflow.getId()).getState();
780       } catch (NotFoundException e) {
781         throw new IllegalStateException("The workflow with ID " + workflow.getId()
782                 + " can not be found in the database", e);
783       } catch (UnauthorizedException e) {
784         throw new IllegalStateException("The workflow with ID " + workflow.getId() + " can not be read", e);
785       }
786 
787       // If somebody changed the workflow state from the outside, that state should take precedence
788       if (!dbWorkflowState.equals(initialState)) {
789         logger.info("Workflow state for {} was changed to '{}' from the outside", workflow, dbWorkflowState);
790         workflow.setState(dbWorkflowState);
791       }
792 
793       // Save the updated workflow to the database
794 
795       Job job;
796       switch (workflow.getState()) {
797         case FAILED:
798           update(workflow);
799           break;
800         case FAILING:
801         case RUNNING:
802           try {
803             job = serviceRegistry.createJob(JOB_TYPE, Operation.START_OPERATION.toString(),
804                     Collections.singletonList(Long.toString(workflow.getId())), null, false, null, WORKFLOW_JOB_LOAD);
805             currentOperation.setId(job.getId());
806             update(workflow);
807             job.setStatus(Status.QUEUED);
808             job.setDispatchable(true);
809             serviceRegistry.updateJob(job);
810           } catch (ServiceRegistryException e) {
811             throw new WorkflowDatabaseException(e);
812           } catch (NotFoundException e) {
813             // this should be impossible
814             throw new IllegalStateException("Unable to find a job that was just created");
815           }
816           break;
817         case PAUSED:
818         case STOPPED:
819         case SUCCEEDED:
820           update(workflow);
821           break;
822         case INSTANTIATED:
823           update(workflow);
824           throw new IllegalStateException("Impossible workflow state found during processing");
825         default:
826           throw new IllegalStateException("Unknown workflow state found during processing");
827       }
828 
829     }
830 
831     return processingOperation;
832   }
833 
834   /**
835    * {@inheritDoc}
836    *
837    * @see org.opencastproject.workflow.api.WorkflowService#getWorkflowDefinitionById(String)
838    */
839   @Override
840   public WorkflowDefinition getWorkflowDefinitionById(String id) throws NotFoundException {
841     final WorkflowIdentifier workflowIdentifier = new WorkflowIdentifier(id, securityService.getOrganization().getId());
842     final WorkflowDefinition def = workflowDefinitionScanner
843             .getWorkflowDefinition(securityService.getUser(), workflowIdentifier);
844     if (def == null) {
845       throw new NotFoundException("Workflow definition '" + workflowIdentifier + "' not found or inaccessible");
846     }
847     return def;
848   }
849 
850   /**
851    * {@inheritDoc}
852    *
853    * @see org.opencastproject.workflow.api.WorkflowService#stop(long)
854    */
855   @Override
856   public WorkflowInstance stop(long workflowInstanceId) throws WorkflowException, NotFoundException,
857           UnauthorizedException {
858     final Lock lock = this.lock.get(workflowInstanceId);
859     lock.lock();
860     try {
861       WorkflowInstance instance = getWorkflowById(workflowInstanceId);
862 
863       if (instance.getState() != STOPPED) {
864         // Update the workflow instance
865         instance.setState(STOPPED);
866         update(instance);
867       }
868 
869       try {
870         removeTempFiles(instance);
871       } catch (Exception e) {
872         logger.warn("Cannot remove temp files for workflow instance {}", workflowInstanceId, e);
873       }
874 
875       return instance;
876     } finally {
877       lock.unlock();
878     }
879   }
880 
881   /**
882    * Checks whether user is set and is known to the userDirectoryService
883    */
884   private void validUserOrThrow(User user) {
885     if (user == null) {
886       throw new SecurityException("Current user is unknown");
887     }
888 
889     if (userDirectoryService.loadUser(user.getUsername()) == null) {
890       throw new SecurityException(String.format("Current user '%s' can not be loaded", user.getUsername()));
891     }
892   }
893 
894   private void removeTempFiles(WorkflowInstance workflowInstance) {
895     logger.info("Removing temporary files for workflow {}", workflowInstance.getId());
896     MediaPackage mp = workflowInstance.getMediaPackage();
897     if (null == mp) {
898       logger.warn("Workflow instance {} does not have an media package set", workflowInstance.getId());
899       return;
900     }
901     for (MediaPackageElement elem : mp.getElements()) {
902       // Publications should not link to temporary files and can be skipped
903       if (elem instanceof Publication) {
904         continue;
905       }
906       if (null == elem.getURI()) {
907         logger.warn("Media package element {} from the media package {} does not have an URI set",
908                 elem.getIdentifier(), mp.getIdentifier());
909         continue;
910       }
911       try {
912         logger.debug("Removing temporary file {} for workflow {}", elem.getURI(), workflowInstance);
913         workspace.delete(elem.getURI());
914       } catch (IOException e) {
915         logger.warn("Unable to delete mediapackage element", e);
916       } catch (NotFoundException e) {
917         // File was probably already deleted before...
918       }
919     }
920   }
921 
922 
923   /**
924    * {@inheritDoc}
925    *
926    * @see org.opencastproject.workflow.api.WorkflowService#remove(long)
927    */
928   @Override
929   public void remove(long workflowInstanceId) throws WorkflowDatabaseException, NotFoundException,
930           UnauthorizedException, WorkflowParsingException, WorkflowStateException {
931     remove(workflowInstanceId, false);
932   }
933 
934   /**
935    * {@inheritDoc}
936    *
937    * @see org.opencastproject.workflow.api.WorkflowService#remove(long,boolean)
938    */
939   @Override
940   public void remove(long workflowInstanceId, boolean force) throws WorkflowDatabaseException, NotFoundException,
941           UnauthorizedException, WorkflowStateException {
942     final Lock lock = this.lock.get(workflowInstanceId);
943     lock.lock();
944     try {
945       WorkflowInstance instance = getWorkflowById(workflowInstanceId);
946       WorkflowInstance.WorkflowState state = instance.getState();
947       if (!state.isTerminated() && !force) {
948         throw new WorkflowStateException("Workflow instance with state '" + state + "' cannot be removed "
949             + "since it is not yet terminated.");
950       }
951 
952       assertPermission(instance, Permissions.Action.WRITE.toString(), instance.getOrganizationId());
953 
954       // First, remove temporary files
955       removeTempFiles(instance);
956 
957       // Second, remove jobs related to operations which belong to the workflow instance
958       List<WorkflowOperationInstance> operations = instance.getOperations();
959       List<Long> jobsToDelete = new ArrayList<>();
960       for (WorkflowOperationInstance op : operations) {
961         if (op.getId() != null) {
962           long workflowOpId = op.getId();
963           if (workflowOpId != workflowInstanceId) {
964             jobsToDelete.add(workflowOpId);
965           }
966         }
967       }
968       try {
969         serviceRegistry.removeJobs(jobsToDelete);
970       } catch (ServiceRegistryException e) {
971         logger.warn("Problems while removing jobs related to workflow operations '{}'", jobsToDelete, e);
972       } catch (NotFoundException e) {
973         logger.debug("No jobs related to one of the workflow operation '{}' found in service registry", jobsToDelete);
974       }
975 
976       // Third, remove workflow instance job itself
977       try {
978         serviceRegistry.removeJobs(Collections.singletonList(workflowInstanceId));
979         removeWorkflowInstanceFromIndex(instance.getId());
980       } catch (ServiceRegistryException e) {
981         logger.warn("Problems while removing workflow instance job '{}'", workflowInstanceId, e);
982       } catch (NotFoundException e) {
983         logger.info("No workflow instance job '{}' found in the service registry", workflowInstanceId);
984       }
985 
986       // Remove workflow from database
987       persistence.removeFromDatabase(instance);
988     } finally {
989       lock.unlock();
990     }
991   }
992 
993   /**
994    * {@inheritDoc}
995    *
996    * @see org.opencastproject.workflow.api.WorkflowService#suspend(long)
997    */
998   @Override
999   public WorkflowInstance suspend(long workflowInstanceId) throws WorkflowException, NotFoundException,
1000           UnauthorizedException {
1001     final Lock lock = this.lock.get(workflowInstanceId);
1002     lock.lock();
1003     try {
1004       WorkflowInstance instance = getWorkflowById(workflowInstanceId);
1005       instance.setState(PAUSED);
1006       update(instance);
1007       return instance;
1008     } finally {
1009       lock.unlock();
1010     }
1011   }
1012 
1013   /**
1014    * {@inheritDoc}
1015    *
1016    * @see org.opencastproject.workflow.api.WorkflowService#resume(long)
1017    */
1018   @Override
1019   public WorkflowInstance resume(long id) throws WorkflowException, NotFoundException, IllegalStateException,
1020           UnauthorizedException {
1021     return resume(id, null);
1022   }
1023 
  /**
   * {@inheritDoc}
   * <p>
   * Only workflows in state {@code PAUSED} can be resumed. After applying the given properties through
   * {@code updateConfiguration} and persisting the workflow, one of three things happens:
   * <ul>
   * <li>If there is no current operation, the final state ({@code SUCCEEDED} or {@code FAILED}) is determined,
   * persisted and the workflow is returned.</li>
   * <li>If the current operation is {@code INSTANTIATED}, a new job is created for it, the workflow is set to
   * {@code RUNNING} and the job is queued.</li>
   * <li>Otherwise, the workflow job is set to {@code RUNNING} and the existing job of the current operation is queued
   * again, with the properties (if any) appended as an additional job argument.</li>
   * </ul>
   *
   * @throws IllegalStateException
   *           if the workflow is not paused, or if the current operation is not {@code INSTANTIATED} and has no
   *           associated job ID
   * @see org.opencastproject.workflow.api.WorkflowService#resume(long, Map)
   */
  @Override
  public WorkflowInstance resume(long workflowInstanceId, Map<String, String> properties) throws WorkflowException,
          NotFoundException, IllegalStateException, UnauthorizedException {
    WorkflowInstance workflowInstance = getWorkflowById(workflowInstanceId);
    if (!WorkflowState.PAUSED.equals(workflowInstance.getState())) {
      throw new IllegalStateException("Can not resume a workflow where the current state is not in paused");
    }

    // Apply the properties passed in on resume and persist the result before doing anything else
    workflowInstance = updateConfiguration(workflowInstance, properties);
    update(workflowInstance);

    WorkflowOperationInstance currentOperation = workflowInstance.getCurrentOperation();

    // Is the workflow done?
    if (currentOperation == null) {
      // Let's make sure we didn't miss any failed operation, since the workflow state could have been
      // switched to paused while processing the error handling workflow extension
      workflowInstance.setState(SUCCEEDED);
      for (WorkflowOperationInstance op : workflowInstance.getOperations()) {
        if (op.getState().equals(WorkflowOperationInstance.OperationState.FAILED)) {
          if (op.isFailOnError()) {
            workflowInstance.setState(FAILED);
            break;
          }
        }
      }

      // Save the resumed workflow to the database
      logger.debug("{} has {}", workflowInstance, workflowInstance.getState());
      update(workflowInstance);
      return workflowInstance;
    }

    // We can resume workflows when they are in either the paused state, or they are being advanced manually past
    // certain operations. In the latter case, there is no current paused operation.
    if (OperationState.INSTANTIATED.equals(currentOperation.getState())) {
      try {
        // The operation gets its own job. It is created non-dispatchable and only queued once the workflow has been
        // persisted.
        Job operationJob = serviceRegistry.createJob(JOB_TYPE, Operation.START_OPERATION.toString(),
                Collections.singletonList(Long.toString(workflowInstanceId)), null, false, null, WORKFLOW_JOB_LOAD);

        // this method call is publicly visible, so it doesn't necessarily go through the accept method. Set the
        // workflow state manually.
        workflowInstance.setState(RUNNING);
        currentOperation.setId(operationJob.getId());

        // update the workflow and its associated job
        update(workflowInstance);

        // Now set this job to be queued so it can be dispatched
        operationJob.setStatus(Status.QUEUED);
        operationJob.setDispatchable(true);
        serviceRegistry.updateJob(operationJob);

        return workflowInstance;
      } catch (ServiceRegistryException e) {
        throw new WorkflowDatabaseException(e);
      }
    }

    // From here on the current operation is expected to already have a job that can be queued again
    Long operationJobId = workflowInstance.getCurrentOperation().getId();
    if (operationJobId == null) {
      throw new IllegalStateException("Can not resume a workflow where the current operation has no associated id");
    }

    // Set the current operation's job to queued, so it gets picked up again
    Job workflowJob;
    try {
      // The workflow job is marked as running. Note that the workflow instance is written to the database directly
      // here instead of going through update(), and that its state has not been changed in this branch.
      workflowJob = serviceRegistry.getJob(workflowInstanceId);
      workflowJob.setStatus(Status.RUNNING);
      persistence.updateInDatabase(workflowInstance);
      serviceRegistry.updateJob(workflowJob);

      Job operationJob = serviceRegistry.getJob(operationJobId);
      operationJob.setStatus(Status.QUEUED);
      operationJob.setDispatchable(true);
      if (properties != null) {
        // Serialize the properties in java.util.Properties format and append them as an additional job argument
        Properties props = new Properties();
        props.putAll(properties);
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        props.store(out, null);
        List<String> newArguments = new ArrayList<String>(operationJob.getArguments());
        newArguments.add(new String(out.toByteArray(), StandardCharsets.UTF_8));
        operationJob.setArguments(newArguments);
      }
      serviceRegistry.updateJob(operationJob);
    } catch (ServiceRegistryException e) {
      throw new WorkflowDatabaseException(e);
    } catch (IOException e) {
      // NOTE(review): the IOException is not passed on as cause here; consider chaining it if
      // WorkflowParsingException offers a (message, cause) constructor - TODO confirm
      throw new WorkflowParsingException("Unable to parse workflow and/or workflow properties");
    }

    return workflowInstance;
  }
1123 
1124   /**
1125    * Asserts that the current user has permission to take the provided action on a workflow instance.
1126    *
1127    * @param workflow
1128    *          the workflow instance
1129    * @param action
1130    *          the action to ensure is permitted
1131    * @throws UnauthorizedException
1132    *           if the action is not authorized
1133    */
1134   protected void assertPermission(WorkflowInstance workflow, String action, String workflowOrgId)
1135           throws UnauthorizedException {
1136     User currentUser = securityService.getUser();
1137     Organization currentOrg = securityService.getOrganization();
1138     String currentOrgAdminRole = currentOrg.getAdminRole();
1139     String currentOrgId = currentOrg.getId();
1140 
1141     MediaPackage mediapackage = workflow.getMediaPackage();
1142 
1143     WorkflowState state = workflow.getState();
1144     if (state != INSTANTIATED && state != RUNNING && workflow.getState() != FAILING) {
1145       Optional<MediaPackage> assetMediapackage = assetManager.getMediaPackage(mediapackage.getIdentifier().toString());
1146       if (assetMediapackage.isPresent()) {
1147         mediapackage = assetMediapackage.get();
1148       }
1149     }
1150 
1151     var creatorName = workflow.getCreatorName();
1152     var workflowCreator = creatorName == null ? null : userDirectoryService.loadUser(creatorName);
1153     boolean authorized = currentUser.hasRole(GLOBAL_ADMIN_ROLE)
1154             || (currentUser.hasRole(currentOrgAdminRole) && currentOrgId.equals(workflowOrgId))
1155             || (currentUser.equals(workflowCreator))
1156             || (authorizationService.hasPermission(mediapackage, action) && currentOrgId.equals(workflowOrgId));
1157 
1158     if (!authorized) {
1159       throw new UnauthorizedException(currentUser, action);
1160     }
1161   }
1162 
1163   protected boolean assertMediaPackagePermission(String mediaPackageId, String action) {
1164     var currentUser = securityService.getUser();
1165     Optional<MediaPackage> mp = assetManager.getMediaPackage(mediaPackageId);
1166 
1167     // asset manager already checks if user is admin, org admin for same org as mp, or has explicit read rights
1168     // global admins can still get workflow instances if mp is gone from asset manager
1169     // org admins can't because then we don't know if mp belonged to same org as user
1170     return currentUser.hasRole(GLOBAL_ADMIN_ROLE)
1171             || mp.isPresent() && currentUser.hasRole(securityService.getOrganization().getAdminRole())
1172             || mp.isPresent() && authorizationService.hasPermission(mp.get(), action);
1173   }
1174 
  /**
   * {@inheritDoc}
   * <p>
   * Runs while holding the update lock for the workflow instance ID, which is obtained from {@code updateLock} and not
   * from the lock used by {@code stop}, {@code remove} and {@code suspend}. The steps are:
   * <ol>
   * <li>Load the currently stored workflow instance, if any, to pass it to the listeners later on.</li>
   * <li>Populate the media package metadata and, if the media package has a series and the workflow has a current
   * operation, align the series ACL on the media package with the ACL of the series. Failures other than a
   * {@link SeriesException} are only logged.</li>
   * <li>Set the status of the workflow job according to the workflow state.</li>
   * <li>Write the workflow to the database, update the job and, unless the current operation is running, update the
   * index.</li>
   * <li>Call {@code fireListeners} with the original and the updated workflow instance.</li>
   * </ol>
   *
   * @throws WorkflowDatabaseException
   *           if the series ACL can not be read, or if the workflow job can not be read or updated
   * @throws UnauthorizedException
   *           if raised while loading the stored workflow instance
   * @see org.opencastproject.workflow.api.WorkflowService#update(org.opencastproject.workflow.api.WorkflowInstance)
   */
  @Override
  public void update(final WorkflowInstance workflowInstance) throws WorkflowDatabaseException, UnauthorizedException {
    final Lock lock = updateLock.get(workflowInstance.getId());
    lock.lock();

    try {
      // Stays null for workflow instances that have not been stored yet
      WorkflowInstance originalWorkflowInstance = null;
      try {
        // get workflow and assert permissions
        originalWorkflowInstance = getWorkflowById(workflowInstance.getId());
      } catch (NotFoundException e) {
        // That's fine, it's a new workflow instance
      }

      MediaPackage updatedMediaPackage = null;
      try {

        // Before we persist this, extract the metadata
        updatedMediaPackage = workflowInstance.getMediaPackage();

        populateMediaPackageMetadata(updatedMediaPackage);

        String seriesId = updatedMediaPackage.getSeries();
        if (seriesId != null && workflowInstance.getCurrentOperation() != null) {
          // If the mediapackage contains a series, find the series ACLs and add the security information to the
          // mediapackage

          try {
            AccessControlList acl = seriesService.getSeriesAccessControl(seriesId);
            Tuple<AccessControlList, AclScope> activeAcl = authorizationService.getAcl(
                updatedMediaPackage, AclScope.Series);
            // Update series ACL if it differs from the active series ACL on the media package
            if (!AclScope.Series.equals(activeAcl.getB()) || !AccessControlUtil.equals(activeAcl.getA(), acl)) {
              authorizationService.setAcl(updatedMediaPackage, AclScope.Series, acl);
            }
          } catch (NotFoundException e) {
            logger.debug("Not updating series ACL on event {} since series {} has no ACL set",
                updatedMediaPackage, seriesId, e);
          }
        }

        workflowInstance.setMediaPackage(updatedMediaPackage);
      } catch (SeriesException e) {
        throw new WorkflowDatabaseException(e);
      } catch (Exception e) {
        // Any other problem with the metadata is only logged; the workflow is persisted regardless
        logger.error("Metadata for media package {} could not be updated", updatedMediaPackage, e);
      }

      // Synchronize the job status with the workflow
      WorkflowState workflowState = workflowInstance.getState();

      Job job;
      try {
        // The workflow job is looked up by the workflow instance ID, which is also set as the job's payload
        job = serviceRegistry.getJob(workflowInstance.getId());
        job.setPayload(Long.toString(workflowInstance.getId()));

        // Synchronize workflow and job state
        switch (workflowState) {
          case FAILED:
            job.setStatus(Status.FAILED);
            break;
          case FAILING:
            // The job status is left unchanged while the workflow is failing
            break;
          case INSTANTIATED:
            job.setDispatchable(true);
            job.setStatus(Status.QUEUED);
            break;
          case PAUSED:
            job.setStatus(Status.PAUSED);
            break;
          case RUNNING:
            job.setStatus(Status.RUNNING);
            break;
          case STOPPED:
            job.setStatus(Status.CANCELLED);
            break;
          case SUCCEEDED:
            job.setStatus(Status.FINISHED);
            break;
          default:
            throw new IllegalStateException("Found a workflow state that is not handled");
        }
      } catch (ServiceRegistryException e) {
        throw new WorkflowDatabaseException(
            "Unable to read workflow job " + workflowInstance.getId() + " from service registry", e);
      } catch (NotFoundException e) {
        throw new WorkflowDatabaseException(
            "Job for workflow " + workflowInstance.getId() + " not found in service registry", e);
      }

      // Update both workflow and workflow job
      try {
        // Update the database first, then the job in the service registry
        persistence.updateInDatabase(workflowInstance);

        job = serviceRegistry.updateJob(job);

        WorkflowOperationInstance op = workflowInstance.getCurrentOperation();

        // Update index used for UI. Note that we only need certain metadata and we can safely filter out workflow
        // updates for running operations since we updated the metadata right before these operations and will do so
        // again right after those operations.
        if (op == null || op.getState() != OperationState.RUNNING) {
          // Collect necessary information only for index update
          long id = workflowInstance.getId();
          // The state is handed to the index as the ordinal of the WorkflowState enum constant
          int state = workflowInstance.getState().ordinal();
          var template = workflowInstance.getTemplate();
          String mpId = workflowInstance.getMediaPackage().getIdentifier().toString();
          String orgId = workflowInstance.getOrganizationId();

          updateWorkflowInstanceInIndex(id, state, template, mpId, orgId);
        }
      } catch (ServiceRegistryException e) {
        throw new WorkflowDatabaseException("Update of workflow job " + workflowInstance.getId()
            + " in the service registry failed, service registry and workflow table may be out of sync", e);
      } catch (NotFoundException e) {
        throw new WorkflowDatabaseException("Job for workflow " + workflowInstance.getId()
            + " not found in service registry", e);
      } catch (Exception e) {
        // Catch-all for anything else raised while persisting, updating the job or updating the index
        throw new WorkflowDatabaseException("Update of workflow job " + job.getId()
            + " in the service registry failed, service registry and workflow table may be out of sync", e);
      }

      try {
        // originalWorkflowInstance is null here if the workflow instance was not stored before
        fireListeners(originalWorkflowInstance, workflowInstance);
      } catch (Exception e) {
        // Can't happen, since we are converting from an in-memory object
        throw new IllegalStateException("In-memory workflow instance could not be serialized", e);
      }
    } finally {
      lock.unlock();
    }
  }
1313 
1314   /**
1315    * {@inheritDoc}
1316    *
1317    * @see org.opencastproject.workflow.api.WorkflowService#countWorkflowInstances()
1318    */
1319   @Override
1320   public long countWorkflowInstances() throws WorkflowDatabaseException {
1321     return countWorkflowInstances(null);
1322   }
1323 
1324   /**
1325    * {@inheritDoc}
1326    *
1327    * @see org.opencastproject.workflow.api.WorkflowService#countWorkflowInstances(
1328    *      org.opencastproject.workflow.api.WorkflowInstance.WorkflowState)
1329    */
1330   @Override
1331   public long countWorkflowInstances(WorkflowState state) throws WorkflowDatabaseException {
1332     return persistence.countWorkflows(state);
1333   }
1334 
1335   /**
1336    * {@inheritDoc}
1337    *
1338    * @see org.opencastproject.workflow.api.WorkflowService#getWorkflowInstancesByMediaPackage(String)
1339    */
1340   @Override
1341   public List<WorkflowInstance> getWorkflowInstancesByMediaPackage(String mediaPackageId)
1342           throws WorkflowDatabaseException, UnauthorizedException {
1343     // If we have read permission to the media package, return all workflows
1344     if (!assertMediaPackagePermission(mediaPackageId, Permissions.Action.READ.toString())) {
1345       throw new UnauthorizedException("Not allowed to access event");
1346     }
1347     return persistence.getWorkflowInstancesByMediaPackage(mediaPackageId);
1348   }
1349 
1350   /**
1351    * {@inheritDoc}
1352    *
1353    * @see org.opencastproject.workflow.api.WorkflowService#getRunningWorkflowInstanceByMediaPackage(String, String)
1354    */
1355   @Override
1356   public Optional<WorkflowInstance> getRunningWorkflowInstanceByMediaPackage(String mediaPackageId, String action)
1357           throws WorkflowException, UnauthorizedException, WorkflowDatabaseException {
1358     List<WorkflowInstance> workflowInstances = persistence.getRunningWorkflowInstancesByMediaPackage(mediaPackageId);
1359 
1360     // If there is more than workflow running something is very wrong
1361     if (workflowInstances.size() > 1) {
1362       throw new WorkflowException("Multiple workflows are active on mediapackage " + mediaPackageId);
1363     }
1364 
1365     Optional<WorkflowInstance> optWorkflowInstance = Optional.empty();
1366     if (workflowInstances.size() == 1) {
1367       WorkflowInstance wfInstance = workflowInstances.get(0);
1368       optWorkflowInstance = Optional.of(wfInstance);
1369       assertPermission(wfInstance, action, wfInstance.getOrganizationId());
1370     }
1371 
1372     return optWorkflowInstance;
1373   }
1374 
1375   /**
1376    * {@inheritDoc}
1377    *
1378    * @see org.opencastproject.workflow.api.WorkflowService#mediaPackageHasActiveWorkflows(String)
1379    */
1380   @Override
1381   public boolean mediaPackageHasActiveWorkflows(String mediaPackageId) throws WorkflowDatabaseException {
1382     return persistence.mediaPackageHasActiveWorkflows(mediaPackageId);
1383   }
1384 
1385   /**
1386    * {@inheritDoc}
1387    *
1388    * @see org.opencastproject.workflow.api.WorkflowService#userHasActiveWorkflows(String)
1389    */
1390   @Override
1391   public boolean userHasActiveWorkflows(String userId) throws WorkflowDatabaseException {
1392     return persistence.userHasActiveWorkflows(userId);
1393   }
1394 
1395   /**
1396    * Callback for workflow operations that were throwing an exception. This implementation assumes that the operation
1397    * worker has already adjusted the current operation's state appropriately.
1398    *
1399    * @param workflow
1400    *          the workflow instance
1401    * @param currentOperation
1402    *          the current workflow operation
1403    * @return the workflow instance
1404    */
1405   protected WorkflowInstance handleOperationException(
1406       WorkflowInstance workflow,
1407       WorkflowOperationInstance currentOperation) {
1408     int failedAttempt = currentOperation.getFailedAttempts() + 1;
1409     currentOperation.setFailedAttempts(failedAttempt);
1410 
1411     // Operation was aborted by the user, after going into hold state
1412     if (ERROR_RESOLUTION_HANDLER_ID.equals(currentOperation.getTemplate())
1413             && OperationState.FAILED.equals(currentOperation.getState())) {
1414       int position = workflow.getOperations().indexOf(currentOperation);
1415       // Advance to operation that actually failed
1416       if (workflow.getOperations().size() > position + 1) { // This should always be true...
1417         currentOperation = workflow.getOperations().get(position + 1);
1418         // It's currently in RETRY state, change to FAILED
1419         currentOperation.setState(OperationState.FAILED);
1420       }
1421       handleFailedOperation(workflow, currentOperation);
1422     } else if (currentOperation.getMaxAttempts() != -1 && failedAttempt == currentOperation.getMaxAttempts()) {
1423       handleFailedOperation(workflow, currentOperation);
1424     } else {
1425       switch (currentOperation.getRetryStrategy()) {
1426         case NONE:
1427           handleFailedOperation(workflow, currentOperation);
1428           break;
1429         case RETRY:
1430           currentOperation.setState(OperationState.RETRY);
1431           break;
1432         case HOLD:
1433           currentOperation.setState(OperationState.RETRY);
1434           List<WorkflowOperationInstance> operations = workflow.getOperations();
1435           WorkflowOperationDefinitionImpl errorResolutionDefinition = new WorkflowOperationDefinitionImpl(
1436                   ERROR_RESOLUTION_HANDLER_ID, "Error Resolution Operation", "error", false);
1437           var errorResolutionInstance = new WorkflowOperationInstance(errorResolutionDefinition);
1438           errorResolutionInstance.setExceptionHandlingWorkflow(currentOperation.getExceptionHandlingWorkflow());
1439           var index = workflow.getOperations().indexOf(currentOperation);
1440           operations.add(index, errorResolutionInstance);
1441           workflow.setOperations(operations);
1442           break;
1443         default:
1444           break;
1445       }
1446     }
1447     return workflow;
1448   }
1449 
1450   /**
1451    * Handles the workflow for a failing operation. If the operation is configured to fail the workflow, the workflow
1452    * either fails directly or continues with the operations of the operation's exception handling workflow.
1453    *
1454    * @param workflow the workflow
1455    * @param currentOperation the failing workflow operation instance
1456    * @throws IllegalStateException if the exception handling workflow definition cannot be found
1457    */
1458   private void handleFailedOperation(WorkflowInstance workflow, WorkflowOperationInstance currentOperation) {
1459     String errorDefId = currentOperation.getExceptionHandlingWorkflow();
1460 
1461     // Adjust the workflow state according to the setting on the operation
1462     if (currentOperation.isFailOnError()) {
1463       if (StringUtils.isBlank(errorDefId)) {
1464         workflow.setState(FAILED);
1465       } else {
1466         workflow.setState(FAILING);
1467 
1468         // Remove the rest of the original workflow, keeping everything up to and including the failed operation
1469         int currentOperationPosition = workflow.getOperations().indexOf(currentOperation);
1470         List<WorkflowOperationInstance> operations = new ArrayList<>(
1471                 workflow.getOperations().subList(0, currentOperationPosition + 1));
1472         workflow.setOperations(operations);
1473 
1474         // Determine the current workflow configuration
1475         Map<String, String> configuration = new HashMap<>();
1476         for (String configKey : workflow.getConfigurationKeys()) {
1477           configuration.put(configKey, workflow.getConfiguration(configKey));
1478         }
1479 
1480         // Append the operations of the exception handling workflow and apply the configuration to them
1481         try {
1482           WorkflowDefinition errorDef = getWorkflowDefinitionById(errorDefId);
1483           workflow.extend(errorDef);
1484           workflow.setOperations(updateConfiguration(workflow, configuration).getOperations());
1485         } catch (NotFoundException notFoundException) {
1486           throw new IllegalStateException(
1487                   "Unable to find the error workflow definition '" + errorDefId + "'", notFoundException);
1488         }
1489       }
1490     }
1491 
1492     // Fail the current operation
1493     currentOperation.setState(OperationState.FAILED);
1494   }
1495 
1496   /**
1497    * Callback for workflow operation handlers that executed and finished without exception. This implementation assumes
1498    * that the operation worker has already adjusted the current operation's state appropriately.
1499    *
1500    * @param workflow
1501    *          the workflow instance
1502    * @param result
1503    *          the workflow operation result
1504    * @return the workflow instance
1505    * @throws WorkflowDatabaseException
1506    *           if updating the workflow fails
1507    */
1508   protected WorkflowInstance handleOperationResult(WorkflowInstance workflow, WorkflowOperationResult result)
1509           throws WorkflowDatabaseException {
1510 
1511     // Get the operation and its handler
1512     WorkflowOperationInstance currentOperation = workflow.getCurrentOperation();
1513     WorkflowOperationHandler handler = getWorkflowOperationHandler(currentOperation.getTemplate());
1514 
1515     // Create an operation result for the lazy or else update the workflow's media package
1516     if (result == null) {
1517       logger.warn("Handling a null operation result for workflow {} in operation {}", workflow.getId(),
1518               currentOperation.getTemplate());
1519       result = new WorkflowOperationResultImpl(workflow.getMediaPackage(), null, Action.CONTINUE, 0);
1520     } else {
1521       MediaPackage mp = result.getMediaPackage();
1522       if (mp != null) {
1523         workflow.setMediaPackage(mp);
1524       }
1525     }
1526 
1527     // The action to take
1528     Action action = result.getAction();
1529 
1530     // Update the workflow configuration.
1531     workflow = updateConfiguration(workflow, result.getProperties());
1532 
1533     // Adjust workflow statistics
1534     currentOperation.setTimeInQueue(result.getTimeInQueue());
1535 
1536     // Adjust the operation state
1537     switch (action) {
1538       case CONTINUE:
1539         currentOperation.setState(OperationState.SUCCEEDED);
1540         break;
1541       case PAUSE:
1542         if (!(handler instanceof ResumableWorkflowOperationHandler)) {
1543           throw new IllegalStateException("Operation " + currentOperation.getTemplate() + " is not resumable");
1544         }
1545 
1546         // Set abortable and continuable to default values
1547         currentOperation.setContinuable(result.allowsContinue());
1548         currentOperation.setAbortable(result.allowsAbort());
1549 
1550         workflow.setState(PAUSED);
1551         currentOperation.setState(OperationState.PAUSED);
1552         break;
1553       case SKIP:
1554         currentOperation.setState(OperationState.SKIPPED);
1555         break;
1556       default:
1557         throw new IllegalStateException("Unknown action '" + action + "' returned");
1558     }
1559 
1560     if (ERROR_RESOLUTION_HANDLER_ID.equals(currentOperation.getTemplate()) && result.getAction() == Action.CONTINUE) {
1561 
1562       Map<String, String> resultProperties = result.getProperties();
1563       if (resultProperties == null || StringUtils.isBlank(resultProperties.get(RETRY_STRATEGY))) {
1564         throw new WorkflowDatabaseException("Retry strategy not present in properties!");
1565       }
1566 
1567       RetryStrategy retryStrategy = RetryStrategy.valueOf(resultProperties.get(RETRY_STRATEGY));
1568       switch (retryStrategy) {
1569         case NONE:
1570           handleFailedOperation(workflow, workflow.getCurrentOperation());
1571           break;
1572         case RETRY:
1573           break;
1574         default:
1575           throw new WorkflowDatabaseException("Retry strategy not implemented yet!");
1576       }
1577     }
1578 
1579     return workflow;
1580   }
1581 
1582   /**
1583    * Updates the media package with the metadata that each registered metadata service extracts from it. Nothing
1584    * happens (apart from a warning) if no metadata service is registered.
1585    *
1586    * @param mp
1587    *          the media package
1588    */
1589   protected void populateMediaPackageMetadata(MediaPackage mp) {
1590     if (!metadataServices.isEmpty()) {
1591       for (MediaPackageMetadataService service : metadataServices) {
1592         MediaPackageMetadataSupport.populateMediaPackageMetadata(mp, service.getMetadata(mp));
1593       }
1594       return;
1595     }
1596     logger.warn("No metadata services are registered, so no media package metadata can be extracted from catalogs");
1597   }
1598 
1599   /**
1600    * {@inheritDoc} This implementation ignores the operation and always returns <code>true</code>.
1601    *
1602    * @see org.opencastproject.job.api.JobProducer#isReadyToAcceptJobs(String)
1603    */
1604   @Override
1605   public boolean isReadyToAcceptJobs(String operation) {
1606     return true;
1607   }
1608 
1609   /**
1610    * {@inheritDoc}
1611    *
1612    * If we are already running the maximum number of workflows, don't accept another START_WORKFLOW job
1613    *
1614    * @see org.opencastproject.job.api.AbstractJobProducer#isReadyToAccept(org.opencastproject.job.api.Job)
1615    */
1616   @Override
1617   public boolean isReadyToAccept(Job job) throws UndispatchableJobException {
1618     String operation = job.getOperation();
1619 
1620     // Only restrict execution of new jobs
1621     if (!Operation.START_WORKFLOW.toString().equals(operation)) {
1622       return true;
1623     }
1624 
1625     // If the first operation is guaranteed to pause, run the job.
1626     if (job.getArguments().size() > 1 && job.getArguments().get(0) != null) {
1627       try {
1628         WorkflowDefinition workflowDef = XmlWorkflowParser.parseWorkflowDefinition(job.getArguments().get(0));
1629         if (workflowDef.getOperations().size() > 0) {
1630           String firstOperationId = workflowDef.getOperations().get(0).getId();
1631           WorkflowOperationHandler handler = getWorkflowOperationHandler(firstOperationId);
1632           if (handler instanceof ResumableWorkflowOperationHandler) {
1633             if (((ResumableWorkflowOperationHandler) handler).isAlwaysPause()) {
1634               return true;
1635             }
1636           }
1637         }
1638       } catch (WorkflowParsingException e) {
1639         throw new UndispatchableJobException(job + " is not a proper job to start a workflow", e);
1640       }
1641     }
1642 
1643     WorkflowInstance workflow;
1644     Optional<WorkflowInstance> workflowInstance;
1645     String mediaPackageId;
1646 
1647     // Fetch all workflows that are running with the current media package
1648     try {
1649       workflow = getWorkflowById(job.getId());
1650       mediaPackageId = workflow.getMediaPackage().getIdentifier().toString();
1651     } catch (NotFoundException e) {
1652       throw new UndispatchableJobException("Trying to start workflow with job id " + job.getId()
1653           + " but no corresponding instance is available from the workflow service", e);
1654     } catch (UnauthorizedException e) {
1655       throw new UndispatchableJobException(
1656           "Authorization denied while requesting to loading workflow instance. Job: " + job.getId(), e);
1657     }
1658 
1659     try {
1660       workflowInstance = getRunningWorkflowInstanceByMediaPackage(
1661               workflow.getMediaPackage().getIdentifier().toString(), Permissions.Action.READ.toString());
1662     } catch (UnauthorizedException e) {
1663       throw new UndispatchableJobException("Authorization denied while requesting to loading workflow instance "
1664           + workflow.getId(), e);
1665     } catch (WorkflowDatabaseException e) {
1666       throw new UndispatchableJobException("An database error occurred while checking if a workflow is already active "
1667           + "(job: " + job.getId() + ")", e);
1668     } catch (WorkflowException e) {
1669       // Avoid running multiple workflows with same media package id at the same time
1670       delayWorkflow(workflow, mediaPackageId);
1671       return false;
1672     }
1673 
1674     // Make sure we are not excluding ourselves
1675     if (workflowInstance.isPresent() && workflow.getId() != workflowInstance.get().getId()) {
1676       delayWorkflow(workflow, mediaPackageId);
1677       return false;
1678     }
1679 
1680     return true;
1681   }
1682 
1683   private void delayWorkflow(WorkflowInstance workflow, String mediaPackageId) {
1684     if (!delayedWorkflows.contains(workflow.getId())) { // log and remember each delayed workflow only once
1685       logger.info("Delaying start of workflow {}, another workflow on media package {} is still running",
1686               workflow.getId(), mediaPackageId);
1687       delayedWorkflows.add(workflow.getId()); // acceptJob() removes the id again when the job is accepted
1688     }
1689   }
1690 
1691   /**
1692    * {@inheritDoc}
1693    * <p>
1694    * Marks the job as running and hands it to the executor service. While doing so, the security context is switched
1695    * to the job's creator and organization; the previous context is restored before the method returns.
1696    *
1697    * @see org.opencastproject.job.api.AbstractJobProducer#acceptJob(org.opencastproject.job.api.Job)
1698    */
1699   @Override
1700   public synchronized void acceptJob(Job job) throws ServiceRegistryException {
1701     final User previousUser = securityService.getUser();
1702     final Organization previousOrganization = securityService.getOrganization();
1703     try {
1704       // Act as the creator of the job within the job's organization
1705       securityService.setOrganization(organizationDirectoryService.getOrganization(job.getOrganization()));
1706       securityService.setUser(userDirectoryService.loadUser(job.getCreator()));
1707       job.setStatus(Job.Status.RUNNING);
1708       job = serviceRegistry.updateJob(job);
1709 
1710       // Check if this workflow was initially delayed
1711       if (delayedWorkflows.remove(job.getId())) {
1712         logger.info("Starting initially delayed workflow {}, {} more waiting", job.getId(), delayedWorkflows.size());
1713       }
1714 
1715       executorService.submit(new JobRunner(job, serviceRegistry.getCurrentJob()));
1716     } catch (ServiceRegistryException e) {
1717       throw e;
1718     } catch (Exception e) {
1719       throw new ServiceRegistryException(e);
1720     } finally {
1721       securityService.setUser(previousUser);
1722       securityService.setOrganization(previousOrganization);
1723     }
1724   }
1725 
1726   /**
1727    * Processes the workflow job by starting a workflow, resuming a workflow or running its current operation.
1728    *
1729    * @param job
1730    *          the job
1731    * @return always <code>null</code>
1732    * @throws Exception
1733    *           if job processing fails; a workflow instance that was already loaded is marked as failed first
1734    */
1735   protected String process(Job job) throws Exception {
1736     List<String> arguments = job.getArguments();
1737     WorkflowInstance workflowInstance = null;
1738     WorkflowOperationInstance wfo;
1739     String operation = job.getOperation();
1740     try {
1741       try {
1742         // valueOf() throws an IllegalArgumentException for an unknown operation name. No Operation constant exists
1743         // in that case, which is why all error messages below report the job's operation name instead.
1744         switch (Operation.valueOf(operation)) {
1745           case START_WORKFLOW:
1746             workflowInstance = persistence.getWorkflow(Long.parseLong(job.getPayload()));
1747             logger.debug("Starting new workflow {}", workflowInstance);
1748             runWorkflow(workflowInstance);
1749             break;
1750           case RESUME:
1751             workflowInstance = getWorkflowById(Long.parseLong(arguments.get(0)));
1752             Map<String, String> properties = null;
1753             if (arguments.size() > 1) {
1754               Properties props = new Properties();
1755               props.load(IOUtils.toInputStream(arguments.get(arguments.size() - 1), StandardCharsets.UTF_8));
1756               properties = new HashMap<>();
1757               for (Entry<Object, Object> entry : props.entrySet()) {
1758                 properties.put(entry.getKey().toString(), entry.getValue().toString());
1759               }
1760             }
1761             logger.debug("Resuming {} at {}", workflowInstance, workflowInstance.getCurrentOperation());
1762             workflowInstance.setState(RUNNING);
1763             update(workflowInstance);
1764             runWorkflowOperation(workflowInstance, properties);
1765             break;
1766           case START_OPERATION:
1767             workflowInstance = getWorkflowById(Long.parseLong(arguments.get(0)));
1768             wfo = workflowInstance.getCurrentOperation();
1769 
1770             if (OperationState.RUNNING.equals(wfo.getState()) || OperationState.PAUSED.equals(wfo.getState())) {
1771               logger.info("Reset operation state {} {} to INSTANTIATED due to job restart", workflowInstance, wfo);
1772               wfo.setState(OperationState.INSTANTIATED);
1773             }
1774 
1775             wfo.setExecutionHost(job.getProcessingHost());
1776             logger.debug("Running {} {}", workflowInstance, wfo);
1777             wfo = runWorkflowOperation(workflowInstance, null);
1778             updateOperationJob(job.getId(), wfo.getState());
1779             break;
1780           default:
1781             throw new IllegalStateException("Don't know how to handle operation '" + operation + "'");
1782         }
1783       } catch (IllegalArgumentException e) {
1784         throw new ServiceRegistryException("This service can't handle operations of type '" + operation + "'", e);
1785       } catch (IndexOutOfBoundsException e) {
1786         throw new ServiceRegistryException("The argument list for operation '" + operation + "' (job: "
1787             + job.getId() + ") does not meet expectations", e);
1788       } catch (NotFoundException e) {
1789         logger.warn("Not found processing job {}", job, e);
1790         updateOperationJob(job.getId(), OperationState.FAILED);
1791       }
1792       return null;
1793     } catch (Exception e) {
1794       logger.warn("Exception while accepting job {}", job, e);
1795       try {
1796         if (workflowInstance != null) {
1797           logger.warn("Marking job {} and workflow instance {} as failed", job, workflowInstance);
1798           updateOperationJob(job.getId(), OperationState.FAILED);
1799           workflowInstance.setState(FAILED);
1800           update(workflowInstance);
1801         } else {
1802           logger.warn("Unable to parse workflow instance", e);
1803         }
1804       } catch (WorkflowDatabaseException e1) {
1805         throw new ServiceRegistryException(e1);
1806       }
1807       if (e instanceof ServiceRegistryException) {
1808         throw e;
1809       }
1810       throw new ServiceRegistryException("Error handling operation '" + operation + "'", e);
1811     }
1812   }
1813 
1814   /**
1815    * Synchronizes the workflow operation's job with the operation status if the operation has a job associated with it,
1816    * which is determined by looking at the operation's job id.
1817    *
1818    * @param state
1819    *          the operation state
1820    * @param jobId
1821    *          the associated job
1822    * @return the updated job or <code>null</code> if there is no job for this operation
1823    * @throws ServiceRegistryException
1824    *           if the job can't be updated in the service registry
1825    * @throws NotFoundException
1826    *           if the job can't be found
1827    */
1828   private Job updateOperationJob(Long jobId, OperationState state) throws NotFoundException, ServiceRegistryException {
1829     if (jobId == null) {
1830       return null;
1831     }
1832     Job job = serviceRegistry.getJob(jobId);
1833     switch (state) {
1834       case FAILED:
1835       case RETRY:
1836         job.setStatus(Status.FAILED);
1837         break;
1838       case PAUSED:
1839         job.setStatus(Status.PAUSED);
1840         job.setOperation(Operation.RESUME.toString());
1841         break;
1842       case SKIPPED:
1843       case SUCCEEDED:
1844         job.setStatus(Status.FINISHED);
1845         break;
1846       default:
1847         throw new IllegalStateException("Unexpected state '" + state + "' found");
1848     }
1849     return serviceRegistry.updateJob(job);
1850   }
1851 
1852   /**
1853    * {@inheritDoc} The count is obtained from the service registry for this service's job type.
1854    *
1855    * @see org.opencastproject.job.api.JobProducer#countJobs(org.opencastproject.job.api.Job.Status)
1856    */
1857   @Override
1858   public long countJobs(Status status) throws ServiceRegistryException {
1859     return serviceRegistry.count(JOB_TYPE, status);
1860   }
1861 
1862   /**
1863    * Converts a map of strings to a string with one <code>key=value</code> line per entry, suitable for the
1864    * properties form parameter expected by the workflow rest endpoint.
1865    * <p>
1866    * Keys and values are written as they are, no escaping takes place. Every line, including the last one, ends
1867    * with a newline character.
1868    *
1869    * @param props
1870    *          the map of strings, may be <code>null</code>
1871    * @return the string representation or <code>null</code> if no map was given
1872    */
1873   private String mapToString(Map<String, String> props) {
1874     if (props == null) {
1875       return null;
1876     }
1877 
1878     // Entries are written in the iteration order of the map
1879     return props.entrySet().stream()
1880             .map(entry -> entry.getKey() + "=" + entry.getValue() + "\n")
1881             .collect(Collectors.joining());
1882   }
1883 
1884   /**
1885    * Dummy callback for OSGi. The method body is empty, the referenced indicator is not stored.
1886    *
1887    * @param unused
1888    *          the unused ReadinessIndicator
1889    */
1890   @Reference(target = "(artifact=workflowdefinition)")
1891   protected void setProfilesReadyIndicator(ReadinessIndicator unused) { }
1892 
1893   /**
1894    * Callback for the OSGi environment to register with the <code>Workspace</code>. Stores the given reference.
1895    *
1896    * @param workspace
1897    *          the workspace
1898    */
1899   @Reference
1900   protected void setWorkspace(Workspace workspace) {
1901     this.workspace = workspace;
1902   }
1903 
1904   /**
1905    * Callback for the OSGi environment to register with the <code>ServiceRegistry</code>. Stores the given reference.
1906    *
1907    * @param registry
1908    *          the service registry
1909    */
1910   @Reference
1911   protected void setServiceRegistry(ServiceRegistry registry) {
1912     this.serviceRegistry = registry;
1913   }
1914 
1915   public ServiceRegistry getServiceRegistry() {
1916     return serviceRegistry; // the field that setServiceRegistry() assigns
1917   }
1918 
1919   /**
1920    * Callback for setting the security service. Stores the given reference for later use by this service.
1921    *
1922    * @param securityService
1923    *          the securityService to set
1924    */
1925   @Reference
1926   public void setSecurityService(SecurityService securityService) {
1927     this.securityService = securityService;
1928   }
1929 
1930   /**
1931    * Callback for setting the authorization service. Stores the given reference for later use by this service.
1932    *
1933    * @param authorizationService
1934    *          the authorizationService to set
1935    */
1936   @Reference
1937   public void setAuthorizationService(AuthorizationService authorizationService) {
1938     this.authorizationService = authorizationService;
1939   }
1940 
1941   /**
1942    * Callback for setting the user directory service. Stores the given reference for later use by this service.
1943    *
1944    * @param userDirectoryService
1945    *          the userDirectoryService to set
1946    */
1947   @Reference
1948   public void setUserDirectoryService(UserDirectoryService userDirectoryService) {
1949     this.userDirectoryService = userDirectoryService;
1950   }
1951 
1952   /**
1953    * Sets a reference to the organization directory service. Stores the given reference for later use.
1954    *
1955    * @param organizationDirectory
1956    *          the organization directory
1957    */
1958   @Reference
1959   public void setOrganizationDirectoryService(OrganizationDirectoryService organizationDirectory) {
1960     this.organizationDirectoryService = organizationDirectory;
1961   }
1962 
1963   /**
1964    * Sets the series service. Stores the given reference for later use by this service.
1965    *
1966    * @param seriesService
1967    *          the seriesService to set
1968    */
1969   @Reference
1970   public void setSeriesService(SeriesService seriesService) {
1971     this.seriesService = seriesService;
1972   }
1973 
1974   /**
1975    * Sets the asset manager. Stores the given reference for later use by this service.
1976    *
1977    * @param assetManager
1978    *          the assetManager to set
1979    */
1980   @Reference
1981   public void setAssetManager(AssetManager assetManager) {
1982     this.assetManager = assetManager;
1983   }
1984 
1985   /**
1986    * Callback to add a mediapackage metadata service; <code>removeMetadataService</code> is declared as its unbind.
1987    *
1988    * @param service
1989    *          the metadata service to add to the registered metadata services
1990    */
1991   @Reference(
1992       cardinality = ReferenceCardinality.AT_LEAST_ONE,
1993       policy = ReferencePolicy.DYNAMIC,
1994       unbind = "removeMetadataService"
1995   )
1996   protected void addMetadataService(MediaPackageMetadataService service) {
1997     metadataServices.add(service);
1998   }
1999 
2000   /**
2001    * Callback to remove a mediapackage metadata service from the registered metadata services.
2002    *
2003    * @param service
2004    *          the mediapackage metadata service to remove
2005    */
2006   protected void removeMetadataService(MediaPackageMetadataService service) {
2007     metadataServices.remove(service);
2008   }
2009 
2010   /**
2011    * Callback to set the workflow definition scanner. Only one scanner is kept, each call replaces the previous one.
2012    *
2013    * @param scanner
2014    *          the workflow definition scanner
2015    */
2016   @Reference
2017   protected void addWorkflowDefinitionScanner(WorkflowDefinitionScanner scanner) {
2018     workflowDefinitionScanner = scanner;
2019   }
2020 
2021   /**
2022    * Callback to set the Admin UI index. Stores the given Elasticsearch index reference.
2023    *
2024    * @param index
2025    *          the admin UI index.
2026    */
2027   @Reference
2028   public void setIndex(ElasticsearchIndex index) {
2029     this.index = index;
2030   }
2031 
2032   /**
2033    * Callback to set the workflow database. The reference is declared under the name "workflow-persistence".
2034    *
2035    * @param persistence
2036    *          the workflow database
2037    */
2038   @Reference(name = "workflow-persistence")
2039   public void setPersistence(WorkflowServiceDatabase persistence) {
2040     this.persistence = persistence;
2041   }
2042 
2043   /**
2044    * {@inheritDoc} This implementation returns the <code>JOB_TYPE</code> constant.
2045    *
2046    * @see org.opencastproject.job.api.JobProducer#getJobType()
2047    */
2048   @Override
2049   public String getJobType() {
2050     return JOB_TYPE;
2051   }
2052 
2053   /**
2054    * A tuple of a workflow operation handler and the name of the operation it handles. Two registrations are
2055    * considered equal if both their handlers and their operation names are equal.
2056    */
2057   public static class HandlerRegistration {
2058 
2059     /** The workflow operation handler */
2060     protected WorkflowOperationHandler handler;
2061 
2062     /** The name of the operation that is handled */
2063     protected String operationName;
2064 
2065     /**
2066      * Creates the registration of a handler for the operation with the given name.
2067      *
2068      * @throws IllegalArgumentException if one of the arguments is <code>null</code>
2069      */
2070     public HandlerRegistration(String operationName, WorkflowOperationHandler handler) {
2071       if (operationName == null) {
2072         throw new IllegalArgumentException("Operation name cannot be null");
2073       }
2074       if (handler == null) {
2075         throw new IllegalArgumentException("Handler cannot be null");
2076       }
2077       this.handler = handler;
2078       this.operationName = operationName;
2079     }
2080 
2081     /** Returns the workflow operation handler of this registration. */
2082     public WorkflowOperationHandler getHandler() {
2083       return handler;
2084     }
2085 
2086     /**
2087      * {@inheritDoc} The hash code is derived from the handler and the operation name.
2088      *
2089      * @see java.lang.Object#hashCode()
2090      */
2091     @Override
2092     public int hashCode() {
2093       return 31 * (31 + handler.hashCode()) + operationName.hashCode();
2094     }
2095 
2096     /**
2097      * {@inheritDoc} Only a registration of the very same class with an equal handler and operation name is equal.
2098      *
2099      * @see java.lang.Object#equals(java.lang.Object)
2100      */
2101     @Override
2102     public boolean equals(Object obj) {
2103       if (this == obj) {
2104         return true;
2105       }
2106       if (obj == null || getClass() != obj.getClass()) {
2107         return false;
2108       }
2109       final HandlerRegistration that = (HandlerRegistration) obj;
2110       return handler.equals(that.handler) && operationName.equals(that.operationName);
2111     }
2112   }
2113 
2114   /**
2115    * A utility class to run jobs in the security context of the job's creator and organization
2116    */
2117   class JobRunner implements Callable<Void> {
2118 
2119     /** The job to process */
2120     private final Job job;
2121 
2122     /** The current job, registered with the service registry while the job is processed */
2123     private final Job currentJob;
2124 
2125     /**
2126      * Constructs a new job runner
2127      *
2128      * @param job
2129      *          the job to run
2130      * @param currentJob
2131      *          the current running job
2132      */
2133     JobRunner(Job job, Job currentJob) {
2134       this.currentJob = currentJob;
2135       this.job = job;
2136     }
2137 
2138     /**
2139      * Processes the job as the job's creator within the job's organization. Current job, user and organization
2140      * are reset to <code>null</code> afterwards, no matter how processing ended.
2141      *
2142      * @see java.util.concurrent.Callable#call()
2143      */
2144     @Override
2145     public Void call() throws Exception {
2146       final Organization organization = organizationDirectoryService.getOrganization(job.getOrganization());
2147       try {
2148         serviceRegistry.setCurrentJob(currentJob);
2149         securityService.setOrganization(organization);
2150         securityService.setUser(userDirectoryService.loadUser(job.getCreator()));
2151         process(job);
2152         return null;
2153       } finally {
2154         serviceRegistry.setCurrentJob(null);
2155         securityService.setUser(null);
2156         securityService.setOrganization(null);
2157       }
2158     }
2159   }
2160 
2161   /**
2162    * Removes the workflow instances that the workflow database reports for cleanup, given the state and a date
2163    * <code>buffer</code> days in the past. Removals that fail are counted and reported by an exception at the end.
2164    */
2165   @Override
2166   public synchronized void cleanupWorkflowInstances(int buffer, WorkflowState state) throws UnauthorizedException,
2167           WorkflowDatabaseException {
2168     logger.info("Start cleaning up workflow instances older than {} days with status '{}'", buffer, state);
2169 
2170     int instancesCleaned = 0;
2171     int cleaningFailed = 0;
2172     Date priorTo = DateUtils.addDays(new Date(), -buffer);
2173 
2174     for (WorkflowInstance workflowInstance : persistence.getWorkflowInstancesForCleanup(state, priorTo)) {
2175       try {
2176         logger.debug("Deleting workflow instance {}", workflowInstance.getId());
2177         remove(workflowInstance.getId());
2178         instancesCleaned++;
2179       } catch (WorkflowDatabaseException | UnauthorizedException e) {
2180         // Database and authorization problems abort the cleanup and reach the caller unchanged (not re-wrapped)
2181         throw e;
2182       } catch (NotFoundException e) {
2183         // Since we are in a cleanup operation, we don't have to care about NotFoundExceptions
2184         logger.debug("Workflow instance '{}' could not be removed", workflowInstance.getId(), e);
2185       } catch (WorkflowParsingException | WorkflowStateException e) {
2186         logger.warn("Workflow instance '{}' could not be removed", workflowInstance.getId(), e);
2187         cleaningFailed++;
2188       }
2189     }
2190 
2191     if (instancesCleaned == 0 && cleaningFailed == 0) {
2192       logger.info("No workflow instances found to clean up");
2193       return;
2194     }
2195 
2196     if (instancesCleaned > 0) {
2197       logger.info("Cleaned up '{}' workflow instances", instancesCleaned);
2198     }
2199     if (cleaningFailed > 0) {
2200       logger.warn("Cleaning failed for '{}' workflow instances", cleaningFailed);
2201       throw new WorkflowDatabaseException("Unable to clean all workflow instances, see logs!");
2202     }
2203   }
2204 
2205   @Override
2206   public Map<String, Map<String, String>> getWorkflowStateMappings() {
2207     return workflowDefinitionScanner.workflowStateMappings.entrySet().stream().collect(Collectors.toMap(
2208             Entry::getKey, e -> e.getValue().stream() // outer keys are taken over from the scanner's map
2209                     .collect(Collectors.toMap(m -> m.getState().name(), WorkflowStateMapping::getValue))
2210     )); // each inner map goes from the name of a mapping's state to the mapping's value
2211   }
2212 
2213   @Override
2214   public void repopulate(IndexRebuildService.DataType type) throws IndexRebuildException {
2215     try {
2216       final int total;
2217       try {
2218         total = persistence.countMediaPackages();
2219       } catch (WorkflowDatabaseException e) {
2220         logIndexRebuildError(logger, e);
2221         throw new IndexRebuildException(getService(), e);
2222       }
2223 
2224       if (total > 0) {
2225         logIndexRebuildBegin(logger, total, "workflows");
2226         int current = 0;
2227         int n = 20;
2228         List<WorkflowIndexData> workflowIndexData;
2229 
2230         int limit = 1000;
2231         int offset = 0;
2232         String currentMediapackageId;
2233         String lastMediapackageId = "";
2234         var updatedWorkflowRange = new ArrayList<Event>();
2235         do {
2236           try {
2237             workflowIndexData = persistence.getWorkflowIndexData(limit, offset);
2238           } catch (WorkflowDatabaseException e) {
2239             logIndexRebuildError(logger, e);
2240             throw new IndexRebuildException(getService(), e);
2241           }
2242           if (!workflowIndexData.isEmpty()) {
2243             offset += limit;
2244             logger.debug("Got {} workflows for re-indexing", workflowIndexData.size());
2245 
2246             for (WorkflowIndexData indexData : workflowIndexData) {
2247               currentMediapackageId = indexData.getMediaPackageId();
2248               if (currentMediapackageId.equals(lastMediapackageId)) {
2249                 continue;
2250               }
2251               current++;
2252 
2253               // Include PAUSED; otherwise, paused workflows will show up as "Finished"
2254               if (!WorkflowUtil.isActive(WorkflowInstance.WorkflowState.values()[indexData.getState()].toString())
2255                       || WorkflowState.PAUSED == WorkflowInstance.WorkflowState.values()[indexData.getState()]) {
2256                 String orgid = indexData.getOrganizationId();
2257                 if (null == orgid) {
2258                   String mpId = indexData.getMediaPackageId();
2259                   //We're assuming here that mediapackages don't change orgs
2260                   List<Snapshot> snapshots = assetManager.getSnapshotsById(mpId);
2261                   if (snapshots.size() == 0) {
2262                     logger.debug("Dropping {} from the index since it is missing from the database", mpId);
2263                     continue;
2264                   }
2265                   orgid = snapshots.stream().findFirst().get().getOrganizationId();
2266                   //We try-catch here since it's possible for the WF to exist in the *index* but not in the *DB*
2267                   // It probably shouldn't be, but that won't keep it from happening anyway.
2268                   try {
2269                     //NB: This version of getWorkflow takes the org id, which in this case is null
2270                     // Using the normal version filters by org, and since this workflow has a NULL org it can't be found
2271                     WorkflowInstance instance = persistence.getWorkflow(indexData.getId(), null);
2272                     instance.setOrganizationId(orgid);
2273                     persistence.updateInDatabase(instance);
2274                   } catch (NotFoundException e) {
2275                     //Technically this should never happen, but getWorkflow throws it.
2276                   }
2277                 }
2278                 var updatedWorkflowData = index.getEvent(indexData.getMediaPackageId(), orgid,
2279                     securityService.getUser());
2280                 updatedWorkflowData = getStateUpdateFunction(indexData.getId(),indexData.getState(),
2281                         indexData.getMediaPackageId(), indexData.getTemplate(), indexData.getOrganizationId())
2282                         .apply(updatedWorkflowData);
2283                 updatedWorkflowRange.add(updatedWorkflowData.get());
2284 
2285                 if (updatedWorkflowRange.size() >= n || current >= total) {
2286                   index.bulkEventUpdate(updatedWorkflowRange);
2287                   logIndexRebuildProgress(logger, total, current, n);
2288                   updatedWorkflowRange.clear();
2289                 }
2290               }
2291               else {
2292                 logger.info("Skipping. Workflow {} is currently active.", indexData.getId());
2293               }
2294 
2295               lastMediapackageId = currentMediapackageId;
2296             }
2297           }
2298         } while (!workflowIndexData.isEmpty());
2299       }
2300     } catch (Exception e) {
2301       logIndexRebuildError(logger, e);
2302       throw new IndexRebuildException(getService(), e);
2303     }
2304   }
2305 
2306   @Override
2307   public IndexRebuildService.Service getService() {
2308     return IndexRebuildService.Service.Workflow;
2309   }
2310 
2311   /**
2312    * Remove a workflow instance from the Elasticsearch index.
2313    *
2314    * @param workflowInstanceId
2315    *         the identifier of the workflow instance to remove
2316    */
2317   private void removeWorkflowInstanceFromIndex(long workflowInstanceId) {
2318     final String orgId = securityService.getOrganization().getId();
2319     final User user = securityService.getUser();
2320 
2321     // find events
2322     SearchResult<Event> results;
2323     try {
2324       results = index.getByQuery(new EventSearchQuery(orgId, user).withWorkflowId(workflowInstanceId));
2325     } catch (SearchIndexException e) {
2326       logger.error("Error retrieving the events for workflow instance {} from the {} index.", workflowInstanceId,
2327               index.getIndexName(), e);
2328       return;
2329     }
2330 
2331     if (results.getItems().length == 0) {
2332       logger.warn("No events for workflow instance {} found in the {} index.", workflowInstanceId,
2333               index.getIndexName());
2334       return;
2335     }
2336 
2337     // should be only one event, but better safe than sorry
2338     for (SearchResultItem<Event> item: results.getItems()) {
2339       String eventId = item.getSource().getIdentifier();
2340       logger.debug("Removing workflow instance {} of event {} from the {} index.", workflowInstanceId, eventId,
2341               index.getIndexName());
2342 
2343       Function<Optional<Event>, Optional<Event>> updateFunction = (Optional<Event> eventOpt) -> {
2344         if (!eventOpt.isPresent()) {
2345           logger.warn("Event {} of workflow instance {} not found in the {} index.", workflowInstanceId,
2346                   eventId, index.getIndexName());
2347           return Optional.empty();
2348         }
2349         Event event = eventOpt.get();
2350         if (event.getWorkflowId() != null && event.getWorkflowId().equals(workflowInstanceId)) {
2351           logger.debug("Workflow {} is the current workflow of event {}. Removing it from event.", eventId,
2352                   workflowInstanceId);
2353           event.setWorkflowId(null);
2354           event.setWorkflowDefinitionId(null);
2355           event.setWorkflowState(null);
2356           return Optional.of(event);
2357         }
2358         return Optional.empty();
2359       };
2360 
2361       try {
2362         index.addOrUpdateEvent(eventId, updateFunction, orgId, user);
2363         logger.debug("Workflow instance {} of event {} removed from the {} index.", workflowInstanceId, eventId,
2364                 index.getIndexName());
2365       } catch (SearchIndexException e) {
2366         logger.error("Error removing the workflow instance {} of event {} from the {} index.", workflowInstanceId,
2367                 eventId, index.getIndexName(), e);
2368       }
2369     }
2370   }
2371 
2372   /**
2373    * Update a workflow instance in the Elasticsearch index.
2374    *
2375    * @param id
2376    *         workflow id
2377    * @param state
2378    *         workflow state as int
2379    * @param mpId
2380    *         corresponding mediapackage id
2381    * @param orgId
2382    *         workflow organization id
2383    */
2384   private void updateWorkflowInstanceInIndex(long id, int state, String wfDefId, String mpId, String orgId) {
2385     final User user = securityService.getUser();
2386 
2387     logger.debug("Updating workflow instance {} of event {} in the {} index.", id, mpId,
2388             index.getIndexName());
2389     Function<Optional<Event>, Optional<Event>> updateFunction = getStateUpdateFunction(id, state, wfDefId, mpId, orgId);
2390 
2391     try {
2392       index.addOrUpdateEvent(mpId, updateFunction, orgId, user);
2393       logger.debug("Workflow instance {} of event {} updated in the {} index.", id, mpId,
2394               index.getIndexName());
2395     } catch (SearchIndexException e) {
2396       logger.error("Error updating the workflow instance {} of event {} in the {} index.", id, mpId,
2397               index.getIndexName(), e);
2398     }
2399   }
2400 
2401   /**
2402    * Get the function to update the workflow state for an event in the Elasticsearch index.
2403    *
2404    * @return the function to do the update
2405    */
2406   private Function<Optional<Event>, Optional<Event>> getStateUpdateFunction(long workflowId,
2407           int workflowState, String workflowDefinitionId, String mediaPackageId,
2408           String orgId) {
2409     return (Optional<Event> eventOpt) -> {
2410       Event event = eventOpt.orElse(new Event(mediaPackageId, orgId));
2411       event.setWorkflowId(workflowId);
2412       event.setWorkflowState(WorkflowState.values()[workflowState]);
2413       event.setWorkflowDefinitionId(workflowDefinitionId);
2414       return Optional.of(event);
2415     };
2416   }
2417 }