View Javadoc
1   /*
2    * Licensed to The Apereo Foundation under one or more contributor license
3    * agreements. See the NOTICE file distributed with this work for additional
4    * information regarding copyright ownership.
5    *
6    *
7    * The Apereo Foundation licenses this file to you under the Educational
8    * Community License, Version 2.0 (the "License"); you may not use this file
9    * except in compliance with the License. You may obtain a copy of the License
10   * at:
11   *
12   *   http://opensource.org/licenses/ecl2.txt
13   *
14   * Unless required by applicable law or agreed to in writing, software
15   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
16   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the
17   * License for the specific language governing permissions and limitations under
18   * the License.
19   *
20   */
21  
22  package org.opencastproject.workflow.impl;
23  
24  import static org.opencastproject.security.api.SecurityConstants.GLOBAL_ADMIN_ROLE;
25  import static org.opencastproject.workflow.api.WorkflowInstance.WorkflowState.FAILED;
26  import static org.opencastproject.workflow.api.WorkflowInstance.WorkflowState.FAILING;
27  import static org.opencastproject.workflow.api.WorkflowInstance.WorkflowState.INSTANTIATED;
28  import static org.opencastproject.workflow.api.WorkflowInstance.WorkflowState.PAUSED;
29  import static org.opencastproject.workflow.api.WorkflowInstance.WorkflowState.RUNNING;
30  import static org.opencastproject.workflow.api.WorkflowInstance.WorkflowState.STOPPED;
31  import static org.opencastproject.workflow.api.WorkflowInstance.WorkflowState.SUCCEEDED;
32  
33  import org.opencastproject.assetmanager.api.AssetManager;
34  import org.opencastproject.assetmanager.api.query.RichAResult;
35  import org.opencastproject.assetmanager.util.WorkflowPropertiesUtil;
36  import org.opencastproject.elasticsearch.api.SearchIndexException;
37  import org.opencastproject.elasticsearch.api.SearchResult;
38  import org.opencastproject.elasticsearch.api.SearchResultItem;
39  import org.opencastproject.elasticsearch.index.ElasticsearchIndex;
40  import org.opencastproject.elasticsearch.index.objects.event.Event;
41  import org.opencastproject.elasticsearch.index.objects.event.EventSearchQuery;
42  import org.opencastproject.elasticsearch.index.rebuild.AbstractIndexProducer;
43  import org.opencastproject.elasticsearch.index.rebuild.IndexProducer;
44  import org.opencastproject.elasticsearch.index.rebuild.IndexRebuildException;
45  import org.opencastproject.elasticsearch.index.rebuild.IndexRebuildService;
46  import org.opencastproject.job.api.Job;
47  import org.opencastproject.job.api.Job.Status;
48  import org.opencastproject.job.api.JobProducer;
49  import org.opencastproject.mediapackage.MediaPackage;
50  import org.opencastproject.mediapackage.MediaPackageElement;
51  import org.opencastproject.mediapackage.MediaPackageParser;
52  import org.opencastproject.mediapackage.MediaPackageSupport;
53  import org.opencastproject.mediapackage.Publication;
54  import org.opencastproject.metadata.api.MediaPackageMetadata;
55  import org.opencastproject.metadata.api.MediaPackageMetadataService;
56  import org.opencastproject.metadata.api.MetadataService;
57  import org.opencastproject.metadata.api.util.MediaPackageMetadataSupport;
58  import org.opencastproject.security.api.AccessControlList;
59  import org.opencastproject.security.api.AccessControlUtil;
60  import org.opencastproject.security.api.AclScope;
61  import org.opencastproject.security.api.AuthorizationService;
62  import org.opencastproject.security.api.Organization;
63  import org.opencastproject.security.api.OrganizationDirectoryService;
64  import org.opencastproject.security.api.Permissions;
65  import org.opencastproject.security.api.SecurityService;
66  import org.opencastproject.security.api.UnauthorizedException;
67  import org.opencastproject.security.api.User;
68  import org.opencastproject.security.api.UserDirectoryService;
69  import org.opencastproject.series.api.SeriesException;
70  import org.opencastproject.series.api.SeriesService;
71  import org.opencastproject.serviceregistry.api.ServiceRegistry;
72  import org.opencastproject.serviceregistry.api.ServiceRegistryException;
73  import org.opencastproject.serviceregistry.api.UndispatchableJobException;
74  import org.opencastproject.util.NotFoundException;
75  import org.opencastproject.util.ReadinessIndicator;
76  import org.opencastproject.util.data.Tuple;
77  import org.opencastproject.workflow.api.ResumableWorkflowOperationHandler;
78  import org.opencastproject.workflow.api.RetryStrategy;
79  import org.opencastproject.workflow.api.WorkflowDatabaseException;
80  import org.opencastproject.workflow.api.WorkflowDefinition;
81  import org.opencastproject.workflow.api.WorkflowException;
82  import org.opencastproject.workflow.api.WorkflowIdentifier;
83  import org.opencastproject.workflow.api.WorkflowIndexData;
84  import org.opencastproject.workflow.api.WorkflowInstance;
85  import org.opencastproject.workflow.api.WorkflowInstance.WorkflowState;
86  import org.opencastproject.workflow.api.WorkflowListener;
87  import org.opencastproject.workflow.api.WorkflowOperationDefinition;
88  import org.opencastproject.workflow.api.WorkflowOperationDefinitionImpl;
89  import org.opencastproject.workflow.api.WorkflowOperationHandler;
90  import org.opencastproject.workflow.api.WorkflowOperationInstance;
91  import org.opencastproject.workflow.api.WorkflowOperationInstance.OperationState;
92  import org.opencastproject.workflow.api.WorkflowOperationResult;
93  import org.opencastproject.workflow.api.WorkflowOperationResult.Action;
94  import org.opencastproject.workflow.api.WorkflowOperationResultImpl;
95  import org.opencastproject.workflow.api.WorkflowParsingException;
96  import org.opencastproject.workflow.api.WorkflowService;
97  import org.opencastproject.workflow.api.WorkflowServiceDatabase;
98  import org.opencastproject.workflow.api.WorkflowStateException;
99  import org.opencastproject.workflow.api.WorkflowStateMapping;
100 import org.opencastproject.workflow.api.WorkflowUtil;
101 import org.opencastproject.workflow.api.XmlWorkflowParser;
102 import org.opencastproject.workspace.api.Workspace;
103 
104 import com.google.common.util.concurrent.Striped;
105 
106 import org.apache.commons.io.IOUtils;
107 import org.apache.commons.lang3.StringUtils;
108 import org.apache.commons.lang3.time.DateUtils;
109 import org.osgi.framework.InvalidSyntaxException;
110 import org.osgi.framework.ServiceReference;
111 import org.osgi.service.component.ComponentContext;
112 import org.osgi.service.component.annotations.Activate;
113 import org.osgi.service.component.annotations.Component;
114 import org.osgi.service.component.annotations.Reference;
115 import org.osgi.service.component.annotations.ReferenceCardinality;
116 import org.osgi.service.component.annotations.ReferencePolicy;
117 import org.slf4j.Logger;
118 import org.slf4j.LoggerFactory;
119 
120 import java.io.ByteArrayOutputStream;
121 import java.io.IOException;
122 import java.nio.charset.StandardCharsets;
123 import java.util.ArrayList;
124 import java.util.Collections;
125 import java.util.Comparator;
126 import java.util.Date;
127 import java.util.HashMap;
128 import java.util.HashSet;
129 import java.util.List;
130 import java.util.Map;
131 import java.util.Map.Entry;
132 import java.util.Optional;
133 import java.util.Properties;
134 import java.util.Set;
135 import java.util.SortedSet;
136 import java.util.TreeSet;
137 import java.util.concurrent.Callable;
138 import java.util.concurrent.CopyOnWriteArrayList;
139 import java.util.concurrent.Executors;
140 import java.util.concurrent.ThreadPoolExecutor;
141 import java.util.concurrent.locks.Lock;
142 import java.util.function.Function;
143 import java.util.stream.Collectors;
144 
145 /**
146  * Implements WorkflowService with in-memory data structures to hold WorkflowOperations and WorkflowInstances.
147  * WorkflowOperationHandlers are looked up in the OSGi service registry based on the "workflow.operation" property. If
148  * the WorkflowOperationHandler's "workflow.operation" service registration property matches
149  * WorkflowOperation.getName(), then the factory returns a WorkflowOperationRunner to handle that operation. This allows
150  * for custom runners to be added or modified without affecting the workflow service itself.
151  */
152 @Component(
153   property = {
154     "service.description=Workflow Service",
155     "service.pid=org.opencastproject.workflow.impl.WorkflowServiceImpl"
156   },
157   immediate = true,
158   service = { WorkflowService.class, WorkflowServiceImpl.class, IndexProducer.class }
159 )
160 public class WorkflowServiceImpl extends AbstractIndexProducer implements WorkflowService, JobProducer {
161 
162   /** Retry strategy property name */
163   private static final String RETRY_STRATEGY = "retryStrategy";
164 
165   /** Logging facility */
166   private static final Logger logger = LoggerFactory.getLogger(WorkflowServiceImpl.class);
167 
168   /** List of available operations on jobs */
169   enum Operation {
170     START_WORKFLOW, RESUME, START_OPERATION
171   }
172 
173   /** Constant value indicating a <code>null</code> parent id */
174   private static final String NULL_PARENT_ID = "-";
175 
176   /** The load imposed on the system by a workflow job.
177    *  We are keeping this hardcoded because otherwise bad things will likely happen,
178    *  like an inability to process a workflow past a certain point in high-load conditions
179    */
180   private static final float WORKFLOW_JOB_LOAD = 0.0f;
181 
182   /** Error resolution handler id constant */
183   public static final String ERROR_RESOLUTION_HANDLER_ID = "error-resolution";
184 
185   /** Remove references to the component context once felix scr 1.2 becomes available */
186   protected ComponentContext componentContext = null;
187 
188   /** The metadata services */
189   private SortedSet<MediaPackageMetadataService> metadataServices;
190 
191   /** Persistent storage */
192   protected WorkflowServiceDatabase persistence;
193 
194   /** The list of workflow listeners */
195   private final List<WorkflowListener> listeners = new CopyOnWriteArrayList<WorkflowListener>();
196 
197   /** The thread pool to use for firing listeners and handling dispatched jobs */
198   protected ThreadPoolExecutor executorService;
199 
200   /** The workspace */
201   protected Workspace workspace = null;
202 
203   /** The service registry */
204   protected ServiceRegistry serviceRegistry = null;
205 
206   /** The security service */
207   protected SecurityService securityService = null;
208 
209   /** The authorization service */
210   protected AuthorizationService authorizationService = null;
211 
212   /** The user directory service */
213   protected UserDirectoryService userDirectoryService = null;
214 
215   /** The organization directory service */
216   protected OrganizationDirectoryService organizationDirectoryService = null;
217 
218   /** The series service */
219   protected SeriesService seriesService;
220 
221   /** The asset manager */
222   protected AssetManager assetManager = null;
223 
224   /** The workflow definition scanner */
225   private WorkflowDefinitionScanner workflowDefinitionScanner;
226 
227   /** List of initially delayed workflows */
228   private final List<Long> delayedWorkflows = new ArrayList<Long>();
229 
230   /** Striped locks for synchronization */
231   private final Striped<Lock> lock = Striped.lazyWeakLock(1024);
232   private final Striped<Lock> updateLock = Striped.lazyWeakLock(1024);
233   private final Striped<Lock> mediaPackageLocks = Striped.lazyWeakLock(1024);
234 
235   /** The Elasticsearch indices */
236   private ElasticsearchIndex index;
237 
238   /**
239    * Constructs a new workflow service impl, with a priority-sorted map of metadata services
240    */
241   public WorkflowServiceImpl() {
242     metadataServices = new TreeSet<>(Comparator.comparingInt(MetadataService::getPriority));
243   }
244 
245   /**
246    * Activate this service implementation via the OSGI service component runtime.
247    *
248    * @param componentContext
249    *          the component context
250    */
251   @Activate
252   public void activate(ComponentContext componentContext) {
253     this.componentContext = componentContext;
254     executorService = (ThreadPoolExecutor) Executors.newCachedThreadPool();
255     logger.info("Activate Workflow service");
256   }
257 
258   /**
259    * {@inheritDoc}
260    *
261    * @see org.opencastproject.workflow.api.WorkflowService#addWorkflowListener(org.opencastproject.workflow.api.WorkflowListener)
262    */
263   @Override
264   public void addWorkflowListener(WorkflowListener listener) {
265     listeners.add(listener);
266   }
267 
268   /**
269    * {@inheritDoc}
270    *
271    * @see org.opencastproject.workflow.api.WorkflowService#removeWorkflowListener(org.opencastproject.workflow.api.WorkflowListener)
272    */
273   @Override
274   public void removeWorkflowListener(WorkflowListener listener) {
275     listeners.remove(listener);
276   }
277 
278   /**
279    * Fires the workflow listeners on workflow updates.
280    */
281   protected void fireListeners(final WorkflowInstance oldWorkflowInstance, final WorkflowInstance newWorkflowInstance) {
282     final User currentUser = securityService.getUser();
283     final Organization currentOrganization = securityService.getOrganization();
284     for (final WorkflowListener listener : listeners) {
285       if (oldWorkflowInstance == null || !oldWorkflowInstance.getState().equals(newWorkflowInstance.getState())) {
286         Runnable runnable = () -> {
287           try {
288             securityService.setUser(currentUser);
289             securityService.setOrganization(currentOrganization);
290             listener.stateChanged(newWorkflowInstance);
291           } finally {
292             securityService.setUser(null);
293             securityService.setOrganization(null);
294           }
295         };
296         executorService.execute(runnable);
297       } else {
298         logger.debug("Not notifying {} because the workflow state has not changed", listener);
299       }
300 
301       if (newWorkflowInstance.getCurrentOperation() != null) {
302         if (oldWorkflowInstance == null || oldWorkflowInstance.getCurrentOperation() == null
303                 || !oldWorkflowInstance.getCurrentOperation().equals(newWorkflowInstance.getCurrentOperation())) {
304           Runnable runnable = () -> {
305             try {
306               securityService.setUser(currentUser);
307               securityService.setOrganization(currentOrganization);
308               listener.operationChanged(newWorkflowInstance);
309             } finally {
310               securityService.setUser(null);
311               securityService.setOrganization(null);
312             }
313           };
314           executorService.execute(runnable);
315         }
316       } else {
317         logger.debug("Not notifying {} because the workflow operation has not changed", listener);
318       }
319     }
320   }
321 
322   /**
323    * {@inheritDoc}
324    *
325    * @see org.opencastproject.workflow.api.WorkflowService#listAvailableWorkflowDefinitions()
326    */
327   @Override
328   public List<WorkflowDefinition> listAvailableWorkflowDefinitions() {
329     return workflowDefinitionScanner
330             .getAvailableWorkflowDefinitions(securityService.getOrganization(), securityService.getUser())
331             .sorted()
332             .collect(Collectors.toList());
333   }
334 
335   /**
336    * {@inheritDoc}
337    */
338   public boolean isRunnable(WorkflowDefinition workflowDefinition) {
339     List<String> availableOperations = listAvailableOperationNames();
340     List<WorkflowDefinition> checkedWorkflows = new ArrayList<>();
341     boolean runnable = isRunnable(workflowDefinition, availableOperations, checkedWorkflows);
342     int wfCount = checkedWorkflows.size() - 1;
343     if (runnable)
344       logger.info("Workflow {}, containing {} derived workflows, is runnable", workflowDefinition, wfCount);
345     else
346       logger.warn("Workflow {}, containing {} derived workflows, is not runnable", workflowDefinition, wfCount);
347     return runnable;
348   }
349 
350   /**
351    * Tests the workflow definition for its runnability. This method is a helper for
352    * {@link #isRunnable(WorkflowDefinition)} that is suited for recursive calling.
353    *
354    * @param workflowDefinition
355    *          the definition to test
356    * @param availableOperations
357    *          list of currently available operation handlers
358    * @param checkedWorkflows
359    *          list of checked workflows, used to avoid circular checking
360    * @return <code>true</code> if all bits and pieces used for executing <code>workflowDefinition</code> are in place
361    */
362   private boolean isRunnable(WorkflowDefinition workflowDefinition, List<String> availableOperations,
363           List<WorkflowDefinition> checkedWorkflows) {
364     if (checkedWorkflows.contains(workflowDefinition))
365       return true;
366 
367     // Test availability of operation handler and catch workflows
368     for (WorkflowOperationDefinition op : workflowDefinition.getOperations()) {
369       if (!availableOperations.contains(op.getId())) {
370         logger.info("{} is not runnable due to missing operation {}", workflowDefinition, op);
371         return false;
372       }
373       String catchWorkflow = op.getExceptionHandlingWorkflow();
374       if (catchWorkflow != null) {
375         WorkflowDefinition catchWorkflowDefinition;
376         try {
377           catchWorkflowDefinition = getWorkflowDefinitionById(catchWorkflow);
378         } catch (NotFoundException e) {
379           logger.info("{} is not runnable due to missing catch workflow {} on operation {}", workflowDefinition,
380                   catchWorkflow, op);
381           return false;
382         }
383         if (!isRunnable(catchWorkflowDefinition, availableOperations, checkedWorkflows))
384           return false;
385       }
386     }
387 
388     // Add the workflow to the list of checked workflows
389     if (!checkedWorkflows.contains(workflowDefinition))
390       checkedWorkflows.add(workflowDefinition);
391     return true;
392   }
393 
394   /**
395    * Gets the currently registered workflow operation handlers.
396    *
397    * @return All currently registered handlers
398    */
399   public Set<HandlerRegistration> getRegisteredHandlers() {
400     Set<HandlerRegistration> set = new HashSet<>();
401     ServiceReference[] refs;
402     try {
403       refs = componentContext.getBundleContext().getServiceReferences(WorkflowOperationHandler.class.getName(), null);
404     } catch (InvalidSyntaxException e) {
405       throw new IllegalStateException(e);
406     }
407     if (refs != null) {
408       for (ServiceReference ref : refs) {
409         WorkflowOperationHandler handler = (WorkflowOperationHandler) componentContext.getBundleContext().getService(
410                 ref);
411         set.add(new HandlerRegistration((String) ref.getProperty(WORKFLOW_OPERATION_PROPERTY), handler));
412       }
413     } else {
414       logger.warn("No registered workflow operation handlers found");
415     }
416     return set;
417   }
418 
419   protected WorkflowOperationHandler getWorkflowOperationHandler(String operationId) {
420     for (HandlerRegistration reg : getRegisteredHandlers()) {
421       if (reg.operationName.equals(operationId))
422         return reg.handler;
423     }
424     return null;
425   }
426 
427   /**
428    * Lists the names of each workflow operation. Operation names are availalbe for use if there is a registered
429    * {@link WorkflowOperationHandler} with an equal {@link WorkflowServiceImpl#WORKFLOW_OPERATION_PROPERTY} property.
430    *
431    * @return The {@link List} of available workflow operation names
432    */
433   protected List<String> listAvailableOperationNames() {
434     return getRegisteredHandlers().parallelStream().map(op -> op.operationName).collect(Collectors.toList());
435   }
436 
437   /**
438    * {@inheritDoc}
439    *
440    * @see org.opencastproject.workflow.api.WorkflowService#getWorkflowById(long)
441    */
442   @Override
443   public WorkflowInstance getWorkflowById(long id) throws NotFoundException,
444           UnauthorizedException {
445     try {
446       WorkflowInstance workflow = persistence.getWorkflow(id);
447       assertPermission(workflow, Permissions.Action.READ.toString(), workflow.getOrganizationId());
448       return workflow;
449     } catch (WorkflowDatabaseException e) {
450       throw new IllegalStateException("Got not get workflow from database with id ");
451     }
452   }
453 
454   /**
455    * {@inheritDoc}
456    *
457    * @see org.opencastproject.workflow.api.WorkflowService#start(org.opencastproject.workflow.api.WorkflowDefinition,
458    *      org.opencastproject.mediapackage.MediaPackage)
459    */
460   @Override
461   public WorkflowInstance start(WorkflowDefinition workflowDefinition, MediaPackage mediaPackage)
462           throws WorkflowDatabaseException, UnauthorizedException, WorkflowParsingException {
463     return start(workflowDefinition, mediaPackage, new HashMap<>());
464   }
465 
466   /**
467    * {@inheritDoc}
468    *
469    * @see org.opencastproject.workflow.api.WorkflowService#start(org.opencastproject.workflow.api.WorkflowDefinition,
470    *      org.opencastproject.mediapackage.MediaPackage)
471    */
472   @Override
473   public WorkflowInstance start(WorkflowDefinition workflowDefinition, MediaPackage mediaPackage,
474           Map<String, String> properties)
475           throws WorkflowDatabaseException, UnauthorizedException, WorkflowParsingException {
476     try {
477       return start(workflowDefinition, mediaPackage, null, properties);
478     } catch (NotFoundException e) {
479       // should never happen
480       throw new IllegalStateException("a null workflow ID caused a NotFoundException.  This is a programming error.");
481     }
482   }
483 
484   /**
485    * {@inheritDoc}
486    *
487    * @see org.opencastproject.workflow.api.WorkflowService#start(org.opencastproject.workflow.api.WorkflowDefinition,
488    *      org.opencastproject.mediapackage.MediaPackage, Long, java.util.Map)
489    */
490   @Override
491   public WorkflowInstance start(WorkflowDefinition workflowDefinition, MediaPackage sourceMediaPackage,
492           Long parentWorkflowId, Map<String, String> originalProperties) throws WorkflowDatabaseException,
493           NotFoundException, UnauthorizedException, WorkflowParsingException, IllegalStateException {
494     final String mediaPackageId = sourceMediaPackage.getIdentifier().toString();
495     Map<String, String> properties = null;
496 
497     // WorkflowPropertiesUtil.storeProperties will take a snapshot if there isn't one
498     // and we want the mp in the snapshot to have all the metadata populated.
499     populateMediaPackageMetadata(sourceMediaPackage);
500 
501     if (originalProperties != null) {
502       WorkflowPropertiesUtil.storeProperties(assetManager, sourceMediaPackage, originalProperties);
503       properties = WorkflowPropertiesUtil.getLatestWorkflowProperties(assetManager, mediaPackageId);
504     }
505 
506     // We have to synchronize per media package to avoid starting multiple simultaneous workflows for one media package.
507     final Lock lock = mediaPackageLocks.get(mediaPackageId);
508     lock.lock();
509 
510     try {
511       if (workflowDefinition == null)
512         throw new IllegalArgumentException("workflow definition must not be null");
513       for (List<String> errors : MediaPackageSupport.sanityCheck(sourceMediaPackage)) {
514         throw new IllegalArgumentException("Insane media package cannot be processed: " + String.join("; ", errors));
515       }
516       if (parentWorkflowId != null) {
517         try {
518           getWorkflowById(parentWorkflowId); // Let NotFoundException bubble up
519         } catch (UnauthorizedException e) {
520           throw new IllegalArgumentException("Parent workflow " + parentWorkflowId + " not visible to this user");
521         }
522       } else {
523         if (persistence.mediaPackageHasActiveWorkflows(mediaPackageId)) {
524           throw new IllegalStateException(String.format(
525                   "Can't start workflow '%s' for media package '%s' because another workflow is currently active.",
526                   workflowDefinition.getTitle(),
527                   sourceMediaPackage.getIdentifier().toString()));
528         }
529       }
530 
531       // Get the current user
532       User currentUser = securityService.getUser();
533       validUserOrThrow(currentUser);
534 
535       // Get the current organization
536       Organization organization = securityService.getOrganization();
537       if (organization == null)
538         throw new SecurityException("Current organization is unknown");
539 
540       WorkflowInstance workflowInstance = new WorkflowInstance(workflowDefinition, sourceMediaPackage,
541               currentUser, organization, properties);
542       workflowInstance = updateConfiguration(workflowInstance, properties);
543 
544       // Create and configure the workflow instance
545       try {
546         // Create a new job for this workflow instance
547         String workflowDefinitionXml = XmlWorkflowParser.toXml(workflowDefinition);
548         String mediaPackageXml = MediaPackageParser.getAsXml(sourceMediaPackage);
549 
550         List<String> arguments = new ArrayList<>();
551         arguments.add(workflowDefinitionXml);
552         arguments.add(mediaPackageXml);
553         if (parentWorkflowId != null || properties != null) {
554           String parentWorkflowIdString = (parentWorkflowId != null) ? parentWorkflowId.toString() : NULL_PARENT_ID;
555           arguments.add(parentWorkflowIdString);
556         }
557         if (properties != null) {
558           arguments.add(mapToString(properties));
559         }
560 
561         Job job = serviceRegistry.createJob(JOB_TYPE, Operation.START_WORKFLOW.toString(), arguments,
562                 null, false, null, WORKFLOW_JOB_LOAD);
563 
564         // Have the workflow take on the job's identity
565         workflowInstance.setId(job.getId());
566 
567         // Add the workflow to the search index and have the job enqueued for dispatch.
568         // Update also sets ACL and mediapackage metadata
569         update(workflowInstance);
570 
571         return workflowInstance;
572       } catch (Throwable t) {
573         try {
574           workflowInstance.setState(FAILED);
575           update(workflowInstance);
576         } catch (Exception failureToFail) {
577           logger.warn("Unable to update workflow to failed state", failureToFail);
578         }
579         try {
580           throw t;
581         } catch (ServiceRegistryException e) {
582           throw new WorkflowDatabaseException(e);
583         }
584       }
585     } finally {
586       lock.unlock();
587     }
588   }
589 
590   protected WorkflowInstance updateConfiguration(WorkflowInstance instance, Map<String, String> properties) {
591     if (properties != null) {
592       for (Entry<String, String> entry : properties.entrySet()) {
593         instance.setConfiguration(entry.getKey(), entry.getValue());
594       }
595     }
596     return instance;
597   }
598 
599   /**
600    * Does a lookup of available operation handlers for the given workflow operation.
601    *
602    * @param operation
603    *          the operation definition
604    * @return the handler or <code>null</code>
605    */
606   protected WorkflowOperationHandler selectOperationHandler(WorkflowOperationInstance operation) {
607     List<WorkflowOperationHandler> handlerList = new ArrayList<>();
608     for (HandlerRegistration handlerReg : getRegisteredHandlers()) {
609       if (handlerReg.operationName != null && handlerReg.operationName.equals(operation.getTemplate())) {
610         handlerList.add(handlerReg.handler);
611       }
612     }
613     if (handlerList.size() > 1) {
614       throw new IllegalStateException("Multiple operation handlers found for operation '" + operation.getTemplate()
615               + "'");
616     } else if (handlerList.size() == 1) {
617       return handlerList.get(0);
618     }
619     logger.warn("No workflow operation handlers found for operation '{}'", operation.getTemplate());
620     return null;
621   }
622 
623   /**
624    * Executes the workflow.
625    *
626    * @param workflow
627    *          the workflow instance
628    * @throws WorkflowException
629    *           if there is a problem processing the workflow
630    * @throws UnauthorizedException
631    */
632   protected Job runWorkflow(WorkflowInstance workflow) throws WorkflowException, UnauthorizedException {
633     if (INSTANTIATED != workflow.getState()) {
634 
635       // If the workflow is "running", we need to determine if there is an operation being executed or not.
636       // When a workflow has been restarted, this might not be the case and the status might not have been
637       // updated accordingly.
638       if (RUNNING == workflow.getState()) {
639         WorkflowOperationInstance currentOperation = workflow.getCurrentOperation();
640         if (currentOperation != null) {
641           if (currentOperation.getId() != null) {
642             try {
643               Job operationJob = serviceRegistry.getJob(currentOperation.getId());
644               if (Job.Status.RUNNING.equals(operationJob.getStatus())) {
645                 logger.debug("Not starting workflow {}, it is already in running state", workflow);
646                 return null;
647               } else {
648                 logger.info("Scheduling next operation of workflow {}", workflow);
649                 operationJob.setStatus(Status.QUEUED);
650                 operationJob.setDispatchable(true);
651                 return serviceRegistry.updateJob(operationJob);
652               }
653             } catch (Exception e) {
654               logger.warn("Error determining status of current workflow operation in {}", workflow, e);
655               return null;
656             }
657           }
658         } else {
659           throw new IllegalStateException("Cannot start a workflow '" + workflow + "' with no current operation");
660         }
661       } else {
662         throw new IllegalStateException("Cannot start a workflow in state '" + workflow.getState() + "'");
663       }
664     }
665 
666     // If this is a new workflow, move to the first operation
667     workflow.setState(RUNNING);
668     update(workflow);
669 
670     WorkflowOperationInstance operation = workflow.getCurrentOperation();
671 
672     if (operation == null) {
673       throw new IllegalStateException("Cannot start a workflow without a current operation");
674     }
675 
676     if (!operation.equals(workflow.getOperations().get(0))) {
677       throw new IllegalStateException("Current operation expected to be first");
678     }
679 
680     try {
681       logger.info("Scheduling workflow {} for execution", workflow.getId());
682       Job job = serviceRegistry.createJob(JOB_TYPE, Operation.START_OPERATION.toString(),
683               Collections.singletonList(Long.toString(workflow.getId())), null, false, null, WORKFLOW_JOB_LOAD);
684       operation.setId(job.getId());
685       update(workflow);
686       job.setStatus(Status.QUEUED);
687       job.setDispatchable(true);
688       return serviceRegistry.updateJob(job);
689     } catch (ServiceRegistryException e) {
690       throw new WorkflowDatabaseException(e);
691     } catch (NotFoundException e) {
692       // this should be impossible
693       throw new IllegalStateException("Unable to find a job that was just created");
694     }
695 
696   }
697 
698   /**
699    * Executes the workflow's current operation.
700    *
701    * @param workflow
702    *          the workflow
703    * @param properties
704    *          the properties that are passed in on resume
705    * @return the processed workflow operation
706    * @throws WorkflowException
707    *           if there is a problem processing the workflow
708    * @throws UnauthorizedException
709    */
710   protected WorkflowOperationInstance runWorkflowOperation(WorkflowInstance workflow, Map<String, String> properties)
711           throws WorkflowException, UnauthorizedException {
712     WorkflowOperationInstance processingOperation = workflow.getCurrentOperation();
713     if (processingOperation == null)
714       throw new IllegalStateException("Workflow '" + workflow + "' has no operation to run");
715 
716     // Keep the current state for later reference, it might have been changed from the outside
717     WorkflowState initialState = workflow.getState();
718 
719     // Execute the operation handler
720     WorkflowOperationHandler operationHandler = selectOperationHandler(processingOperation);
721     WorkflowOperationWorker worker = new WorkflowOperationWorker(operationHandler, workflow, properties, this);
722     workflow = worker.execute();
723 
724     Long currentOperationJobId = processingOperation.getId();
725     try {
726       updateOperationJob(currentOperationJobId, processingOperation.getState());
727     } catch (NotFoundException e) {
728       throw new IllegalStateException("Unable to find a job that has already been running");
729     } catch (ServiceRegistryException e) {
730       throw new WorkflowDatabaseException(e);
731     }
732 
733     // Move on to the next workflow operation
734     WorkflowOperationInstance currentOperation = workflow.getCurrentOperation();
735 
736     // Is the workflow done?
737     if (currentOperation == null) {
738 
739       // If we are in failing mode, we were simply working off an error handling workflow
740       if (FAILING.equals(workflow.getState())) {
741         workflow.setState(FAILED);
742       }
743 
744       // Otherwise, let's make sure we didn't miss any failed operation, since the workflow state could have been
745       // switched to paused while processing the error handling workflow extension
746       else if (!FAILED.equals(workflow.getState())) {
747         workflow.setState(SUCCEEDED);
748         for (WorkflowOperationInstance op : workflow.getOperations()) {
749           if (op.getState().equals(WorkflowOperationInstance.OperationState.FAILED)) {
750             if (op.isFailOnError()) {
751               workflow.setState(FAILED);
752               break;
753             }
754           }
755         }
756       }
757 
758       // Save the updated workflow to the database
759       logger.debug("{} has {}", workflow, workflow.getState());
760       update(workflow);
761 
762     } else {
763 
764       // Somebody might have set the workflow to "paused" from the outside, so take a look a the database first
765       WorkflowState dbWorkflowState;
766       try {
767         dbWorkflowState = getWorkflowById(workflow.getId()).getState();
768       } catch (NotFoundException e) {
769         throw new IllegalStateException("The workflow with ID " + workflow.getId()
770                 + " can not be found in the database", e);
771       } catch (UnauthorizedException e) {
772         throw new IllegalStateException("The workflow with ID " + workflow.getId() + " can not be read", e);
773       }
774 
775       // If somebody changed the workflow state from the outside, that state should take precedence
776       if (!dbWorkflowState.equals(initialState)) {
777         logger.info("Workflow state for {} was changed to '{}' from the outside", workflow, dbWorkflowState);
778         workflow.setState(dbWorkflowState);
779       }
780 
781       // Save the updated workflow to the database
782 
783       Job job;
784       switch (workflow.getState()) {
785         case FAILED:
786           update(workflow);
787           break;
788         case FAILING:
789         case RUNNING:
790           try {
791             job = serviceRegistry.createJob(JOB_TYPE, Operation.START_OPERATION.toString(),
792                     Collections.singletonList(Long.toString(workflow.getId())), null, false, null, WORKFLOW_JOB_LOAD);
793             currentOperation.setId(job.getId());
794             update(workflow);
795             job.setStatus(Status.QUEUED);
796             job.setDispatchable(true);
797             serviceRegistry.updateJob(job);
798           } catch (ServiceRegistryException e) {
799             throw new WorkflowDatabaseException(e);
800           } catch (NotFoundException e) {
801             // this should be impossible
802             throw new IllegalStateException("Unable to find a job that was just created");
803           }
804           break;
805         case PAUSED:
806         case STOPPED:
807         case SUCCEEDED:
808           update(workflow);
809           break;
810         case INSTANTIATED:
811           update(workflow);
812           throw new IllegalStateException("Impossible workflow state found during processing");
813         default:
814           throw new IllegalStateException("Unknown workflow state found during processing");
815       }
816 
817     }
818 
819     return processingOperation;
820   }
821 
822   /**
823    * {@inheritDoc}
824    *
825    * @see org.opencastproject.workflow.api.WorkflowService#getWorkflowDefinitionById(String)
826    */
827   @Override
828   public WorkflowDefinition getWorkflowDefinitionById(String id) throws NotFoundException {
829     final WorkflowIdentifier workflowIdentifier = new WorkflowIdentifier(id, securityService.getOrganization().getId());
830     final WorkflowDefinition def = workflowDefinitionScanner
831             .getWorkflowDefinition(securityService.getUser(), workflowIdentifier);
832     if (def == null) {
833       throw new NotFoundException("Workflow definition '" + workflowIdentifier + "' not found or inaccessible");
834     }
835     return def;
836   }
837 
838   /**
839    * {@inheritDoc}
840    *
841    * @see org.opencastproject.workflow.api.WorkflowService#stop(long)
842    */
843   @Override
844   public WorkflowInstance stop(long workflowInstanceId) throws WorkflowException, NotFoundException,
845           UnauthorizedException {
846     final Lock lock = this.lock.get(workflowInstanceId);
847     lock.lock();
848     try {
849       WorkflowInstance instance = getWorkflowById(workflowInstanceId);
850 
851       if (instance.getState() != STOPPED) {
852         // Update the workflow instance
853         instance.setState(STOPPED);
854         update(instance);
855       }
856 
857       try {
858         removeTempFiles(instance);
859       } catch (Exception e) {
860         logger.warn("Cannot remove temp files for workflow instance {}", workflowInstanceId, e);
861       }
862 
863       return instance;
864     } finally {
865       lock.unlock();
866     }
867   }
868 
869   /**
870    * Checks whether user is set and is known to the userDirectoryService
871    */
872   private void validUserOrThrow(User user) {
873       if (user == null)
874         throw new SecurityException("Current user is unknown");
875 
876       if (userDirectoryService.loadUser(user.getUsername()) == null)
877         throw new SecurityException(String.format("Current user '%s' can not be loaded", user.getUsername()));
878   }
879 
880   private void removeTempFiles(WorkflowInstance workflowInstance) {
881     logger.info("Removing temporary files for workflow {}", workflowInstance.getId());
882     MediaPackage mp = workflowInstance.getMediaPackage();
883     if (null == mp) {
884       logger.warn("Workflow instance {} does not have an media package set", workflowInstance.getId());
885       return;
886     }
887     for (MediaPackageElement elem : mp.getElements()) {
888       // Publications should not link to temporary files and can be skipped
889       if (elem instanceof Publication) {
890         continue;
891       }
892       if (null == elem.getURI()) {
893         logger.warn("Media package element {} from the media package {} does not have an URI set",
894                 elem.getIdentifier(), mp.getIdentifier());
895         continue;
896       }
897       try {
898         logger.debug("Removing temporary file {} for workflow {}", elem.getURI(), workflowInstance);
899         workspace.delete(elem.getURI());
900       } catch (IOException e) {
901         logger.warn("Unable to delete mediapackage element", e);
902       } catch (NotFoundException e) {
903         // File was probably already deleted before...
904       }
905     }
906   }
907 
908 
909   /**
910    * {@inheritDoc}
911    *
912    * @see org.opencastproject.workflow.api.WorkflowService#remove(long)
913    */
914   @Override
915   public void remove(long workflowInstanceId) throws WorkflowDatabaseException, NotFoundException,
916           UnauthorizedException, WorkflowParsingException, WorkflowStateException {
917     remove(workflowInstanceId, false);
918   }
919 
920   /**
921    * {@inheritDoc}
922    *
923    * @see org.opencastproject.workflow.api.WorkflowService#remove(long,boolean)
924    */
925   @Override
926   public void remove(long workflowInstanceId, boolean force) throws WorkflowDatabaseException, NotFoundException,
927           UnauthorizedException, WorkflowStateException {
928     final Lock lock = this.lock.get(workflowInstanceId);
929     lock.lock();
930     try {
931       WorkflowInstance instance = getWorkflowById(workflowInstanceId);
932       WorkflowInstance.WorkflowState state = instance.getState();
933       if (!state.isTerminated() && !force) {
934         throw new WorkflowStateException("Workflow instance with state '" + state + "' cannot be removed "
935             + "since it is not yet terminated.");
936       }
937 
938       assertPermission(instance, Permissions.Action.WRITE.toString(), instance.getOrganizationId());
939 
940       // First, remove temporary files
941       removeTempFiles(instance);
942 
943       // Second, remove jobs related to operations which belong to the workflow instance
944       List<WorkflowOperationInstance> operations = instance.getOperations();
945       List<Long> jobsToDelete = new ArrayList<>();
946       for (WorkflowOperationInstance op : operations) {
947         if (op.getId() != null) {
948           long workflowOpId = op.getId();
949           if (workflowOpId != workflowInstanceId) {
950             jobsToDelete.add(workflowOpId);
951           }
952         }
953       }
954       try {
955         serviceRegistry.removeJobs(jobsToDelete);
956       } catch (ServiceRegistryException e) {
957         logger.warn("Problems while removing jobs related to workflow operations '{}'", jobsToDelete, e);
958       } catch (NotFoundException e) {
959         logger.debug("No jobs related to one of the workflow operation '{}' found in service registry", jobsToDelete);
960       }
961 
962       // Third, remove workflow instance job itself
963       try {
964         serviceRegistry.removeJobs(Collections.singletonList(workflowInstanceId));
965         removeWorkflowInstanceFromIndex(instance.getId());
966       } catch (ServiceRegistryException e) {
967         logger.warn("Problems while removing workflow instance job '{}'", workflowInstanceId, e);
968       } catch (NotFoundException e) {
969         logger.info("No workflow instance job '{}' found in the service registry", workflowInstanceId);
970       }
971 
972       // Remove workflow from database
973       persistence.removeFromDatabase(instance);
974     } finally {
975       lock.unlock();
976     }
977   }
978 
979   /**
980    * {@inheritDoc}
981    *
982    * @see org.opencastproject.workflow.api.WorkflowService#suspend(long)
983    */
984   @Override
985   public WorkflowInstance suspend(long workflowInstanceId) throws WorkflowException, NotFoundException,
986           UnauthorizedException {
987     final Lock lock = this.lock.get(workflowInstanceId);
988     lock.lock();
989     try {
990       WorkflowInstance instance = getWorkflowById(workflowInstanceId);
991       instance.setState(PAUSED);
992       update(instance);
993       return instance;
994     } finally {
995       lock.unlock();
996     }
997   }
998 
999   /**
1000    * {@inheritDoc}
1001    *
1002    * @see org.opencastproject.workflow.api.WorkflowService#resume(long)
1003    */
1004   @Override
1005   public WorkflowInstance resume(long id) throws WorkflowException, NotFoundException, IllegalStateException,
1006           UnauthorizedException {
1007     return resume(id, null);
1008   }
1009 
1010   /**
1011    * {@inheritDoc}
1012    *
1013    * @see org.opencastproject.workflow.api.WorkflowService#resume(long, Map)
1014    */
1015   @Override
1016   public WorkflowInstance resume(long workflowInstanceId, Map<String, String> properties) throws WorkflowException,
1017           NotFoundException, IllegalStateException, UnauthorizedException {
1018     WorkflowInstance workflowInstance = getWorkflowById(workflowInstanceId);
1019     if (!WorkflowState.PAUSED.equals(workflowInstance.getState()))
1020       throw new IllegalStateException("Can not resume a workflow where the current state is not in paused");
1021 
1022     workflowInstance = updateConfiguration(workflowInstance, properties);
1023     update(workflowInstance);
1024 
1025     WorkflowOperationInstance currentOperation = workflowInstance.getCurrentOperation();
1026 
1027     // Is the workflow done?
1028     if (currentOperation == null) {
1029       // Let's make sure we didn't miss any failed operation, since the workflow state could have been
1030       // switched to paused while processing the error handling workflow extension
1031       workflowInstance.setState(SUCCEEDED);
1032       for (WorkflowOperationInstance op : workflowInstance.getOperations()) {
1033         if (op.getState().equals(WorkflowOperationInstance.OperationState.FAILED)) {
1034           if (op.isFailOnError()) {
1035             workflowInstance.setState(FAILED);
1036             break;
1037           }
1038         }
1039       }
1040 
1041       // Save the resumed workflow to the database
1042       logger.debug("{} has {}", workflowInstance, workflowInstance.getState());
1043       update(workflowInstance);
1044       return workflowInstance;
1045     }
1046 
1047     // We can resume workflows when they are in either the paused state, or they are being advanced manually passed
1048     // certain operations. In the latter case, there is no current paused operation.
1049     if (OperationState.INSTANTIATED.equals(currentOperation.getState())) {
1050       try {
1051         // the operation has its own job. Update that too.
1052         Job operationJob = serviceRegistry.createJob(JOB_TYPE, Operation.START_OPERATION.toString(),
1053                 Collections.singletonList(Long.toString(workflowInstanceId)), null, false, null, WORKFLOW_JOB_LOAD);
1054 
1055         // this method call is publicly visible, so it doesn't necessarily go through the accept method. Set the
1056         // workflow state manually.
1057         workflowInstance.setState(RUNNING);
1058         currentOperation.setId(operationJob.getId());
1059 
1060         // update the workflow and its associated job
1061         update(workflowInstance);
1062 
1063         // Now set this job to be queued so it can be dispatched
1064         operationJob.setStatus(Status.QUEUED);
1065         operationJob.setDispatchable(true);
1066         serviceRegistry.updateJob(operationJob);
1067 
1068         return workflowInstance;
1069       } catch (ServiceRegistryException e) {
1070         throw new WorkflowDatabaseException(e);
1071       }
1072     }
1073 
1074     Long operationJobId = workflowInstance.getCurrentOperation().getId();
1075     if (operationJobId == null)
1076       throw new IllegalStateException("Can not resume a workflow where the current operation has no associated id");
1077 
1078     // Set the current operation's job to queued, so it gets picked up again
1079     Job workflowJob;
1080     try {
1081       workflowJob = serviceRegistry.getJob(workflowInstanceId);
1082       workflowJob.setStatus(Status.RUNNING);
1083       persistence.updateInDatabase(workflowInstance);
1084       serviceRegistry.updateJob(workflowJob);
1085 
1086       Job operationJob = serviceRegistry.getJob(operationJobId);
1087       operationJob.setStatus(Status.QUEUED);
1088       operationJob.setDispatchable(true);
1089       if (properties != null) {
1090         Properties props = new Properties();
1091         props.putAll(properties);
1092         ByteArrayOutputStream out = new ByteArrayOutputStream();
1093         props.store(out, null);
1094         List<String> newArguments = new ArrayList<String>(operationJob.getArguments());
1095         newArguments.add(new String(out.toByteArray(), StandardCharsets.UTF_8));
1096         operationJob.setArguments(newArguments);
1097       }
1098       serviceRegistry.updateJob(operationJob);
1099     } catch (ServiceRegistryException e) {
1100       throw new WorkflowDatabaseException(e);
1101     } catch (IOException e) {
1102       throw new WorkflowParsingException("Unable to parse workflow and/or workflow properties");
1103     }
1104 
1105     return workflowInstance;
1106   }
1107 
1108   /**
1109    * Asserts that the current user has permission to take the provided action on a workflow instance.
1110    *
1111    * @param workflow
1112    *          the workflow instance
1113    * @param action
1114    *          the action to ensure is permitted
1115    * @throws UnauthorizedException
1116    *           if the action is not authorized
1117    */
1118   protected void assertPermission(WorkflowInstance workflow, String action, String workflowOrgId) throws UnauthorizedException {
1119     User currentUser = securityService.getUser();
1120     Organization currentOrg = securityService.getOrganization();
1121     String currentOrgAdminRole = currentOrg.getAdminRole();
1122     String currentOrgId = currentOrg.getId();
1123 
1124     MediaPackage mediapackage = workflow.getMediaPackage();
1125 
1126     WorkflowState state = workflow.getState();
1127     if (state != INSTANTIATED && state != RUNNING && workflow.getState() != FAILING) {
1128       Optional<MediaPackage> assetMediapackage = assetManager.getMediaPackage(mediapackage.getIdentifier().toString());
1129       if (assetMediapackage.isPresent()) {
1130         mediapackage = assetMediapackage.get();
1131       }
1132     }
1133 
1134     var creatorName = workflow.getCreatorName();
1135     var workflowCreator = creatorName == null ? null : userDirectoryService.loadUser(creatorName);
1136     boolean authorized = currentUser.hasRole(GLOBAL_ADMIN_ROLE)
1137             || (currentUser.hasRole(currentOrgAdminRole) && currentOrgId.equals(workflowOrgId))
1138             || (currentUser.equals(workflowCreator))
1139             || (authorizationService.hasPermission(mediapackage, action) && currentOrgId.equals(workflowOrgId));
1140 
1141     if (!authorized) {
1142       throw new UnauthorizedException(currentUser, action);
1143     }
1144   }
1145 
1146   protected boolean assertMediaPackagePermission(String mediaPackageId, String action) {
1147     var currentUser = securityService.getUser();
1148     Optional<MediaPackage> mp = assetManager.getMediaPackage(mediaPackageId);
1149 
1150     // asset manager already checks if user is admin, org admin for same org as mp, or has explicit read rights
1151     // global admins can still get workflow instances if mp is gone from asset manager
1152     // org admins can't because then we don't know if mp belonged to same org as user
1153     return currentUser.hasRole(GLOBAL_ADMIN_ROLE)
1154             || mp.isPresent() && currentUser.hasRole(securityService.getOrganization().getAdminRole())
1155             || mp.isPresent() && authorizationService.hasPermission(mp.get(), action);
1156   }
1157 
1158   /**
1159    * {@inheritDoc}
1160    *
1161    * @see org.opencastproject.workflow.api.WorkflowService#update(org.opencastproject.workflow.api.WorkflowInstance)
1162    */
1163   @Override
1164   public void update(final WorkflowInstance workflowInstance) throws WorkflowDatabaseException, UnauthorizedException {
1165     final Lock lock = updateLock.get(workflowInstance.getId());
1166     lock.lock();
1167 
1168     try {
1169       WorkflowInstance originalWorkflowInstance = null;
1170       try {
1171         // get workflow and assert permissions
1172         originalWorkflowInstance = getWorkflowById(workflowInstance.getId());
1173       } catch (NotFoundException e) {
1174         // That's fine, it's a new workflow instance
1175       }
1176 
1177       MediaPackage updatedMediaPackage = null;
1178       try {
1179 
1180         // Before we persist this, extract the metadata
1181         updatedMediaPackage = workflowInstance.getMediaPackage();
1182 
1183         populateMediaPackageMetadata(updatedMediaPackage);
1184 
1185         String seriesId = updatedMediaPackage.getSeries();
1186         if (seriesId != null && workflowInstance.getCurrentOperation() != null) {
1187           // If the mediapackage contains a series, find the series ACLs and add the security information to the
1188           // mediapackage
1189 
1190           try {
1191             AccessControlList acl = seriesService.getSeriesAccessControl(seriesId);
1192             Tuple<AccessControlList, AclScope> activeAcl = authorizationService.getAcl(
1193                 updatedMediaPackage, AclScope.Series);
1194             // Update series ACL if it differs from the active series ACL on the media package
1195             if (!AclScope.Series.equals(activeAcl.getB()) || !AccessControlUtil.equals(activeAcl.getA(), acl)) {
1196               authorizationService.setAcl(updatedMediaPackage, AclScope.Series, acl);
1197             }
1198           } catch (NotFoundException e) {
1199             logger.debug("Not updating series ACL on event {} since series {} has no ACL set",
1200                 updatedMediaPackage, seriesId, e);
1201           }
1202         }
1203 
1204         workflowInstance.setMediaPackage(updatedMediaPackage);
1205       } catch (SeriesException e) {
1206         throw new WorkflowDatabaseException(e);
1207       } catch (Exception e) {
1208         logger.error("Metadata for media package {} could not be updated", updatedMediaPackage, e);
1209       }
1210 
1211       // Synchronize the job status with the workflow
1212       WorkflowState workflowState = workflowInstance.getState();
1213 
1214       Job job;
1215       try {
1216         job = serviceRegistry.getJob(workflowInstance.getId());
1217         job.setPayload(Long.toString(workflowInstance.getId()));
1218 
1219         // Synchronize workflow and job state
1220         switch (workflowState) {
1221           case FAILED:
1222             job.setStatus(Status.FAILED);
1223             break;
1224           case FAILING:
1225             break;
1226           case INSTANTIATED:
1227             job.setDispatchable(true);
1228             job.setStatus(Status.QUEUED);
1229             break;
1230           case PAUSED:
1231             job.setStatus(Status.PAUSED);
1232             break;
1233           case RUNNING:
1234             job.setStatus(Status.RUNNING);
1235             break;
1236           case STOPPED:
1237             job.setStatus(Status.CANCELLED);
1238             break;
1239           case SUCCEEDED:
1240             job.setStatus(Status.FINISHED);
1241             break;
1242           default:
1243             throw new IllegalStateException("Found a workflow state that is not handled");
1244         }
1245       } catch (ServiceRegistryException e) {
1246         throw new WorkflowDatabaseException(
1247             "Unable to read workflow job " + workflowInstance.getId() + " from service registry", e);
1248       } catch (NotFoundException e) {
1249         throw new WorkflowDatabaseException(
1250             "Job for workflow " + workflowInstance.getId() + " not found in service registry", e);
1251       }
1252 
1253       // Update both workflow and workflow job
1254       try {
1255         //Update the database
1256         persistence.updateInDatabase(workflowInstance);
1257 
1258         job = serviceRegistry.updateJob(job);
1259 
1260         WorkflowOperationInstance op = workflowInstance.getCurrentOperation();
1261 
1262         // Update index used for UI. Note that we only need certain metadata and we can safely filter out workflow
1263         // updates for running operations since we updated the metadata right before these operations and will do so
1264         // again right after those operations.
1265         if (op == null || op.getState() != OperationState.RUNNING) {
1266           // Collect necessary information only for index update
1267           long id = workflowInstance.getId();
1268           int state = workflowInstance.getState().ordinal();
1269           var template = workflowInstance.getTemplate();
1270           String mpId = workflowInstance.getMediaPackage().getIdentifier().toString();
1271           String orgId = workflowInstance.getOrganizationId();
1272 
1273           updateWorkflowInstanceInIndex(id, state, template, mpId, orgId);
1274         }
1275       } catch (ServiceRegistryException e) {
1276         throw new WorkflowDatabaseException("Update of workflow job " + workflowInstance.getId()
1277             + " in the service registry failed, service registry and workflow table may be out of sync", e);
1278       } catch (NotFoundException e) {
1279         throw new WorkflowDatabaseException("Job for workflow " + workflowInstance.getId()
1280             + " not found in service registry", e);
1281       } catch (Exception e) {
1282         throw new WorkflowDatabaseException("Update of workflow job " + job.getId() + " in the service registry failed, "
1283             + "service registry and workflow table may be out of sync", e);
1284       }
1285 
1286       try {
1287         fireListeners(originalWorkflowInstance, workflowInstance);
1288       } catch (Exception e) {
1289         // Can't happen, since we are converting from an in-memory object
1290         throw new IllegalStateException("In-memory workflow instance could not be serialized", e);
1291       }
1292     } finally {
1293       lock.unlock();
1294     }
1295   }
1296 
1297   /**
1298    * {@inheritDoc}
1299    *
1300    * @see org.opencastproject.workflow.api.WorkflowService#countWorkflowInstances()
1301    */
1302   @Override
1303   public long countWorkflowInstances() throws WorkflowDatabaseException {
1304     return countWorkflowInstances(null);
1305   }
1306 
1307   /**
1308    * {@inheritDoc}
1309    *
1310    * @see org.opencastproject.workflow.api.WorkflowService#countWorkflowInstances(org.opencastproject.workflow.api.WorkflowInstance.WorkflowState)
1311    */
1312   @Override
1313   public long countWorkflowInstances(WorkflowState state) throws WorkflowDatabaseException {
1314     return persistence.countWorkflows(state);
1315   }
1316 
1317   /**
1318    * {@inheritDoc}
1319    *
1320    * @see org.opencastproject.workflow.api.WorkflowService#getWorkflowInstancesByMediaPackage(String)
1321    */
1322   @Override
1323   public List<WorkflowInstance> getWorkflowInstancesByMediaPackage(String mediaPackageId)
1324       throws WorkflowDatabaseException, UnauthorizedException {
1325     // If we have read permission to the media package, return all workflows
1326     if (!assertMediaPackagePermission(mediaPackageId, Permissions.Action.READ.toString())) {
1327       throw new UnauthorizedException("Not allowed to access event");
1328     }
1329     return persistence.getWorkflowInstancesByMediaPackage(mediaPackageId);
1330   }
1331 
1332   /**
1333    * {@inheritDoc}
1334    *
1335    * @see org.opencastproject.workflow.api.WorkflowService#getRunningWorkflowInstanceByMediaPackage(String, String)
1336    */
1337   @Override
1338   public Optional<WorkflowInstance> getRunningWorkflowInstanceByMediaPackage(String mediaPackageId, String action)
1339           throws WorkflowException, UnauthorizedException, WorkflowDatabaseException {
1340     List<WorkflowInstance> workflowInstances = persistence.getRunningWorkflowInstancesByMediaPackage(mediaPackageId);
1341 
1342     // If there is more than workflow running something is very wrong
1343     if (workflowInstances.size() > 1) {
1344       throw new WorkflowException("Multiple workflows are active on mediapackage " + mediaPackageId);
1345     }
1346 
1347     Optional<WorkflowInstance> optWorkflowInstance = Optional.empty();
1348     if (workflowInstances.size() == 1) {
1349       WorkflowInstance wfInstance = workflowInstances.get(0);
1350       optWorkflowInstance = Optional.of(wfInstance);
1351       assertPermission(wfInstance, action, wfInstance.getOrganizationId());
1352     }
1353 
1354     return optWorkflowInstance;
1355   }
1356 
1357   /**
1358    * {@inheritDoc}
1359    *
1360    * @see org.opencastproject.workflow.api.WorkflowService#mediaPackageHasActiveWorkflows(String)
1361    */
1362   @Override
1363   public boolean mediaPackageHasActiveWorkflows(String mediaPackageId) throws WorkflowDatabaseException {
1364     return persistence.mediaPackageHasActiveWorkflows(mediaPackageId);
1365   }
1366 
1367   /**
1368    * {@inheritDoc}
1369    *
1370    * @see org.opencastproject.workflow.api.WorkflowService#userHasActiveWorkflows(String)
1371    */
1372   @Override
1373   public boolean userHasActiveWorkflows(String userId) throws WorkflowDatabaseException {
1374     return persistence.userHasActiveWorkflows(userId);
1375   }
1376 
1377   /**
1378    * Callback for workflow operations that were throwing an exception. This implementation assumes that the operation
1379    * worker has already adjusted the current operation's state appropriately.
1380    *
1381    * @param workflow
1382    *          the workflow instance
1383    * @param currentOperation
1384    *          the current workflow operation
1385    * @return the workflow instance
1386    */
1387   protected WorkflowInstance handleOperationException(
1388       WorkflowInstance workflow,
1389       WorkflowOperationInstance currentOperation) {
1390     int failedAttempt = currentOperation.getFailedAttempts() + 1;
1391     currentOperation.setFailedAttempts(failedAttempt);
1392 
1393     // Operation was aborted by the user, after going into hold state
1394     if (ERROR_RESOLUTION_HANDLER_ID.equals(currentOperation.getTemplate())
1395             && OperationState.FAILED.equals(currentOperation.getState())) {
1396       int position = workflow.getOperations().indexOf(currentOperation);
1397       // Advance to operation that actually failed
1398       if (workflow.getOperations().size() > position + 1) { // This should always be true...
1399         currentOperation = workflow.getOperations().get(position + 1);
1400         // It's currently in RETRY state, change to FAILED
1401         currentOperation.setState(OperationState.FAILED);
1402       }
1403       handleFailedOperation(workflow, currentOperation);
1404     } else if (currentOperation.getMaxAttempts() != -1 && failedAttempt == currentOperation.getMaxAttempts()) {
1405       handleFailedOperation(workflow, currentOperation);
1406     } else {
1407       switch (currentOperation.getRetryStrategy()) {
1408         case NONE:
1409           handleFailedOperation(workflow, currentOperation);
1410           break;
1411         case RETRY:
1412           currentOperation.setState(OperationState.RETRY);
1413           break;
1414         case HOLD:
1415           currentOperation.setState(OperationState.RETRY);
1416           List<WorkflowOperationInstance> operations = workflow.getOperations();
1417           WorkflowOperationDefinitionImpl errorResolutionDefinition = new WorkflowOperationDefinitionImpl(
1418                   ERROR_RESOLUTION_HANDLER_ID, "Error Resolution Operation", "error", false);
1419           var errorResolutionInstance = new WorkflowOperationInstance(errorResolutionDefinition);
1420           errorResolutionInstance.setExceptionHandlingWorkflow(currentOperation.getExceptionHandlingWorkflow());
1421           var index = workflow.getOperations().indexOf(currentOperation);
1422           operations.add(index, errorResolutionInstance);
1423           workflow.setOperations(operations);
1424           break;
1425         default:
1426           break;
1427       }
1428     }
1429     return workflow;
1430   }
1431 
1432   /**
1433    * Handles the workflow for a failing operation.
1434    *
1435    * @param workflow
1436    *          the workflow
1437    * @param currentOperation
1438    *          the failing workflow operation instance
1439    */
1440   private void handleFailedOperation(WorkflowInstance workflow, WorkflowOperationInstance currentOperation) {
1441     String errorDefId = currentOperation.getExceptionHandlingWorkflow();
1442 
1443     // Adjust the workflow state according to the setting on the operation
1444     if (currentOperation.isFailOnError()) {
1445       if (StringUtils.isBlank(errorDefId)) {
1446         workflow.setState(FAILED);
1447       } else {
1448         workflow.setState(FAILING);
1449 
1450         // Remove the rest of the original workflow
1451         int currentOperationPosition = workflow.getOperations().indexOf(currentOperation);
1452         List<WorkflowOperationInstance> operations = new ArrayList<>(
1453                 workflow.getOperations().subList(0, currentOperationPosition + 1));
1454         workflow.setOperations(operations);
1455 
1456         // Determine the current workflow configuration
1457         Map<String, String> configuration = new HashMap<>();
1458         for (String configKey : workflow.getConfigurationKeys()) {
1459           configuration.put(configKey, workflow.getConfiguration(configKey));
1460         }
1461 
1462         // Append the operations
1463         WorkflowDefinition errorDef = null;
1464         try {
1465           errorDef = getWorkflowDefinitionById(errorDefId);
1466           workflow.extend(errorDef);
1467           workflow.setOperations(updateConfiguration(workflow, configuration).getOperations());
1468         } catch (NotFoundException notFoundException) {
1469           throw new IllegalStateException("Unable to find the error workflow definition '" + errorDefId + "'");
1470         }
1471       }
1472     }
1473 
1474     // Fail the current operation
1475     currentOperation.setState(OperationState.FAILED);
1476   }
1477 
1478   /**
1479    * Callback for workflow operation handlers that executed and finished without exception. This implementation assumes
1480    * that the operation worker has already adjusted the current operation's state appropriately.
1481    *
1482    * @param workflow
1483    *          the workflow instance
1484    * @param result
1485    *          the workflow operation result
1486    * @return the workflow instance
1487    * @throws WorkflowDatabaseException
1488    *           if updating the workflow fails
1489    */
1490   protected WorkflowInstance handleOperationResult(WorkflowInstance workflow, WorkflowOperationResult result)
1491           throws WorkflowDatabaseException {
1492 
1493     // Get the operation and its handler
1494     WorkflowOperationInstance currentOperation = workflow.getCurrentOperation();
1495     WorkflowOperationHandler handler = getWorkflowOperationHandler(currentOperation.getTemplate());
1496 
1497     // Create an operation result for the lazy or else update the workflow's media package
1498     if (result == null) {
1499       logger.warn("Handling a null operation result for workflow {} in operation {}", workflow.getId(),
1500               currentOperation.getTemplate());
1501       result = new WorkflowOperationResultImpl(workflow.getMediaPackage(), null, Action.CONTINUE, 0);
1502     } else {
1503       MediaPackage mp = result.getMediaPackage();
1504       if (mp != null) {
1505         workflow.setMediaPackage(mp);
1506       }
1507     }
1508 
1509     // The action to take
1510     Action action = result.getAction();
1511 
1512     // Update the workflow configuration.
1513     workflow = updateConfiguration(workflow, result.getProperties());
1514 
1515     // Adjust workflow statistics
1516     currentOperation.setTimeInQueue(result.getTimeInQueue());
1517 
1518     // Adjust the operation state
1519     switch (action) {
1520       case CONTINUE:
1521         currentOperation.setState(OperationState.SUCCEEDED);
1522         break;
1523       case PAUSE:
1524         if (!(handler instanceof ResumableWorkflowOperationHandler)) {
1525           throw new IllegalStateException("Operation " + currentOperation.getTemplate() + " is not resumable");
1526         }
1527 
1528         // Set abortable and continuable to default values
1529         currentOperation.setContinuable(result.allowsContinue());
1530         currentOperation.setAbortable(result.allowsAbort());
1531 
1532         workflow.setState(PAUSED);
1533         currentOperation.setState(OperationState.PAUSED);
1534         break;
1535       case SKIP:
1536         currentOperation.setState(OperationState.SKIPPED);
1537         break;
1538       default:
1539         throw new IllegalStateException("Unknown action '" + action + "' returned");
1540     }
1541 
1542     if (ERROR_RESOLUTION_HANDLER_ID.equals(currentOperation.getTemplate()) && result.getAction() == Action.CONTINUE) {
1543 
1544       Map<String, String> resultProperties = result.getProperties();
1545       if (resultProperties == null || StringUtils.isBlank(resultProperties.get(RETRY_STRATEGY)))
1546         throw new WorkflowDatabaseException("Retry strategy not present in properties!");
1547 
1548       RetryStrategy retryStrategy = RetryStrategy.valueOf(resultProperties.get(RETRY_STRATEGY));
1549       switch (retryStrategy) {
1550         case NONE:
1551           handleFailedOperation(workflow, workflow.getCurrentOperation());
1552           break;
1553         case RETRY:
1554           break;
1555         default:
1556           throw new WorkflowDatabaseException("Retry strategy not implemented yet!");
1557       }
1558     }
1559 
1560     return workflow;
1561   }
1562 
1563   /**
1564    * Reads the available metadata from the dublin core catalog (if there is one) and updates the mediapackage.
1565    *
1566    * @param mp
1567    *          the media package
1568    */
1569   protected void populateMediaPackageMetadata(MediaPackage mp) {
1570     if (metadataServices.isEmpty()) {
1571       logger.warn("No metadata services are registered, so no media package metadata can be extracted from catalogs");
1572       return;
1573     }
1574     for (MediaPackageMetadataService metadataService : metadataServices) {
1575       MediaPackageMetadata metadata = metadataService.getMetadata(mp);
1576       MediaPackageMetadataSupport.populateMediaPackageMetadata(mp, metadata);
1577     }
1578   }
1579 
1580   /**
1581    * {@inheritDoc}
1582    *
1583    * @see org.opencastproject.job.api.JobProducer#isReadyToAcceptJobs(String)
1584    */
1585   @Override
1586   public boolean isReadyToAcceptJobs(String operation) {
1587     return true;
1588   }
1589 
1590   /**
1591    * {@inheritDoc}
1592    *
1593    * If we are already running the maximum number of workflows, don't accept another START_WORKFLOW job
1594    *
1595    * @see org.opencastproject.job.api.AbstractJobProducer#isReadyToAccept(org.opencastproject.job.api.Job)
1596    */
1597   @Override
1598   public boolean isReadyToAccept(Job job) throws UndispatchableJobException {
1599     String operation = job.getOperation();
1600 
1601     // Only restrict execution of new jobs
1602     if (!Operation.START_WORKFLOW.toString().equals(operation))
1603       return true;
1604 
1605     // If the first operation is guaranteed to pause, run the job.
1606     if (job.getArguments().size() > 1 && job.getArguments().get(0) != null) {
1607       try {
1608         WorkflowDefinition workflowDef = XmlWorkflowParser.parseWorkflowDefinition(job.getArguments().get(0));
1609         if (workflowDef.getOperations().size() > 0) {
1610           String firstOperationId = workflowDef.getOperations().get(0).getId();
1611           WorkflowOperationHandler handler = getWorkflowOperationHandler(firstOperationId);
1612           if (handler instanceof ResumableWorkflowOperationHandler) {
1613             if (((ResumableWorkflowOperationHandler) handler).isAlwaysPause()) {
1614               return true;
1615             }
1616           }
1617         }
1618       } catch (WorkflowParsingException e) {
1619         throw new UndispatchableJobException(job + " is not a proper job to start a workflow", e);
1620       }
1621     }
1622 
1623     WorkflowInstance workflow;
1624     Optional<WorkflowInstance> workflowInstance;
1625     String mediaPackageId;
1626 
1627     // Fetch all workflows that are running with the current media package
1628     try {
1629       workflow = getWorkflowById(job.getId());
1630       mediaPackageId = workflow.getMediaPackage().getIdentifier().toString();
1631     } catch (NotFoundException e) {
1632       throw new UndispatchableJobException("Trying to start workflow with job id " + job.getId()
1633           + " but no corresponding instance is available from the workflow service", e);
1634     } catch (UnauthorizedException e) {
1635       throw new UndispatchableJobException(
1636           "Authorization denied while requesting to loading workflow instance. Job: " + job.getId(), e);
1637     }
1638 
1639     try {
1640       workflowInstance = getRunningWorkflowInstanceByMediaPackage(
1641               workflow.getMediaPackage().getIdentifier().toString(), Permissions.Action.READ.toString());
1642     } catch (UnauthorizedException e) {
1643       throw new UndispatchableJobException("Authorization denied while requesting to loading workflow instance " + workflow.getId(), e);
1644     } catch (WorkflowDatabaseException e) {
1645       throw new UndispatchableJobException("An database error occurred while checking if a workflow is already active "
1646           + "(job: " + job.getId() + ")", e);
1647     } catch (WorkflowException e) {
1648       // Avoid running multiple workflows with same media package id at the same time
1649       delayWorkflow(workflow, mediaPackageId);
1650       return false;
1651     }
1652 
1653     // Make sure we are not excluding ourselves
1654     if (workflowInstance.isPresent() && workflow.getId() != workflowInstance.get().getId()) {
1655       delayWorkflow(workflow, mediaPackageId);
1656       return false;
1657     }
1658 
1659     return true;
1660   }
1661 
1662   private void delayWorkflow(WorkflowInstance workflow, String mediaPackageId) {
1663     if (!delayedWorkflows.contains(workflow.getId())) {
1664       logger.info("Delaying start of workflow {}, another workflow on media package {} is still running",
1665               workflow.getId(), mediaPackageId);
1666       delayedWorkflows.add(workflow.getId());
1667     }
1668   }
1669 
1670   /**
1671    * {@inheritDoc}
1672    *
1673    * @see org.opencastproject.job.api.AbstractJobProducer#acceptJob(org.opencastproject.job.api.Job)
1674    */
1675   @Override
1676   public synchronized void acceptJob(Job job) throws ServiceRegistryException {
1677     User originalUser = securityService.getUser();
1678     Organization originalOrg = securityService.getOrganization();
1679     try {
1680       Organization organization = organizationDirectoryService.getOrganization(job.getOrganization());
1681       securityService.setOrganization(organization);
1682       User user = userDirectoryService.loadUser(job.getCreator());
1683       securityService.setUser(user);
1684       job.setStatus(Job.Status.RUNNING);
1685       job = serviceRegistry.updateJob(job);
1686 
1687       // Check if this workflow was initially delayed
1688       if (delayedWorkflows.contains(job.getId())) {
1689         delayedWorkflows.remove(job.getId());
1690         logger.info("Starting initially delayed workflow {}, {} more waiting", job.getId(), delayedWorkflows.size());
1691       }
1692 
1693       executorService.submit(new JobRunner(job, serviceRegistry.getCurrentJob()));
1694     } catch (Exception e) {
1695       if (e instanceof ServiceRegistryException)
1696         throw (ServiceRegistryException) e;
1697       throw new ServiceRegistryException(e);
1698     } finally {
1699       securityService.setUser(originalUser);
1700       securityService.setOrganization(originalOrg);
1701     }
1702   }
1703 
1704   /**
1705    * Processes the workflow job.
1706    *
1707    * @param job
1708    *          the job
1709    * @return the job payload
1710    * @throws Exception
1711    *           if job processing fails
1712    */
1713   protected String process(Job job) throws Exception {
1714     List<String> arguments = job.getArguments();
1715     Operation op = null;
1716     WorkflowInstance workflowInstance = null;
1717     WorkflowOperationInstance wfo;
1718     String operation = job.getOperation();
1719     try {
1720       try {
1721         op = Operation.valueOf(operation);
1722         switch (op) {
1723           case START_WORKFLOW:
1724             workflowInstance = persistence.getWorkflow(Long.parseLong(job.getPayload()));
1725             logger.debug("Starting new workflow {}", workflowInstance);
1726             runWorkflow(workflowInstance);
1727             break;
1728           case RESUME:
1729             workflowInstance = getWorkflowById(Long.parseLong(arguments.get(0)));
1730             Map<String, String> properties = null;
1731             if (arguments.size() > 1) {
1732               Properties props = new Properties();
1733               props.load(IOUtils.toInputStream(arguments.get(arguments.size() - 1), StandardCharsets.UTF_8));
1734               properties = new HashMap<>();
1735               for (Entry<Object, Object> entry : props.entrySet()) {
1736                 properties.put(entry.getKey().toString(), entry.getValue().toString());
1737               }
1738             }
1739             logger.debug("Resuming {} at {}", workflowInstance, workflowInstance.getCurrentOperation());
1740             workflowInstance.setState(RUNNING);
1741             update(workflowInstance);
1742             runWorkflowOperation(workflowInstance, properties);
1743             break;
1744           case START_OPERATION:
1745             workflowInstance = getWorkflowById(Long.parseLong(arguments.get(0)));
1746             wfo = workflowInstance.getCurrentOperation();
1747 
1748             if (OperationState.RUNNING.equals(wfo.getState()) || OperationState.PAUSED.equals(wfo.getState())) {
1749               logger.info("Reset operation state {} {} to INSTANTIATED due to job restart", workflowInstance, wfo);
1750               wfo.setState(OperationState.INSTANTIATED);
1751             }
1752 
1753             wfo.setExecutionHost(job.getProcessingHost());
1754             logger.debug("Running {} {}", workflowInstance, wfo);
1755             wfo = runWorkflowOperation(workflowInstance, null);
1756             updateOperationJob(job.getId(), wfo.getState());
1757             break;
1758           default:
1759             throw new IllegalStateException("Don't know how to handle operation '" + operation + "'");
1760         }
1761       } catch (IllegalArgumentException e) {
1762         throw new ServiceRegistryException("This service can't handle operations of type '" + op + "'", e);
1763       } catch (IndexOutOfBoundsException e) {
1764         throw new ServiceRegistryException(
1765             "The argument list for operation '" + op + "' (job: " + job.getId() + ") does not meet expectations", e);
1766       } catch (NotFoundException e) {
1767         logger.warn("Not found processing job {}", job, e);
1768         updateOperationJob(job.getId(), OperationState.FAILED);
1769       }
1770       return null;
1771     } catch (Exception e) {
1772       logger.warn("Exception while accepting job {}", job, e);
1773       try {
1774         if (workflowInstance != null) {
1775           logger.warn("Marking job {} and workflow instance {} as failed", job, workflowInstance);
1776           updateOperationJob(job.getId(), OperationState.FAILED);
1777           workflowInstance.setState(FAILED);
1778           update(workflowInstance);
1779         } else {
1780           logger.warn("Unable to parse workflow instance", e);
1781         }
1782       } catch (WorkflowDatabaseException e1) {
1783         throw new ServiceRegistryException(e1);
1784       }
1785       if (e instanceof ServiceRegistryException)
1786         throw e;
1787       throw new ServiceRegistryException("Error handling operation '" + op + "'", e);
1788     }
1789   }
1790 
1791   /**
1792    * Synchronizes the workflow operation's job with the operation status if the operation has a job associated with it,
1793    * which is determined by looking at the operation's job id.
1794    *
1795    * @param state
1796    *          the operation state
1797    * @param jobId
1798    *          the associated job
1799    * @return the updated job or <code>null</code> if there is no job for this operation
1800    * @throws ServiceRegistryException
1801    *           if the job can't be updated in the service registry
1802    * @throws NotFoundException
1803    *           if the job can't be found
1804    */
1805   private Job updateOperationJob(Long jobId, OperationState state) throws NotFoundException, ServiceRegistryException {
1806     if (jobId == null)
1807       return null;
1808     Job job = serviceRegistry.getJob(jobId);
1809     switch (state) {
1810       case FAILED:
1811       case RETRY:
1812         job.setStatus(Status.FAILED);
1813         break;
1814       case PAUSED:
1815         job.setStatus(Status.PAUSED);
1816         job.setOperation(Operation.RESUME.toString());
1817         break;
1818       case SKIPPED:
1819       case SUCCEEDED:
1820         job.setStatus(Status.FINISHED);
1821         break;
1822       default:
1823         throw new IllegalStateException("Unexpected state '" + state + "' found");
1824     }
1825     return serviceRegistry.updateJob(job);
1826   }
1827 
1828   /**
1829    * {@inheritDoc}
1830    *
1831    * @see org.opencastproject.job.api.JobProducer#countJobs(org.opencastproject.job.api.Job.Status)
1832    */
1833   @Override
1834   public long countJobs(Status status) throws ServiceRegistryException {
1835     return serviceRegistry.count(JOB_TYPE, status);
1836   }
1837 
1838   /**
1839    * Converts a Map<String, String> to s key=value\n string, suitable for the properties form parameter expected by the
1840    * workflow rest endpoint.
1841    *
1842    * @param props
1843    *          The map of strings
1844    * @return the string representation
1845    */
1846   private String mapToString(Map<String, String> props) {
1847     if (props == null)
1848       return null;
1849     StringBuilder sb = new StringBuilder();
1850     for (Entry<String, String> entry : props.entrySet()) {
1851       sb.append(entry.getKey());
1852       sb.append("=");
1853       sb.append(entry.getValue());
1854       sb.append("\n");
1855     }
1856     return sb.toString();
1857   }
1858 
1859   /**
1860    * Dummy callback for osgi
1861    *
1862    * @param unused
1863    *          the unused ReadinessIndicator
1864    */
1865   @Reference(target = "(artifact=workflowdefinition)")
1866   protected void setProfilesReadyIndicator(ReadinessIndicator unused) { }
1867 
1868   /**
1869    * Callback for the OSGi environment to register with the <code>Workspace</code>.
1870    *
1871    * @param workspace
1872    *          the workspace
1873    */
1874   @Reference
1875   protected void setWorkspace(Workspace workspace) {
1876     this.workspace = workspace;
1877   }
1878 
1879   /**
1880    * Callback for the OSGi environment to register with the <code>ServiceRegistry</code>.
1881    *
1882    * @param registry
1883    *          the service registry
1884    */
1885   @Reference
1886   protected void setServiceRegistry(ServiceRegistry registry) {
1887     this.serviceRegistry = registry;
1888   }
1889 
1890   public ServiceRegistry getServiceRegistry() {
1891     return serviceRegistry;
1892   }
1893 
1894   /**
1895    * Callback for setting the security service.
1896    *
1897    * @param securityService
1898    *          the securityService to set
1899    */
1900   @Reference
1901   public void setSecurityService(SecurityService securityService) {
1902     this.securityService = securityService;
1903   }
1904 
1905   /**
1906    * Callback for setting the authorization service.
1907    *
1908    * @param authorizationService
1909    *          the authorizationService to set
1910    */
1911   @Reference
1912   public void setAuthorizationService(AuthorizationService authorizationService) {
1913     this.authorizationService = authorizationService;
1914   }
1915 
1916   /**
1917    * Callback for setting the user directory service
1918    *
1919    * @param userDirectoryService
1920    *          the userDirectoryService to set
1921    */
1922   @Reference
1923   public void setUserDirectoryService(UserDirectoryService userDirectoryService) {
1924     this.userDirectoryService = userDirectoryService;
1925   }
1926 
1927   /**
1928    * Sets a reference to the organization directory service.
1929    *
1930    * @param organizationDirectory
1931    *          the organization directory
1932    */
1933   @Reference
1934   public void setOrganizationDirectoryService(OrganizationDirectoryService organizationDirectory) {
1935     this.organizationDirectoryService = organizationDirectory;
1936   }
1937 
1938   /**
1939    * Sets the series service
1940    *
1941    * @param seriesService
1942    *          the seriesService to set
1943    */
1944   @Reference
1945   public void setSeriesService(SeriesService seriesService) {
1946     this.seriesService = seriesService;
1947   }
1948 
1949   /**
1950    * Sets the asset manager
1951    *
1952    * @param assetManager
1953    *          the assetManager to set
1954    */
1955   @Reference
1956   public void setAssetManager(AssetManager assetManager) {
1957     this.assetManager = assetManager;
1958   }
1959 
1960   /**
1961    * Callback to set the metadata service
1962    *
1963    * @param service
1964    *          the metadata service
1965    */
1966   @Reference(cardinality = ReferenceCardinality.AT_LEAST_ONE, policy = ReferencePolicy.DYNAMIC, unbind = "removeMetadataService")
1967   protected void addMetadataService(MediaPackageMetadataService service) {
1968     metadataServices.add(service);
1969   }
1970 
1971   /**
1972    * Callback to remove a mediapackage metadata service.
1973    *
1974    * @param service
1975    *          the mediapackage metadata service to remove
1976    */
1977   protected void removeMetadataService(MediaPackageMetadataService service) {
1978     metadataServices.remove(service);
1979   }
1980 
1981   /**
1982    * Callback to set the workflow definition scanner
1983    *
1984    * @param scanner
1985    *          the workflow definition scanner
1986    */
1987   @Reference
1988   protected void addWorkflowDefinitionScanner(WorkflowDefinitionScanner scanner) {
1989     workflowDefinitionScanner = scanner;
1990   }
1991 
1992   /**
1993    * Callback to set the Admin UI index.
1994    *
1995    * @param index
1996    *          the admin UI index.
1997    */
1998   @Reference
1999   public void setIndex(ElasticsearchIndex index) {
2000     this.index = index;
2001   }
2002 
2003   /**
2004    * Callback to set the workflow database
2005    *
2006    * @param persistence
2007    *          the workflow database
2008    */
2009   @Reference(name = "workflow-persistence")
2010   public void setPersistence(WorkflowServiceDatabase persistence) {
2011     this.persistence = persistence;
2012   }
2013 
2014   /**
2015    * {@inheritDoc}
2016    *
2017    * @see org.opencastproject.job.api.JobProducer#getJobType()
2018    */
2019   @Override
2020   public String getJobType() {
2021     return JOB_TYPE;
2022   }
2023 
2024   /**
2025    * A tuple of a workflow operation handler and the name of the operation it handles
2026    */
2027   public static class HandlerRegistration {
2028 
2029     protected WorkflowOperationHandler handler;
2030     protected String operationName;
2031 
2032     public HandlerRegistration(String operationName, WorkflowOperationHandler handler) {
2033       if (operationName == null)
2034         throw new IllegalArgumentException("Operation name cannot be null");
2035       if (handler == null)
2036         throw new IllegalArgumentException("Handler cannot be null");
2037       this.operationName = operationName;
2038       this.handler = handler;
2039     }
2040 
2041     public WorkflowOperationHandler getHandler() {
2042       return handler;
2043     }
2044 
2045     /**
2046      * {@inheritDoc}
2047      *
2048      * @see java.lang.Object#hashCode()
2049      */
2050     @Override
2051     public int hashCode() {
2052       final int prime = 31;
2053       int result = 1;
2054       result = prime * result + handler.hashCode();
2055       result = prime * result + operationName.hashCode();
2056       return result;
2057     }
2058 
2059     /**
2060      * {@inheritDoc}
2061      *
2062      * @see java.lang.Object#equals(java.lang.Object)
2063      */
2064     @Override
2065     public boolean equals(Object obj) {
2066       if (this == obj)
2067         return true;
2068       if (obj == null)
2069         return false;
2070       if (getClass() != obj.getClass())
2071         return false;
2072       HandlerRegistration other = (HandlerRegistration) obj;
2073       if (!handler.equals(other.handler))
2074         return false;
2075       return operationName.equals(other.operationName);
2076     }
2077   }
2078 
2079   /**
2080    * A utility class to run jobs
2081    */
2082   class JobRunner implements Callable<Void> {
2083 
2084     /** The job */
2085     private Job job = null;
2086 
2087     /** The current job */
2088     private final Job currentJob;
2089 
2090     /**
2091      * Constructs a new job runner
2092      *
2093      * @param job
2094      *          the job to run
2095      * @param currentJob
2096      *          the current running job
2097      */
2098     JobRunner(Job job, Job currentJob) {
2099       this.job = job;
2100       this.currentJob = currentJob;
2101     }
2102 
2103     /**
2104      * {@inheritDoc}
2105      *
2106      * @see java.util.concurrent.Callable#call()
2107      */
2108     @Override
2109     public Void call() throws Exception {
2110       Organization jobOrganization = organizationDirectoryService.getOrganization(job.getOrganization());
2111       try {
2112         serviceRegistry.setCurrentJob(currentJob);
2113         securityService.setOrganization(jobOrganization);
2114         User jobUser = userDirectoryService.loadUser(job.getCreator());
2115         securityService.setUser(jobUser);
2116         process(job);
2117       } finally {
2118         serviceRegistry.setCurrentJob(null);
2119         securityService.setUser(null);
2120         securityService.setOrganization(null);
2121       }
2122       return null;
2123     }
2124   }
2125 
2126   @Override
2127   public synchronized void cleanupWorkflowInstances(int buffer, WorkflowState state) throws UnauthorizedException,
2128           WorkflowDatabaseException {
2129     logger.info("Start cleaning up workflow instances older than {} days with status '{}'", buffer, state);
2130 
2131     int instancesCleaned = 0;
2132     int cleaningFailed = 0;
2133 
2134     Date priorTo = DateUtils.addDays(new Date(), -buffer);
2135 
2136     try {
2137       for (WorkflowInstance workflowInstance : persistence.getWorkflowInstancesForCleanup(state, priorTo)) {
2138         try {
2139           logger.debug("Deleting workflow instance {}", workflowInstance.getId());
2140           remove(workflowInstance.getId());
2141           instancesCleaned++;
2142         } catch (WorkflowDatabaseException | UnauthorizedException e) {
2143           throw e;
2144         } catch (NotFoundException e) {
2145           // Since we are in a cleanup operation, we don't have to care about NotFoundExceptions
2146           logger.debug("Workflow instance '{}' could not be removed", workflowInstance.getId(), e);
2147         } catch (WorkflowParsingException | WorkflowStateException e) {
2148           logger.warn("Workflow instance '{}' could not be removed", workflowInstance.getId(), e);
2149           cleaningFailed++;
2150         }
2151       }
2152     } catch (WorkflowDatabaseException e) {
2153       throw new WorkflowDatabaseException(e);
2154     }
2155 
2156     if (instancesCleaned == 0 && cleaningFailed == 0) {
2157       logger.info("No workflow instances found to clean up");
2158       return;
2159     }
2160 
2161     if (instancesCleaned > 0)
2162       logger.info("Cleaned up '{}' workflow instances", instancesCleaned);
2163     if (cleaningFailed > 0) {
2164       logger.warn("Cleaning failed for '{}' workflow instances", cleaningFailed);
2165       throw new WorkflowDatabaseException("Unable to clean all workflow instances, see logs!");
2166     }
2167   }
2168 
2169   @Override
2170   public Map<String, Map<String, String>> getWorkflowStateMappings() {
2171     return workflowDefinitionScanner.workflowStateMappings.entrySet().stream().collect(Collectors.toMap(
2172             Entry::getKey, e -> e.getValue().stream()
2173                     .collect(Collectors.toMap(m -> m.getState().name(), WorkflowStateMapping::getValue))
2174     ));
2175   }
2176 
2177   @Override
2178   public void repopulate(IndexRebuildService.DataType type) throws IndexRebuildException {
2179     try {
2180       final int total;
2181       try {
2182         total = persistence.countMediaPackages();
2183       } catch (WorkflowDatabaseException e) {
2184         logIndexRebuildError(logger, e);
2185         throw new IndexRebuildException(getService(), e);
2186       }
2187 
2188       if (total > 0) {
2189         logIndexRebuildBegin(logger, total, "workflows");
2190         int current = 0;
2191         int n = 20;
2192         List<WorkflowIndexData> workflowIndexData;
2193 
2194         int limit = 1000;
2195         int offset = 0;
2196         String currentMediapackageId;
2197         String lastMediapackageId = "";
2198         var updatedWorkflowRange = new ArrayList<Event>();
2199         do {
2200           try {
2201             workflowIndexData = persistence.getWorkflowIndexData(limit, offset);
2202           } catch (WorkflowDatabaseException e) {
2203             logIndexRebuildError(logger, e);
2204             throw new IndexRebuildException(getService(), e);
2205           }
2206           if (!workflowIndexData.isEmpty()) {
2207             offset += limit;
2208             logger.debug("Got {} workflows for re-indexing", workflowIndexData.size());
2209 
2210             for (WorkflowIndexData indexData : workflowIndexData) {
2211               currentMediapackageId = indexData.getMediaPackageId();
2212               if (currentMediapackageId.equals(lastMediapackageId)) {
2213                 continue;
2214               }
2215               current++;
2216 
2217               // Include PAUSED; otherwise, paused workflows will show up as "Finished"
2218               if (!WorkflowUtil.isActive(WorkflowInstance.WorkflowState.values()[indexData.getState()].toString())
2219                       || WorkflowState.PAUSED == WorkflowInstance.WorkflowState.values()[indexData.getState()]) {
2220                 String orgid = indexData.getOrganizationId();
2221                 if (null == orgid) {
2222                   String mpId = indexData.getMediaPackageId();
2223                   //We're assuming here that mediapackages don't change orgs
2224                   RichAResult results = assetManager.getSnapshotsById(mpId);
2225                   if (results.getSize() == 0) {
2226                     logger.debug("Dropping {} from the index since it is missing from the database", mpId);
2227                     continue;
2228                   }
2229                   orgid = results.getSnapshots().stream().findFirst().get().getOrganizationId();
2230                   //We try-catch here since it's possible for the WF to exist in the *index* but not in the *DB*
2231                   // It probably shouldn't be, but that won't keep it from happening anyway.
2232                   try {
2233                     //NB: This version of getWorkflow takes the org id, which in this case is null
2234                     // Using the normal version filters by org, and since this workflow has a NULL org it can't be found
2235                     WorkflowInstance instance = persistence.getWorkflow(indexData.getId(), null);
2236                     instance.setOrganizationId(orgid);
2237                     persistence.updateInDatabase(instance);
2238                   } catch (NotFoundException e) {
2239                     //Technically this should never happen, but getWorkflow throws it.
2240                   }
2241                 }
2242                 var updatedWorkflowData = index.getEvent(indexData.getMediaPackageId(), orgid, securityService.getUser());
2243                 updatedWorkflowData = getStateUpdateFunction(indexData).apply(updatedWorkflowData);
2244                 updatedWorkflowRange.add(updatedWorkflowData.get());
2245 
2246                 if (updatedWorkflowRange.size() >= n || current >= total) {
2247                   index.bulkEventUpdate(updatedWorkflowRange);
2248                   logIndexRebuildProgress(logger, total, current, n);
2249                   updatedWorkflowRange.clear();
2250                 }
2251               }
2252               else {
2253                 logger.info("Skipping. Workflow {} is currently active.", indexData.getId());
2254               }
2255 
2256               lastMediapackageId = currentMediapackageId;
2257             }
2258           }
2259         } while (!workflowIndexData.isEmpty());
2260       }
2261     } catch (Exception e) {
2262       logIndexRebuildError(logger, e);
2263       throw new IndexRebuildException(getService(), e);
2264     }
2265   }
2266 
2267   @Override
2268   public IndexRebuildService.Service getService() {
2269     return IndexRebuildService.Service.Workflow;
2270   }
2271 
2272   /**
2273    * Remove a workflow instance from the Elasticsearch index.
2274    *
2275    * @param workflowInstanceId
2276    *         the identifier of the workflow instance to remove
2277    * @param index
2278    *         the index to update
2279    */
2280   private void removeWorkflowInstanceFromIndex(long workflowInstanceId) {
2281     final String orgId = securityService.getOrganization().getId();
2282     final User user = securityService.getUser();
2283 
2284     // find events
2285     SearchResult<Event> results;
2286     try {
2287       results = index.getByQuery(new EventSearchQuery(orgId, user).withWorkflowId(workflowInstanceId));
2288     } catch (SearchIndexException e) {
2289       logger.error("Error retrieving the events for workflow instance {} from the {} index.", workflowInstanceId,
2290               index.getIndexName(), e);
2291       return;
2292     }
2293 
2294     if (results.getItems().length == 0) {
2295       logger.warn("No events for workflow instance {} found in the {} index.", workflowInstanceId,
2296               index.getIndexName());
2297       return;
2298     }
2299 
2300     // should be only one event, but better safe than sorry
2301     for (SearchResultItem<Event> item: results.getItems()) {
2302       String eventId = item.getSource().getIdentifier();
2303       logger.debug("Removing workflow instance {} of event {} from the {} index.", workflowInstanceId, eventId,
2304               index.getIndexName());
2305 
2306       Function<Optional<Event>, Optional<Event>> updateFunction = (Optional<Event> eventOpt) -> {
2307         if (!eventOpt.isPresent()) {
2308           logger.warn("Event {} of workflow instance {} not found in the {} index.", workflowInstanceId,
2309                   eventId, index.getIndexName());
2310           return Optional.empty();
2311         }
2312         Event event = eventOpt.get();
2313         if (event.getWorkflowId() != null && event.getWorkflowId().equals(workflowInstanceId)) {
2314           logger.debug("Workflow {} is the current workflow of event {}. Removing it from event.", eventId,
2315                   workflowInstanceId);
2316           event.setWorkflowId(null);
2317           event.setWorkflowDefinitionId(null);
2318           event.setWorkflowState(null);
2319           return Optional.of(event);
2320         }
2321         return Optional.empty();
2322       };
2323 
2324       try {
2325         index.addOrUpdateEvent(eventId, updateFunction, orgId, user);
2326         logger.debug("Workflow instance {} of event {} removed from the {} index.", workflowInstanceId, eventId,
2327                 index.getIndexName());
2328       } catch (SearchIndexException e) {
2329         logger.error("Error removing the workflow instance {} of event {} from the {} index.", workflowInstanceId,
2330                 eventId, index.getIndexName(), e);
2331       }
2332     }
2333   }
2334 
2335   /**
2336    * Update a workflow instance in the Elasticsearch index.
2337    *
2338    * @param id
2339    *         workflow id
2340    * @param state
2341    *         workflow state as int
2342    * @param mpId
2343    *         corresponding mediapackage id
2344    * @param orgId
2345    *         workflow organization id
2346    */
2347   private void updateWorkflowInstanceInIndex(long id, int state, String wfDefId, String mpId, String orgId) {
2348     final WorkflowState workflowState = WorkflowState.values()[state];
2349     final User user = securityService.getUser();
2350 
2351     logger.debug("Updating workflow instance {} of event {} in the {} index.", id, mpId,
2352             index.getIndexName());
2353     Function<Optional<Event>, Optional<Event>> updateFunction = (Optional<Event> eventOpt) -> {
2354       Event event = eventOpt.orElse(new Event(mpId, orgId));
2355       event.setWorkflowId(id);
2356       event.setWorkflowState(workflowState);
2357       event.setWorkflowDefinitionId(wfDefId);
2358       return Optional.of(event);
2359     };
2360 
2361     try {
2362       index.addOrUpdateEvent(mpId, updateFunction, orgId, user);
2363       logger.debug("Workflow instance {} of event {} updated in the {} index.", id, mpId,
2364               index.getIndexName());
2365     } catch (SearchIndexException e) {
2366       logger.error("Error updating the workflow instance {} of event {} in the {} index.", id, mpId,
2367               index.getIndexName(), e);
2368     }
2369   }
2370 
2371   /**
2372    * Get the function to update the workflow state for an event in the Elasticsearch index.
2373    *
2374    * @param wfData
2375    *          The workflow index data package
2376    * @return the function to do the update
2377    */
2378   private Function<Optional<Event>, Optional<Event>> getStateUpdateFunction(WorkflowIndexData wfData) {
2379     return (Optional<Event> eventOpt) -> {
2380       Event event = eventOpt.orElse(new Event(wfData.getMediaPackageId(), wfData.getOrganizationId()));
2381       event.setWorkflowId(wfData.getId());
2382       event.setWorkflowState(WorkflowState.values()[wfData.getState()]);
2383       return Optional.of(event);
2384     };
2385   }
2386 }