View Javadoc
1   /*
2    * Licensed to The Apereo Foundation under one or more contributor license
3    * agreements. See the NOTICE file distributed with this work for additional
4    * information regarding copyright ownership.
5    *
6    *
7    * The Apereo Foundation licenses this file to you under the Educational
8    * Community License, Version 2.0 (the "License"); you may not use this file
9    * except in compliance with the License. You may obtain a copy of the License
10   * at:
11   *
12   *   http://opensource.org/licenses/ecl2.txt
13   *
14   * Unless required by applicable law or agreed to in writing, software
15   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
16   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the
17   * License for the specific language governing permissions and limitations under
18   * the License.
19   *
20   */
21  
22  package org.opencastproject.workflow.impl;
23  
24  import static org.opencastproject.security.api.SecurityConstants.GLOBAL_ADMIN_ROLE;
25  import static org.opencastproject.workflow.api.WorkflowInstance.WorkflowState.FAILED;
26  import static org.opencastproject.workflow.api.WorkflowInstance.WorkflowState.FAILING;
27  import static org.opencastproject.workflow.api.WorkflowInstance.WorkflowState.INSTANTIATED;
28  import static org.opencastproject.workflow.api.WorkflowInstance.WorkflowState.PAUSED;
29  import static org.opencastproject.workflow.api.WorkflowInstance.WorkflowState.RUNNING;
30  import static org.opencastproject.workflow.api.WorkflowInstance.WorkflowState.STOPPED;
31  import static org.opencastproject.workflow.api.WorkflowInstance.WorkflowState.SUCCEEDED;
32  
33  import org.opencastproject.assetmanager.api.AssetManager;
34  import org.opencastproject.assetmanager.api.query.RichAResult;
35  import org.opencastproject.assetmanager.util.WorkflowPropertiesUtil;
36  import org.opencastproject.elasticsearch.api.SearchIndexException;
37  import org.opencastproject.elasticsearch.api.SearchResult;
38  import org.opencastproject.elasticsearch.api.SearchResultItem;
39  import org.opencastproject.elasticsearch.index.ElasticsearchIndex;
40  import org.opencastproject.elasticsearch.index.objects.event.Event;
41  import org.opencastproject.elasticsearch.index.objects.event.EventSearchQuery;
42  import org.opencastproject.elasticsearch.index.rebuild.AbstractIndexProducer;
43  import org.opencastproject.elasticsearch.index.rebuild.IndexProducer;
44  import org.opencastproject.elasticsearch.index.rebuild.IndexRebuildException;
45  import org.opencastproject.elasticsearch.index.rebuild.IndexRebuildService;
46  import org.opencastproject.job.api.Job;
47  import org.opencastproject.job.api.Job.Status;
48  import org.opencastproject.job.api.JobProducer;
49  import org.opencastproject.mediapackage.MediaPackage;
50  import org.opencastproject.mediapackage.MediaPackageElement;
51  import org.opencastproject.mediapackage.MediaPackageParser;
52  import org.opencastproject.mediapackage.MediaPackageSupport;
53  import org.opencastproject.mediapackage.Publication;
54  import org.opencastproject.metadata.api.MediaPackageMetadata;
55  import org.opencastproject.metadata.api.MediaPackageMetadataService;
56  import org.opencastproject.metadata.api.MetadataService;
57  import org.opencastproject.metadata.api.util.MediaPackageMetadataSupport;
58  import org.opencastproject.security.api.AccessControlList;
59  import org.opencastproject.security.api.AccessControlUtil;
60  import org.opencastproject.security.api.AclScope;
61  import org.opencastproject.security.api.AuthorizationService;
62  import org.opencastproject.security.api.Organization;
63  import org.opencastproject.security.api.OrganizationDirectoryService;
64  import org.opencastproject.security.api.Permissions;
65  import org.opencastproject.security.api.SecurityService;
66  import org.opencastproject.security.api.UnauthorizedException;
67  import org.opencastproject.security.api.User;
68  import org.opencastproject.security.api.UserDirectoryService;
69  import org.opencastproject.series.api.SeriesException;
70  import org.opencastproject.series.api.SeriesService;
71  import org.opencastproject.serviceregistry.api.ServiceRegistry;
72  import org.opencastproject.serviceregistry.api.ServiceRegistryException;
73  import org.opencastproject.serviceregistry.api.UndispatchableJobException;
74  import org.opencastproject.util.NotFoundException;
75  import org.opencastproject.util.ReadinessIndicator;
76  import org.opencastproject.util.data.Tuple;
77  import org.opencastproject.workflow.api.ResumableWorkflowOperationHandler;
78  import org.opencastproject.workflow.api.RetryStrategy;
79  import org.opencastproject.workflow.api.WorkflowDatabaseException;
80  import org.opencastproject.workflow.api.WorkflowDefinition;
81  import org.opencastproject.workflow.api.WorkflowException;
82  import org.opencastproject.workflow.api.WorkflowIdentifier;
83  import org.opencastproject.workflow.api.WorkflowIndexData;
84  import org.opencastproject.workflow.api.WorkflowInstance;
85  import org.opencastproject.workflow.api.WorkflowInstance.WorkflowState;
86  import org.opencastproject.workflow.api.WorkflowListener;
87  import org.opencastproject.workflow.api.WorkflowOperationDefinition;
88  import org.opencastproject.workflow.api.WorkflowOperationDefinitionImpl;
89  import org.opencastproject.workflow.api.WorkflowOperationHandler;
90  import org.opencastproject.workflow.api.WorkflowOperationInstance;
91  import org.opencastproject.workflow.api.WorkflowOperationInstance.OperationState;
92  import org.opencastproject.workflow.api.WorkflowOperationResult;
93  import org.opencastproject.workflow.api.WorkflowOperationResult.Action;
94  import org.opencastproject.workflow.api.WorkflowOperationResultImpl;
95  import org.opencastproject.workflow.api.WorkflowParsingException;
96  import org.opencastproject.workflow.api.WorkflowService;
97  import org.opencastproject.workflow.api.WorkflowServiceDatabase;
98  import org.opencastproject.workflow.api.WorkflowStateException;
99  import org.opencastproject.workflow.api.WorkflowStateMapping;
100 import org.opencastproject.workflow.api.WorkflowUtil;
101 import org.opencastproject.workflow.api.XmlWorkflowParser;
102 import org.opencastproject.workspace.api.Workspace;
103 
104 import com.google.common.util.concurrent.Striped;
105 
106 import org.apache.commons.io.IOUtils;
107 import org.apache.commons.lang3.StringUtils;
108 import org.apache.commons.lang3.time.DateUtils;
109 import org.osgi.framework.InvalidSyntaxException;
110 import org.osgi.framework.ServiceReference;
111 import org.osgi.service.component.ComponentContext;
112 import org.osgi.service.component.annotations.Activate;
113 import org.osgi.service.component.annotations.Component;
114 import org.osgi.service.component.annotations.Reference;
115 import org.osgi.service.component.annotations.ReferenceCardinality;
116 import org.osgi.service.component.annotations.ReferencePolicy;
117 import org.slf4j.Logger;
118 import org.slf4j.LoggerFactory;
119 
120 import java.io.ByteArrayOutputStream;
121 import java.io.IOException;
122 import java.nio.charset.StandardCharsets;
123 import java.util.ArrayList;
124 import java.util.Collections;
125 import java.util.Comparator;
126 import java.util.Date;
127 import java.util.HashMap;
128 import java.util.HashSet;
129 import java.util.List;
130 import java.util.Map;
131 import java.util.Map.Entry;
132 import java.util.Optional;
133 import java.util.Properties;
134 import java.util.Set;
135 import java.util.SortedSet;
136 import java.util.TreeSet;
137 import java.util.concurrent.Callable;
138 import java.util.concurrent.CopyOnWriteArrayList;
139 import java.util.concurrent.Executors;
140 import java.util.concurrent.ThreadPoolExecutor;
141 import java.util.concurrent.locks.Lock;
142 import java.util.function.Function;
143 import java.util.stream.Collectors;
144 
145 /**
146  * Implements WorkflowService with in-memory data structures to hold WorkflowOperations and WorkflowInstances.
147  * WorkflowOperationHandlers are looked up in the OSGi service registry based on the "workflow.operation" property. If
148  * the WorkflowOperationHandler's "workflow.operation" service registration property matches
149  * WorkflowOperation.getName(), then the factory returns a WorkflowOperationRunner to handle that operation. This allows
150  * for custom runners to be added or modified without affecting the workflow service itself.
151  */
152 @Component(
153     property = {
154       "service.description=Workflow Service",
155       "service.pid=org.opencastproject.workflow.impl.WorkflowServiceImpl"
156     },
157     immediate = true,
158     service = { WorkflowService.class, WorkflowServiceImpl.class, IndexProducer.class }
159 )
160 public class WorkflowServiceImpl extends AbstractIndexProducer implements WorkflowService, JobProducer {
161 
162   /** Retry strategy property name */
163   private static final String RETRY_STRATEGY = "retryStrategy";
164 
165   /** Logging facility */
166   private static final Logger logger = LoggerFactory.getLogger(WorkflowServiceImpl.class);
167 
168   /** List of available operations on jobs */
169   enum Operation {
170     START_WORKFLOW, RESUME, START_OPERATION
171   }
172 
173   /** Constant value indicating a <code>null</code> parent id */
174   private static final String NULL_PARENT_ID = "-";
175 
176   /** The load imposed on the system by a workflow job.
177    *  We are keeping this hardcoded because otherwise bad things will likely happen,
178    *  like an inability to process a workflow past a certain point in high-load conditions
179    */
180   private static final float WORKFLOW_JOB_LOAD = 0.0f;
181 
182   /** Error resolution handler id constant */
183   public static final String ERROR_RESOLUTION_HANDLER_ID = "error-resolution";
184 
185   /** Remove references to the component context once felix scr 1.2 becomes available */
186   protected ComponentContext componentContext = null;
187 
188   /** The metadata services */
189   private SortedSet<MediaPackageMetadataService> metadataServices;
190 
191   /** Persistent storage */
192   protected WorkflowServiceDatabase persistence;
193 
194   /** The list of workflow listeners */
195   private final List<WorkflowListener> listeners = new CopyOnWriteArrayList<WorkflowListener>();
196 
197   /** The thread pool to use for firing listeners and handling dispatched jobs */
198   protected ThreadPoolExecutor executorService;
199 
200   /** The workspace */
201   protected Workspace workspace = null;
202 
203   /** The service registry */
204   protected ServiceRegistry serviceRegistry = null;
205 
206   /** The security service */
207   protected SecurityService securityService = null;
208 
209   /** The authorization service */
210   protected AuthorizationService authorizationService = null;
211 
212   /** The user directory service */
213   protected UserDirectoryService userDirectoryService = null;
214 
215   /** The organization directory service */
216   protected OrganizationDirectoryService organizationDirectoryService = null;
217 
218   /** The series service */
219   protected SeriesService seriesService;
220 
221   /** The asset manager */
222   protected AssetManager assetManager = null;
223 
224   /** The workflow definition scanner */
225   private WorkflowDefinitionScanner workflowDefinitionScanner;
226 
227   /** List of initially delayed workflows */
228   private final List<Long> delayedWorkflows = new ArrayList<Long>();
229 
230   /** Striped locks for synchronization */
231   private final Striped<Lock> lock = Striped.lazyWeakLock(1024);
232   private final Striped<Lock> updateLock = Striped.lazyWeakLock(1024);
233   private final Striped<Lock> mediaPackageLocks = Striped.lazyWeakLock(1024);
234 
235   /** The Elasticsearch indices */
236   private ElasticsearchIndex index;
237 
238   /**
239    * Constructs a new workflow service impl, with a priority-sorted map of metadata services
240    */
241   public WorkflowServiceImpl() {
242     metadataServices = new TreeSet<>(Comparator.comparingInt(MetadataService::getPriority));
243   }
244 
245   /**
246    * Activate this service implementation via the OSGI service component runtime.
247    *
248    * @param componentContext
249    *          the component context
250    */
251   @Activate
252   public void activate(ComponentContext componentContext) {
253     this.componentContext = componentContext;
254     executorService = (ThreadPoolExecutor) Executors.newCachedThreadPool();
255     logger.info("Activate Workflow service");
256   }
257 
258   /**
259    * {@inheritDoc}
260    *
261    * @see org.opencastproject.workflow.api.WorkflowService#addWorkflowListener(
262    *      org.opencastproject.workflow.api.WorkflowListener)
263    */
264   @Override
265   public void addWorkflowListener(WorkflowListener listener) {
266     listeners.add(listener);
267   }
268 
269   /**
270    * {@inheritDoc}
271    *
272    * @see org.opencastproject.workflow.api.WorkflowService#removeWorkflowListener(
273    *      org.opencastproject.workflow.api.WorkflowListener)
274    */
275   @Override
276   public void removeWorkflowListener(WorkflowListener listener) {
277     listeners.remove(listener);
278   }
279 
280   /**
281    * Fires the workflow listeners on workflow updates.
282    */
283   protected void fireListeners(final WorkflowInstance oldWorkflowInstance, final WorkflowInstance newWorkflowInstance) {
284     final User currentUser = securityService.getUser();
285     final Organization currentOrganization = securityService.getOrganization();
286     for (final WorkflowListener listener : listeners) {
287       if (oldWorkflowInstance == null || !oldWorkflowInstance.getState().equals(newWorkflowInstance.getState())) {
288         Runnable runnable = () -> {
289           try {
290             securityService.setUser(currentUser);
291             securityService.setOrganization(currentOrganization);
292             listener.stateChanged(newWorkflowInstance);
293           } finally {
294             securityService.setUser(null);
295             securityService.setOrganization(null);
296           }
297         };
298         executorService.execute(runnable);
299       } else {
300         logger.debug("Not notifying {} because the workflow state has not changed", listener);
301       }
302 
303       if (newWorkflowInstance.getCurrentOperation() != null) {
304         if (oldWorkflowInstance == null || oldWorkflowInstance.getCurrentOperation() == null
305                 || !oldWorkflowInstance.getCurrentOperation().equals(newWorkflowInstance.getCurrentOperation())) {
306           Runnable runnable = () -> {
307             try {
308               securityService.setUser(currentUser);
309               securityService.setOrganization(currentOrganization);
310               listener.operationChanged(newWorkflowInstance);
311             } finally {
312               securityService.setUser(null);
313               securityService.setOrganization(null);
314             }
315           };
316           executorService.execute(runnable);
317         }
318       } else {
319         logger.debug("Not notifying {} because the workflow operation has not changed", listener);
320       }
321     }
322   }
323 
324   /**
325    * {@inheritDoc}
326    *
327    * @see org.opencastproject.workflow.api.WorkflowService#listAvailableWorkflowDefinitions()
328    */
329   @Override
330   public List<WorkflowDefinition> listAvailableWorkflowDefinitions() {
331     return workflowDefinitionScanner
332             .getAvailableWorkflowDefinitions(securityService.getOrganization(), securityService.getUser())
333             .sorted()
334             .collect(Collectors.toList());
335   }
336 
337   /**
338    * {@inheritDoc}
339    */
340   public boolean isRunnable(WorkflowDefinition workflowDefinition) {
341     List<String> availableOperations = listAvailableOperationNames();
342     List<WorkflowDefinition> checkedWorkflows = new ArrayList<>();
343     boolean runnable = isRunnable(workflowDefinition, availableOperations, checkedWorkflows);
344     int wfCount = checkedWorkflows.size() - 1;
345     if (runnable) {
346       logger.info("Workflow {}, containing {} derived workflows, is runnable", workflowDefinition, wfCount);
347     } else {
348       logger.warn("Workflow {}, containing {} derived workflows, is not runnable", workflowDefinition, wfCount);
349     }
350     return runnable;
351   }
352 
353   /**
354    * Tests the workflow definition for its runnability. This method is a helper for
355    * {@link #isRunnable(WorkflowDefinition)} that is suited for recursive calling.
356    *
357    * @param workflowDefinition
358    *          the definition to test
359    * @param availableOperations
360    *          list of currently available operation handlers
361    * @param checkedWorkflows
362    *          list of checked workflows, used to avoid circular checking
363    * @return <code>true</code> if all bits and pieces used for executing <code>workflowDefinition</code> are in place
364    */
365   private boolean isRunnable(WorkflowDefinition workflowDefinition, List<String> availableOperations,
366           List<WorkflowDefinition> checkedWorkflows) {
367     if (checkedWorkflows.contains(workflowDefinition)) {
368       return true;
369     }
370 
371     // Test availability of operation handler and catch workflows
372     for (WorkflowOperationDefinition op : workflowDefinition.getOperations()) {
373       if (!availableOperations.contains(op.getId())) {
374         logger.info("{} is not runnable due to missing operation {}", workflowDefinition, op);
375         return false;
376       }
377       String catchWorkflow = op.getExceptionHandlingWorkflow();
378       if (catchWorkflow != null) {
379         WorkflowDefinition catchWorkflowDefinition;
380         try {
381           catchWorkflowDefinition = getWorkflowDefinitionById(catchWorkflow);
382         } catch (NotFoundException e) {
383           logger.info("{} is not runnable due to missing catch workflow {} on operation {}", workflowDefinition,
384                   catchWorkflow, op);
385           return false;
386         }
387         if (!isRunnable(catchWorkflowDefinition, availableOperations, checkedWorkflows)) {
388           return false;
389         }
390       }
391     }
392 
393     // Add the workflow to the list of checked workflows
394     if (!checkedWorkflows.contains(workflowDefinition)) {
395       checkedWorkflows.add(workflowDefinition);
396     }
397     return true;
398   }
399 
400   /**
401    * Gets the currently registered workflow operation handlers.
402    *
403    * @return All currently registered handlers
404    */
405   public Set<HandlerRegistration> getRegisteredHandlers() {
406     Set<HandlerRegistration> set = new HashSet<>();
407     ServiceReference[] refs;
408     try {
409       refs = componentContext.getBundleContext().getServiceReferences(WorkflowOperationHandler.class.getName(), null);
410     } catch (InvalidSyntaxException e) {
411       throw new IllegalStateException(e);
412     }
413     if (refs != null) {
414       for (ServiceReference ref : refs) {
415         WorkflowOperationHandler handler = (WorkflowOperationHandler) componentContext.getBundleContext().getService(
416                 ref);
417         set.add(new HandlerRegistration((String) ref.getProperty(WORKFLOW_OPERATION_PROPERTY), handler));
418       }
419     } else {
420       logger.warn("No registered workflow operation handlers found");
421     }
422     return set;
423   }
424 
425   protected WorkflowOperationHandler getWorkflowOperationHandler(String operationId) {
426     for (HandlerRegistration reg : getRegisteredHandlers()) {
427       if (reg.operationName.equals(operationId)) {
428         return reg.handler;
429       }
430     }
431     return null;
432   }
433 
434   /**
435    * Lists the names of each workflow operation. Operation names are availalbe for use if there is a registered
436    * {@link WorkflowOperationHandler} with an equal {@link WorkflowServiceImpl#WORKFLOW_OPERATION_PROPERTY} property.
437    *
438    * @return The {@link List} of available workflow operation names
439    */
440   protected List<String> listAvailableOperationNames() {
441     return getRegisteredHandlers().parallelStream().map(op -> op.operationName).collect(Collectors.toList());
442   }
443 
444   /**
445    * {@inheritDoc}
446    *
447    * @see org.opencastproject.workflow.api.WorkflowService#getWorkflowById(long)
448    */
449   @Override
450   public WorkflowInstance getWorkflowById(long id) throws NotFoundException,
451           UnauthorizedException {
452     try {
453       WorkflowInstance workflow = persistence.getWorkflow(id);
454       assertPermission(workflow, Permissions.Action.READ.toString(), workflow.getOrganizationId());
455       return workflow;
456     } catch (WorkflowDatabaseException e) {
457       throw new IllegalStateException("Got not get workflow from database with id ");
458     }
459   }
460 
461   /**
462    * {@inheritDoc}
463    *
464    * @see org.opencastproject.workflow.api.WorkflowService#start(org.opencastproject.workflow.api.WorkflowDefinition,
465    *      org.opencastproject.mediapackage.MediaPackage)
466    */
467   @Override
468   public WorkflowInstance start(WorkflowDefinition workflowDefinition, MediaPackage mediaPackage)
469           throws WorkflowDatabaseException, UnauthorizedException, WorkflowParsingException {
470     return start(workflowDefinition, mediaPackage, new HashMap<>());
471   }
472 
473   /**
474    * {@inheritDoc}
475    *
476    * @see org.opencastproject.workflow.api.WorkflowService#start(org.opencastproject.workflow.api.WorkflowDefinition,
477    *      org.opencastproject.mediapackage.MediaPackage)
478    */
479   @Override
480   public WorkflowInstance start(WorkflowDefinition workflowDefinition, MediaPackage mediaPackage,
481           Map<String, String> properties)
482           throws WorkflowDatabaseException, UnauthorizedException, WorkflowParsingException {
483     try {
484       return start(workflowDefinition, mediaPackage, null, properties);
485     } catch (NotFoundException e) {
486       // should never happen
487       throw new IllegalStateException("a null workflow ID caused a NotFoundException.  This is a programming error.");
488     }
489   }
490 
491   /**
492    * {@inheritDoc}
493    *
494    * @see org.opencastproject.workflow.api.WorkflowService#start(org.opencastproject.workflow.api.WorkflowDefinition,
495    *      org.opencastproject.mediapackage.MediaPackage, Long, java.util.Map)
496    */
497   @Override
498   public WorkflowInstance start(WorkflowDefinition workflowDefinition, MediaPackage sourceMediaPackage,
499           Long parentWorkflowId, Map<String, String> originalProperties) throws WorkflowDatabaseException,
500           NotFoundException, UnauthorizedException, WorkflowParsingException, IllegalStateException {
501     final String mediaPackageId = sourceMediaPackage.getIdentifier().toString();
502     Map<String, String> properties = null;
503 
504     // WorkflowPropertiesUtil.storeProperties will take a snapshot if there isn't one
505     // and we want the mp in the snapshot to have all the metadata populated.
506     populateMediaPackageMetadata(sourceMediaPackage);
507 
508     if (originalProperties != null) {
509       WorkflowPropertiesUtil.storeProperties(assetManager, sourceMediaPackage, originalProperties);
510       properties = WorkflowPropertiesUtil.getLatestWorkflowProperties(assetManager, mediaPackageId);
511     }
512 
513     // We have to synchronize per media package to avoid starting multiple simultaneous workflows for one media package.
514     final Lock lock = mediaPackageLocks.get(mediaPackageId);
515     lock.lock();
516 
517     try {
518       if (workflowDefinition == null) {
519         throw new IllegalArgumentException("workflow definition must not be null");
520       }
521       for (List<String> errors : MediaPackageSupport.sanityCheck(sourceMediaPackage)) {
522         throw new IllegalArgumentException("Insane media package cannot be processed: " + String.join("; ", errors));
523       }
524       if (parentWorkflowId != null) {
525         try {
526           getWorkflowById(parentWorkflowId); // Let NotFoundException bubble up
527         } catch (UnauthorizedException e) {
528           throw new IllegalArgumentException("Parent workflow " + parentWorkflowId + " not visible to this user");
529         }
530       } else {
531         if (persistence.mediaPackageHasActiveWorkflows(mediaPackageId)) {
532           throw new IllegalStateException(String.format(
533                   "Can't start workflow '%s' for media package '%s' because another workflow is currently active.",
534                   workflowDefinition.getTitle(),
535                   sourceMediaPackage.getIdentifier().toString()));
536         }
537       }
538 
539       // Get the current user
540       User currentUser = securityService.getUser();
541       validUserOrThrow(currentUser);
542 
543       // Get the current organization
544       Organization organization = securityService.getOrganization();
545       if (organization == null) {
546         throw new SecurityException("Current organization is unknown");
547       }
548 
549       WorkflowInstance workflowInstance = new WorkflowInstance(workflowDefinition, sourceMediaPackage,
550               currentUser, organization, properties);
551       workflowInstance = updateConfiguration(workflowInstance, properties);
552 
553       // Create and configure the workflow instance
554       try {
555         // Create a new job for this workflow instance
556         String workflowDefinitionXml = XmlWorkflowParser.toXml(workflowDefinition);
557         String mediaPackageXml = MediaPackageParser.getAsXml(sourceMediaPackage);
558 
559         List<String> arguments = new ArrayList<>();
560         arguments.add(workflowDefinitionXml);
561         arguments.add(mediaPackageXml);
562         if (parentWorkflowId != null || properties != null) {
563           String parentWorkflowIdString = (parentWorkflowId != null) ? parentWorkflowId.toString() : NULL_PARENT_ID;
564           arguments.add(parentWorkflowIdString);
565         }
566         if (properties != null) {
567           arguments.add(mapToString(properties));
568         }
569 
570         Job job = serviceRegistry.createJob(JOB_TYPE, Operation.START_WORKFLOW.toString(), arguments,
571                 null, false, null, WORKFLOW_JOB_LOAD);
572 
573         // Have the workflow take on the job's identity
574         workflowInstance.setId(job.getId());
575 
576         // Add the workflow to the search index and have the job enqueued for dispatch.
577         // Update also sets ACL and mediapackage metadata
578         update(workflowInstance);
579 
580         return workflowInstance;
581       } catch (Throwable t) {
582         try {
583           workflowInstance.setState(FAILED);
584           update(workflowInstance);
585         } catch (Exception failureToFail) {
586           logger.warn("Unable to update workflow to failed state", failureToFail);
587         }
588         try {
589           throw t;
590         } catch (ServiceRegistryException e) {
591           throw new WorkflowDatabaseException(e);
592         }
593       }
594     } finally {
595       lock.unlock();
596     }
597   }
598 
599   protected WorkflowInstance updateConfiguration(WorkflowInstance instance, Map<String, String> properties) {
600     if (properties != null) {
601       for (Entry<String, String> entry : properties.entrySet()) {
602         instance.setConfiguration(entry.getKey(), entry.getValue());
603       }
604     }
605     return instance;
606   }
607 
608   /**
609    * Does a lookup of available operation handlers for the given workflow operation.
610    *
611    * @param operation
612    *          the operation definition
613    * @return the handler or <code>null</code>
614    */
615   protected WorkflowOperationHandler selectOperationHandler(WorkflowOperationInstance operation) {
616     List<WorkflowOperationHandler> handlerList = new ArrayList<>();
617     for (HandlerRegistration handlerReg : getRegisteredHandlers()) {
618       if (handlerReg.operationName != null && handlerReg.operationName.equals(operation.getTemplate())) {
619         handlerList.add(handlerReg.handler);
620       }
621     }
622     if (handlerList.size() > 1) {
623       throw new IllegalStateException("Multiple operation handlers found for operation '" + operation.getTemplate()
624               + "'");
625     } else if (handlerList.size() == 1) {
626       return handlerList.get(0);
627     }
628     logger.warn("No workflow operation handlers found for operation '{}'", operation.getTemplate());
629     return null;
630   }
631 
632   /**
633    * Executes the workflow.
634    *
635    * @param workflow
636    *          the workflow instance
637    * @throws WorkflowException
638    *           if there is a problem processing the workflow
639    * @throws UnauthorizedException
640    */
641   protected Job runWorkflow(WorkflowInstance workflow) throws WorkflowException, UnauthorizedException {
642     if (INSTANTIATED != workflow.getState()) {
643 
644       // If the workflow is "running", we need to determine if there is an operation being executed or not.
645       // When a workflow has been restarted, this might not be the case and the status might not have been
646       // updated accordingly.
647       if (RUNNING == workflow.getState()) {
648         WorkflowOperationInstance currentOperation = workflow.getCurrentOperation();
649         if (currentOperation != null) {
650           if (currentOperation.getId() != null) {
651             try {
652               Job operationJob = serviceRegistry.getJob(currentOperation.getId());
653               if (Job.Status.RUNNING.equals(operationJob.getStatus())) {
654                 logger.debug("Not starting workflow {}, it is already in running state", workflow);
655                 return null;
656               } else {
657                 logger.info("Scheduling next operation of workflow {}", workflow);
658                 operationJob.setStatus(Status.QUEUED);
659                 operationJob.setDispatchable(true);
660                 return serviceRegistry.updateJob(operationJob);
661               }
662             } catch (Exception e) {
663               logger.warn("Error determining status of current workflow operation in {}", workflow, e);
664               return null;
665             }
666           }
667         } else {
668           throw new IllegalStateException("Cannot start a workflow '" + workflow + "' with no current operation");
669         }
670       } else {
671         throw new IllegalStateException("Cannot start a workflow in state '" + workflow.getState() + "'");
672       }
673     }
674 
675     // If this is a new workflow, move to the first operation
676     workflow.setState(RUNNING);
677     update(workflow);
678 
679     WorkflowOperationInstance operation = workflow.getCurrentOperation();
680 
681     if (operation == null) {
682       throw new IllegalStateException("Cannot start a workflow without a current operation");
683     }
684 
685     if (!operation.equals(workflow.getOperations().get(0))) {
686       throw new IllegalStateException("Current operation expected to be first");
687     }
688 
689     try {
690       logger.info("Scheduling workflow {} for execution", workflow.getId());
691       Job job = serviceRegistry.createJob(JOB_TYPE, Operation.START_OPERATION.toString(),
692               Collections.singletonList(Long.toString(workflow.getId())), null, false, null, WORKFLOW_JOB_LOAD);
693       operation.setId(job.getId());
694       update(workflow);
695       job.setStatus(Status.QUEUED);
696       job.setDispatchable(true);
697       return serviceRegistry.updateJob(job);
698     } catch (ServiceRegistryException e) {
699       throw new WorkflowDatabaseException(e);
700     } catch (NotFoundException e) {
701       // this should be impossible
702       throw new IllegalStateException("Unable to find a job that was just created");
703     }
704 
705   }
706 
707   /**
708    * Executes the workflow's current operation.
709    *
710    * @param workflow
711    *          the workflow
712    * @param properties
713    *          the properties that are passed in on resume
714    * @return the processed workflow operation
715    * @throws WorkflowException
716    *           if there is a problem processing the workflow
717    * @throws UnauthorizedException
718    */
719   protected WorkflowOperationInstance runWorkflowOperation(WorkflowInstance workflow, Map<String, String> properties)
720           throws WorkflowException, UnauthorizedException {
721     WorkflowOperationInstance processingOperation = workflow.getCurrentOperation();
722     if (processingOperation == null) {
723       throw new IllegalStateException("Workflow '" + workflow + "' has no operation to run");
724     }
725 
726     // Keep the current state for later reference, it might have been changed from the outside
727     WorkflowState initialState = workflow.getState();
728 
729     // Execute the operation handler
730     WorkflowOperationHandler operationHandler = selectOperationHandler(processingOperation);
731     WorkflowOperationWorker worker = new WorkflowOperationWorker(operationHandler, workflow, properties, this);
732     workflow = worker.execute();
733 
734     Long currentOperationJobId = processingOperation.getId();
735     try {
736       updateOperationJob(currentOperationJobId, processingOperation.getState());
737     } catch (NotFoundException e) {
738       throw new IllegalStateException("Unable to find a job that has already been running");
739     } catch (ServiceRegistryException e) {
740       throw new WorkflowDatabaseException(e);
741     }
742 
743     // Move on to the next workflow operation
744     WorkflowOperationInstance currentOperation = workflow.getCurrentOperation();
745 
746     // Is the workflow done?
747     if (currentOperation == null) {
748 
749       // If we are in failing mode, we were simply working off an error handling workflow
750       if (FAILING.equals(workflow.getState())) {
751         workflow.setState(FAILED);
752       }
753 
754       // Otherwise, let's make sure we didn't miss any failed operation, since the workflow state could have been
755       // switched to paused while processing the error handling workflow extension
756       else if (!FAILED.equals(workflow.getState())) {
757         workflow.setState(SUCCEEDED);
758         for (WorkflowOperationInstance op : workflow.getOperations()) {
759           if (op.getState().equals(WorkflowOperationInstance.OperationState.FAILED)) {
760             if (op.isFailOnError()) {
761               workflow.setState(FAILED);
762               break;
763             }
764           }
765         }
766       }
767 
768       // Save the updated workflow to the database
769       logger.debug("{} has {}", workflow, workflow.getState());
770       update(workflow);
771 
772     } else {
773 
774       // Somebody might have set the workflow to "paused" from the outside, so take a look a the database first
775       WorkflowState dbWorkflowState;
776       try {
777         dbWorkflowState = getWorkflowById(workflow.getId()).getState();
778       } catch (NotFoundException e) {
779         throw new IllegalStateException("The workflow with ID " + workflow.getId()
780                 + " can not be found in the database", e);
781       } catch (UnauthorizedException e) {
782         throw new IllegalStateException("The workflow with ID " + workflow.getId() + " can not be read", e);
783       }
784 
785       // If somebody changed the workflow state from the outside, that state should take precedence
786       if (!dbWorkflowState.equals(initialState)) {
787         logger.info("Workflow state for {} was changed to '{}' from the outside", workflow, dbWorkflowState);
788         workflow.setState(dbWorkflowState);
789       }
790 
791       // Save the updated workflow to the database
792 
793       Job job;
794       switch (workflow.getState()) {
795         case FAILED:
796           update(workflow);
797           break;
798         case FAILING:
799         case RUNNING:
800           try {
801             job = serviceRegistry.createJob(JOB_TYPE, Operation.START_OPERATION.toString(),
802                     Collections.singletonList(Long.toString(workflow.getId())), null, false, null, WORKFLOW_JOB_LOAD);
803             currentOperation.setId(job.getId());
804             update(workflow);
805             job.setStatus(Status.QUEUED);
806             job.setDispatchable(true);
807             serviceRegistry.updateJob(job);
808           } catch (ServiceRegistryException e) {
809             throw new WorkflowDatabaseException(e);
810           } catch (NotFoundException e) {
811             // this should be impossible
812             throw new IllegalStateException("Unable to find a job that was just created");
813           }
814           break;
815         case PAUSED:
816         case STOPPED:
817         case SUCCEEDED:
818           update(workflow);
819           break;
820         case INSTANTIATED:
821           update(workflow);
822           throw new IllegalStateException("Impossible workflow state found during processing");
823         default:
824           throw new IllegalStateException("Unknown workflow state found during processing");
825       }
826 
827     }
828 
829     return processingOperation;
830   }
831 
832   /**
833    * {@inheritDoc}
834    *
835    * @see org.opencastproject.workflow.api.WorkflowService#getWorkflowDefinitionById(String)
836    */
837   @Override
838   public WorkflowDefinition getWorkflowDefinitionById(String id) throws NotFoundException {
839     final WorkflowIdentifier workflowIdentifier = new WorkflowIdentifier(id, securityService.getOrganization().getId());
840     final WorkflowDefinition def = workflowDefinitionScanner
841             .getWorkflowDefinition(securityService.getUser(), workflowIdentifier);
842     if (def == null) {
843       throw new NotFoundException("Workflow definition '" + workflowIdentifier + "' not found or inaccessible");
844     }
845     return def;
846   }
847 
848   /**
849    * {@inheritDoc}
850    *
851    * @see org.opencastproject.workflow.api.WorkflowService#stop(long)
852    */
853   @Override
854   public WorkflowInstance stop(long workflowInstanceId) throws WorkflowException, NotFoundException,
855           UnauthorizedException {
856     final Lock lock = this.lock.get(workflowInstanceId);
857     lock.lock();
858     try {
859       WorkflowInstance instance = getWorkflowById(workflowInstanceId);
860 
861       if (instance.getState() != STOPPED) {
862         // Update the workflow instance
863         instance.setState(STOPPED);
864         update(instance);
865       }
866 
867       try {
868         removeTempFiles(instance);
869       } catch (Exception e) {
870         logger.warn("Cannot remove temp files for workflow instance {}", workflowInstanceId, e);
871       }
872 
873       return instance;
874     } finally {
875       lock.unlock();
876     }
877   }
878 
879   /**
880    * Checks whether user is set and is known to the userDirectoryService
881    */
882   private void validUserOrThrow(User user) {
883     if (user == null) {
884       throw new SecurityException("Current user is unknown");
885     }
886 
887     if (userDirectoryService.loadUser(user.getUsername()) == null) {
888       throw new SecurityException(String.format("Current user '%s' can not be loaded", user.getUsername()));
889     }
890   }
891 
892   private void removeTempFiles(WorkflowInstance workflowInstance) {
893     logger.info("Removing temporary files for workflow {}", workflowInstance.getId());
894     MediaPackage mp = workflowInstance.getMediaPackage();
895     if (null == mp) {
896       logger.warn("Workflow instance {} does not have an media package set", workflowInstance.getId());
897       return;
898     }
899     for (MediaPackageElement elem : mp.getElements()) {
900       // Publications should not link to temporary files and can be skipped
901       if (elem instanceof Publication) {
902         continue;
903       }
904       if (null == elem.getURI()) {
905         logger.warn("Media package element {} from the media package {} does not have an URI set",
906                 elem.getIdentifier(), mp.getIdentifier());
907         continue;
908       }
909       try {
910         logger.debug("Removing temporary file {} for workflow {}", elem.getURI(), workflowInstance);
911         workspace.delete(elem.getURI());
912       } catch (IOException e) {
913         logger.warn("Unable to delete mediapackage element", e);
914       } catch (NotFoundException e) {
915         // File was probably already deleted before...
916       }
917     }
918   }
919 
920 
921   /**
922    * {@inheritDoc}
923    *
924    * @see org.opencastproject.workflow.api.WorkflowService#remove(long)
925    */
926   @Override
927   public void remove(long workflowInstanceId) throws WorkflowDatabaseException, NotFoundException,
928           UnauthorizedException, WorkflowParsingException, WorkflowStateException {
929     remove(workflowInstanceId, false);
930   }
931 
932   /**
933    * {@inheritDoc}
934    *
935    * @see org.opencastproject.workflow.api.WorkflowService#remove(long,boolean)
936    */
937   @Override
938   public void remove(long workflowInstanceId, boolean force) throws WorkflowDatabaseException, NotFoundException,
939           UnauthorizedException, WorkflowStateException {
940     final Lock lock = this.lock.get(workflowInstanceId);
941     lock.lock();
942     try {
943       WorkflowInstance instance = getWorkflowById(workflowInstanceId);
944       WorkflowInstance.WorkflowState state = instance.getState();
945       if (!state.isTerminated() && !force) {
946         throw new WorkflowStateException("Workflow instance with state '" + state + "' cannot be removed "
947             + "since it is not yet terminated.");
948       }
949 
950       assertPermission(instance, Permissions.Action.WRITE.toString(), instance.getOrganizationId());
951 
952       // First, remove temporary files
953       removeTempFiles(instance);
954 
955       // Second, remove jobs related to operations which belong to the workflow instance
956       List<WorkflowOperationInstance> operations = instance.getOperations();
957       List<Long> jobsToDelete = new ArrayList<>();
958       for (WorkflowOperationInstance op : operations) {
959         if (op.getId() != null) {
960           long workflowOpId = op.getId();
961           if (workflowOpId != workflowInstanceId) {
962             jobsToDelete.add(workflowOpId);
963           }
964         }
965       }
966       try {
967         serviceRegistry.removeJobs(jobsToDelete);
968       } catch (ServiceRegistryException e) {
969         logger.warn("Problems while removing jobs related to workflow operations '{}'", jobsToDelete, e);
970       } catch (NotFoundException e) {
971         logger.debug("No jobs related to one of the workflow operation '{}' found in service registry", jobsToDelete);
972       }
973 
974       // Third, remove workflow instance job itself
975       try {
976         serviceRegistry.removeJobs(Collections.singletonList(workflowInstanceId));
977         removeWorkflowInstanceFromIndex(instance.getId());
978       } catch (ServiceRegistryException e) {
979         logger.warn("Problems while removing workflow instance job '{}'", workflowInstanceId, e);
980       } catch (NotFoundException e) {
981         logger.info("No workflow instance job '{}' found in the service registry", workflowInstanceId);
982       }
983 
984       // Remove workflow from database
985       persistence.removeFromDatabase(instance);
986     } finally {
987       lock.unlock();
988     }
989   }
990 
991   /**
992    * {@inheritDoc}
993    *
994    * @see org.opencastproject.workflow.api.WorkflowService#suspend(long)
995    */
996   @Override
997   public WorkflowInstance suspend(long workflowInstanceId) throws WorkflowException, NotFoundException,
998           UnauthorizedException {
999     final Lock lock = this.lock.get(workflowInstanceId);
1000     lock.lock();
1001     try {
1002       WorkflowInstance instance = getWorkflowById(workflowInstanceId);
1003       instance.setState(PAUSED);
1004       update(instance);
1005       return instance;
1006     } finally {
1007       lock.unlock();
1008     }
1009   }
1010 
1011   /**
1012    * {@inheritDoc}
1013    *
1014    * @see org.opencastproject.workflow.api.WorkflowService#resume(long)
1015    */
1016   @Override
1017   public WorkflowInstance resume(long id) throws WorkflowException, NotFoundException, IllegalStateException,
1018           UnauthorizedException {
1019     return resume(id, null);
1020   }
1021 
1022   /**
1023    * {@inheritDoc}
1024    *
1025    * @see org.opencastproject.workflow.api.WorkflowService#resume(long, Map)
1026    */
1027   @Override
1028   public WorkflowInstance resume(long workflowInstanceId, Map<String, String> properties) throws WorkflowException,
1029           NotFoundException, IllegalStateException, UnauthorizedException {
1030     WorkflowInstance workflowInstance = getWorkflowById(workflowInstanceId);
1031     if (!WorkflowState.PAUSED.equals(workflowInstance.getState())) {
1032       throw new IllegalStateException("Can not resume a workflow where the current state is not in paused");
1033     }
1034 
1035     workflowInstance = updateConfiguration(workflowInstance, properties);
1036     update(workflowInstance);
1037 
1038     WorkflowOperationInstance currentOperation = workflowInstance.getCurrentOperation();
1039 
1040     // Is the workflow done?
1041     if (currentOperation == null) {
1042       // Let's make sure we didn't miss any failed operation, since the workflow state could have been
1043       // switched to paused while processing the error handling workflow extension
1044       workflowInstance.setState(SUCCEEDED);
1045       for (WorkflowOperationInstance op : workflowInstance.getOperations()) {
1046         if (op.getState().equals(WorkflowOperationInstance.OperationState.FAILED)) {
1047           if (op.isFailOnError()) {
1048             workflowInstance.setState(FAILED);
1049             break;
1050           }
1051         }
1052       }
1053 
1054       // Save the resumed workflow to the database
1055       logger.debug("{} has {}", workflowInstance, workflowInstance.getState());
1056       update(workflowInstance);
1057       return workflowInstance;
1058     }
1059 
1060     // We can resume workflows when they are in either the paused state, or they are being advanced manually passed
1061     // certain operations. In the latter case, there is no current paused operation.
1062     if (OperationState.INSTANTIATED.equals(currentOperation.getState())) {
1063       try {
1064         // the operation has its own job. Update that too.
1065         Job operationJob = serviceRegistry.createJob(JOB_TYPE, Operation.START_OPERATION.toString(),
1066                 Collections.singletonList(Long.toString(workflowInstanceId)), null, false, null, WORKFLOW_JOB_LOAD);
1067 
1068         // this method call is publicly visible, so it doesn't necessarily go through the accept method. Set the
1069         // workflow state manually.
1070         workflowInstance.setState(RUNNING);
1071         currentOperation.setId(operationJob.getId());
1072 
1073         // update the workflow and its associated job
1074         update(workflowInstance);
1075 
1076         // Now set this job to be queued so it can be dispatched
1077         operationJob.setStatus(Status.QUEUED);
1078         operationJob.setDispatchable(true);
1079         serviceRegistry.updateJob(operationJob);
1080 
1081         return workflowInstance;
1082       } catch (ServiceRegistryException e) {
1083         throw new WorkflowDatabaseException(e);
1084       }
1085     }
1086 
1087     Long operationJobId = workflowInstance.getCurrentOperation().getId();
1088     if (operationJobId == null) {
1089       throw new IllegalStateException("Can not resume a workflow where the current operation has no associated id");
1090     }
1091 
1092     // Set the current operation's job to queued, so it gets picked up again
1093     Job workflowJob;
1094     try {
1095       workflowJob = serviceRegistry.getJob(workflowInstanceId);
1096       workflowJob.setStatus(Status.RUNNING);
1097       persistence.updateInDatabase(workflowInstance);
1098       serviceRegistry.updateJob(workflowJob);
1099 
1100       Job operationJob = serviceRegistry.getJob(operationJobId);
1101       operationJob.setStatus(Status.QUEUED);
1102       operationJob.setDispatchable(true);
1103       if (properties != null) {
1104         Properties props = new Properties();
1105         props.putAll(properties);
1106         ByteArrayOutputStream out = new ByteArrayOutputStream();
1107         props.store(out, null);
1108         List<String> newArguments = new ArrayList<String>(operationJob.getArguments());
1109         newArguments.add(new String(out.toByteArray(), StandardCharsets.UTF_8));
1110         operationJob.setArguments(newArguments);
1111       }
1112       serviceRegistry.updateJob(operationJob);
1113     } catch (ServiceRegistryException e) {
1114       throw new WorkflowDatabaseException(e);
1115     } catch (IOException e) {
1116       throw new WorkflowParsingException("Unable to parse workflow and/or workflow properties");
1117     }
1118 
1119     return workflowInstance;
1120   }
1121 
1122   /**
1123    * Asserts that the current user has permission to take the provided action on a workflow instance.
1124    *
1125    * @param workflow
1126    *          the workflow instance
1127    * @param action
1128    *          the action to ensure is permitted
1129    * @throws UnauthorizedException
1130    *           if the action is not authorized
1131    */
1132   protected void assertPermission(WorkflowInstance workflow, String action, String workflowOrgId)
1133           throws UnauthorizedException {
1134     User currentUser = securityService.getUser();
1135     Organization currentOrg = securityService.getOrganization();
1136     String currentOrgAdminRole = currentOrg.getAdminRole();
1137     String currentOrgId = currentOrg.getId();
1138 
1139     MediaPackage mediapackage = workflow.getMediaPackage();
1140 
1141     WorkflowState state = workflow.getState();
1142     if (state != INSTANTIATED && state != RUNNING && workflow.getState() != FAILING) {
1143       Optional<MediaPackage> assetMediapackage = assetManager.getMediaPackage(mediapackage.getIdentifier().toString());
1144       if (assetMediapackage.isPresent()) {
1145         mediapackage = assetMediapackage.get();
1146       }
1147     }
1148 
1149     var creatorName = workflow.getCreatorName();
1150     var workflowCreator = creatorName == null ? null : userDirectoryService.loadUser(creatorName);
1151     boolean authorized = currentUser.hasRole(GLOBAL_ADMIN_ROLE)
1152             || (currentUser.hasRole(currentOrgAdminRole) && currentOrgId.equals(workflowOrgId))
1153             || (currentUser.equals(workflowCreator))
1154             || (authorizationService.hasPermission(mediapackage, action) && currentOrgId.equals(workflowOrgId));
1155 
1156     if (!authorized) {
1157       throw new UnauthorizedException(currentUser, action);
1158     }
1159   }
1160 
1161   protected boolean assertMediaPackagePermission(String mediaPackageId, String action) {
1162     var currentUser = securityService.getUser();
1163     Optional<MediaPackage> mp = assetManager.getMediaPackage(mediaPackageId);
1164 
1165     // asset manager already checks if user is admin, org admin for same org as mp, or has explicit read rights
1166     // global admins can still get workflow instances if mp is gone from asset manager
1167     // org admins can't because then we don't know if mp belonged to same org as user
1168     return currentUser.hasRole(GLOBAL_ADMIN_ROLE)
1169             || mp.isPresent() && currentUser.hasRole(securityService.getOrganization().getAdminRole())
1170             || mp.isPresent() && authorizationService.hasPermission(mp.get(), action);
1171   }
1172 
1173   /**
1174    * {@inheritDoc}
1175    *
1176    * @see org.opencastproject.workflow.api.WorkflowService#update(org.opencastproject.workflow.api.WorkflowInstance)
1177    */
1178   @Override
1179   public void update(final WorkflowInstance workflowInstance) throws WorkflowDatabaseException, UnauthorizedException {
1180     final Lock lock = updateLock.get(workflowInstance.getId());
1181     lock.lock();
1182 
1183     try {
1184       WorkflowInstance originalWorkflowInstance = null;
1185       try {
1186         // get workflow and assert permissions
1187         originalWorkflowInstance = getWorkflowById(workflowInstance.getId());
1188       } catch (NotFoundException e) {
1189         // That's fine, it's a new workflow instance
1190       }
1191 
1192       MediaPackage updatedMediaPackage = null;
1193       try {
1194 
1195         // Before we persist this, extract the metadata
1196         updatedMediaPackage = workflowInstance.getMediaPackage();
1197 
1198         populateMediaPackageMetadata(updatedMediaPackage);
1199 
1200         String seriesId = updatedMediaPackage.getSeries();
1201         if (seriesId != null && workflowInstance.getCurrentOperation() != null) {
1202           // If the mediapackage contains a series, find the series ACLs and add the security information to the
1203           // mediapackage
1204 
1205           try {
1206             AccessControlList acl = seriesService.getSeriesAccessControl(seriesId);
1207             Tuple<AccessControlList, AclScope> activeAcl = authorizationService.getAcl(
1208                 updatedMediaPackage, AclScope.Series);
1209             // Update series ACL if it differs from the active series ACL on the media package
1210             if (!AclScope.Series.equals(activeAcl.getB()) || !AccessControlUtil.equals(activeAcl.getA(), acl)) {
1211               authorizationService.setAcl(updatedMediaPackage, AclScope.Series, acl);
1212             }
1213           } catch (NotFoundException e) {
1214             logger.debug("Not updating series ACL on event {} since series {} has no ACL set",
1215                 updatedMediaPackage, seriesId, e);
1216           }
1217         }
1218 
1219         workflowInstance.setMediaPackage(updatedMediaPackage);
1220       } catch (SeriesException e) {
1221         throw new WorkflowDatabaseException(e);
1222       } catch (Exception e) {
1223         logger.error("Metadata for media package {} could not be updated", updatedMediaPackage, e);
1224       }
1225 
1226       // Synchronize the job status with the workflow
1227       WorkflowState workflowState = workflowInstance.getState();
1228 
1229       Job job;
1230       try {
1231         job = serviceRegistry.getJob(workflowInstance.getId());
1232         job.setPayload(Long.toString(workflowInstance.getId()));
1233 
1234         // Synchronize workflow and job state
1235         switch (workflowState) {
1236           case FAILED:
1237             job.setStatus(Status.FAILED);
1238             break;
1239           case FAILING:
1240             break;
1241           case INSTANTIATED:
1242             job.setDispatchable(true);
1243             job.setStatus(Status.QUEUED);
1244             break;
1245           case PAUSED:
1246             job.setStatus(Status.PAUSED);
1247             break;
1248           case RUNNING:
1249             job.setStatus(Status.RUNNING);
1250             break;
1251           case STOPPED:
1252             job.setStatus(Status.CANCELLED);
1253             break;
1254           case SUCCEEDED:
1255             job.setStatus(Status.FINISHED);
1256             break;
1257           default:
1258             throw new IllegalStateException("Found a workflow state that is not handled");
1259         }
1260       } catch (ServiceRegistryException e) {
1261         throw new WorkflowDatabaseException(
1262             "Unable to read workflow job " + workflowInstance.getId() + " from service registry", e);
1263       } catch (NotFoundException e) {
1264         throw new WorkflowDatabaseException(
1265             "Job for workflow " + workflowInstance.getId() + " not found in service registry", e);
1266       }
1267 
1268       // Update both workflow and workflow job
1269       try {
1270         //Update the database
1271         persistence.updateInDatabase(workflowInstance);
1272 
1273         job = serviceRegistry.updateJob(job);
1274 
1275         WorkflowOperationInstance op = workflowInstance.getCurrentOperation();
1276 
1277         // Update index used for UI. Note that we only need certain metadata and we can safely filter out workflow
1278         // updates for running operations since we updated the metadata right before these operations and will do so
1279         // again right after those operations.
1280         if (op == null || op.getState() != OperationState.RUNNING) {
1281           // Collect necessary information only for index update
1282           long id = workflowInstance.getId();
1283           int state = workflowInstance.getState().ordinal();
1284           var template = workflowInstance.getTemplate();
1285           String mpId = workflowInstance.getMediaPackage().getIdentifier().toString();
1286           String orgId = workflowInstance.getOrganizationId();
1287 
1288           updateWorkflowInstanceInIndex(id, state, template, mpId, orgId);
1289         }
1290       } catch (ServiceRegistryException e) {
1291         throw new WorkflowDatabaseException("Update of workflow job " + workflowInstance.getId()
1292             + " in the service registry failed, service registry and workflow table may be out of sync", e);
1293       } catch (NotFoundException e) {
1294         throw new WorkflowDatabaseException("Job for workflow " + workflowInstance.getId()
1295             + " not found in service registry", e);
1296       } catch (Exception e) {
1297         throw new WorkflowDatabaseException("Update of workflow job " + job.getId()
1298             + " in the service registry failed, service registry and workflow table may be out of sync", e);
1299       }
1300 
1301       try {
1302         fireListeners(originalWorkflowInstance, workflowInstance);
1303       } catch (Exception e) {
1304         // Can't happen, since we are converting from an in-memory object
1305         throw new IllegalStateException("In-memory workflow instance could not be serialized", e);
1306       }
1307     } finally {
1308       lock.unlock();
1309     }
1310   }
1311 
1312   /**
1313    * {@inheritDoc}
1314    *
1315    * @see org.opencastproject.workflow.api.WorkflowService#countWorkflowInstances()
1316    */
1317   @Override
1318   public long countWorkflowInstances() throws WorkflowDatabaseException {
1319     return countWorkflowInstances(null);
1320   }
1321 
1322   /**
1323    * {@inheritDoc}
1324    *
1325    * @see org.opencastproject.workflow.api.WorkflowService#countWorkflowInstances(
1326    *      org.opencastproject.workflow.api.WorkflowInstance.WorkflowState)
1327    */
1328   @Override
1329   public long countWorkflowInstances(WorkflowState state) throws WorkflowDatabaseException {
1330     return persistence.countWorkflows(state);
1331   }
1332 
1333   /**
1334    * {@inheritDoc}
1335    *
1336    * @see org.opencastproject.workflow.api.WorkflowService#getWorkflowInstancesByMediaPackage(String)
1337    */
1338   @Override
1339   public List<WorkflowInstance> getWorkflowInstancesByMediaPackage(String mediaPackageId)
1340           throws WorkflowDatabaseException, UnauthorizedException {
1341     // If we have read permission to the media package, return all workflows
1342     if (!assertMediaPackagePermission(mediaPackageId, Permissions.Action.READ.toString())) {
1343       throw new UnauthorizedException("Not allowed to access event");
1344     }
1345     return persistence.getWorkflowInstancesByMediaPackage(mediaPackageId);
1346   }
1347 
1348   /**
1349    * {@inheritDoc}
1350    *
1351    * @see org.opencastproject.workflow.api.WorkflowService#getRunningWorkflowInstanceByMediaPackage(String, String)
1352    */
1353   @Override
1354   public Optional<WorkflowInstance> getRunningWorkflowInstanceByMediaPackage(String mediaPackageId, String action)
1355           throws WorkflowException, UnauthorizedException, WorkflowDatabaseException {
1356     List<WorkflowInstance> workflowInstances = persistence.getRunningWorkflowInstancesByMediaPackage(mediaPackageId);
1357 
1358     // If there is more than workflow running something is very wrong
1359     if (workflowInstances.size() > 1) {
1360       throw new WorkflowException("Multiple workflows are active on mediapackage " + mediaPackageId);
1361     }
1362 
1363     Optional<WorkflowInstance> optWorkflowInstance = Optional.empty();
1364     if (workflowInstances.size() == 1) {
1365       WorkflowInstance wfInstance = workflowInstances.get(0);
1366       optWorkflowInstance = Optional.of(wfInstance);
1367       assertPermission(wfInstance, action, wfInstance.getOrganizationId());
1368     }
1369 
1370     return optWorkflowInstance;
1371   }
1372 
1373   /**
1374    * {@inheritDoc}
1375    *
1376    * @see org.opencastproject.workflow.api.WorkflowService#mediaPackageHasActiveWorkflows(String)
1377    */
1378   @Override
1379   public boolean mediaPackageHasActiveWorkflows(String mediaPackageId) throws WorkflowDatabaseException {
1380     return persistence.mediaPackageHasActiveWorkflows(mediaPackageId);
1381   }
1382 
1383   /**
1384    * {@inheritDoc}
1385    *
1386    * @see org.opencastproject.workflow.api.WorkflowService#userHasActiveWorkflows(String)
1387    */
1388   @Override
1389   public boolean userHasActiveWorkflows(String userId) throws WorkflowDatabaseException {
1390     return persistence.userHasActiveWorkflows(userId);
1391   }
1392 
1393   /**
1394    * Callback for workflow operations that were throwing an exception. This implementation assumes that the operation
1395    * worker has already adjusted the current operation's state appropriately.
1396    *
1397    * @param workflow
1398    *          the workflow instance
1399    * @param currentOperation
1400    *          the current workflow operation
1401    * @return the workflow instance
1402    */
1403   protected WorkflowInstance handleOperationException(
1404       WorkflowInstance workflow,
1405       WorkflowOperationInstance currentOperation) {
1406     int failedAttempt = currentOperation.getFailedAttempts() + 1;
1407     currentOperation.setFailedAttempts(failedAttempt);
1408 
1409     // Operation was aborted by the user, after going into hold state
1410     if (ERROR_RESOLUTION_HANDLER_ID.equals(currentOperation.getTemplate())
1411             && OperationState.FAILED.equals(currentOperation.getState())) {
1412       int position = workflow.getOperations().indexOf(currentOperation);
1413       // Advance to operation that actually failed
1414       if (workflow.getOperations().size() > position + 1) { // This should always be true...
1415         currentOperation = workflow.getOperations().get(position + 1);
1416         // It's currently in RETRY state, change to FAILED
1417         currentOperation.setState(OperationState.FAILED);
1418       }
1419       handleFailedOperation(workflow, currentOperation);
1420     } else if (currentOperation.getMaxAttempts() != -1 && failedAttempt == currentOperation.getMaxAttempts()) {
1421       handleFailedOperation(workflow, currentOperation);
1422     } else {
1423       switch (currentOperation.getRetryStrategy()) {
1424         case NONE:
1425           handleFailedOperation(workflow, currentOperation);
1426           break;
1427         case RETRY:
1428           currentOperation.setState(OperationState.RETRY);
1429           break;
1430         case HOLD:
1431           currentOperation.setState(OperationState.RETRY);
1432           List<WorkflowOperationInstance> operations = workflow.getOperations();
1433           WorkflowOperationDefinitionImpl errorResolutionDefinition = new WorkflowOperationDefinitionImpl(
1434                   ERROR_RESOLUTION_HANDLER_ID, "Error Resolution Operation", "error", false);
1435           var errorResolutionInstance = new WorkflowOperationInstance(errorResolutionDefinition);
1436           errorResolutionInstance.setExceptionHandlingWorkflow(currentOperation.getExceptionHandlingWorkflow());
1437           var index = workflow.getOperations().indexOf(currentOperation);
1438           operations.add(index, errorResolutionInstance);
1439           workflow.setOperations(operations);
1440           break;
1441         default:
1442           break;
1443       }
1444     }
1445     return workflow;
1446   }
1447 
1448   /**
1449    * Handles the workflow for a failing operation.
1450    *
1451    * @param workflow
1452    *          the workflow
1453    * @param currentOperation
1454    *          the failing workflow operation instance
1455    */
1456   private void handleFailedOperation(WorkflowInstance workflow, WorkflowOperationInstance currentOperation) {
1457     String errorDefId = currentOperation.getExceptionHandlingWorkflow();
1458 
1459     // Adjust the workflow state according to the setting on the operation
1460     if (currentOperation.isFailOnError()) {
1461       if (StringUtils.isBlank(errorDefId)) {
1462         workflow.setState(FAILED);
1463       } else {
1464         workflow.setState(FAILING);
1465 
1466         // Remove the rest of the original workflow
1467         int currentOperationPosition = workflow.getOperations().indexOf(currentOperation);
1468         List<WorkflowOperationInstance> operations = new ArrayList<>(
1469                 workflow.getOperations().subList(0, currentOperationPosition + 1));
1470         workflow.setOperations(operations);
1471 
1472         // Determine the current workflow configuration
1473         Map<String, String> configuration = new HashMap<>();
1474         for (String configKey : workflow.getConfigurationKeys()) {
1475           configuration.put(configKey, workflow.getConfiguration(configKey));
1476         }
1477 
1478         // Append the operations
1479         WorkflowDefinition errorDef = null;
1480         try {
1481           errorDef = getWorkflowDefinitionById(errorDefId);
1482           workflow.extend(errorDef);
1483           workflow.setOperations(updateConfiguration(workflow, configuration).getOperations());
1484         } catch (NotFoundException notFoundException) {
1485           throw new IllegalStateException("Unable to find the error workflow definition '" + errorDefId + "'");
1486         }
1487       }
1488     }
1489 
1490     // Fail the current operation
1491     currentOperation.setState(OperationState.FAILED);
1492   }
1493 
1494   /**
1495    * Callback for workflow operation handlers that executed and finished without exception. This implementation assumes
1496    * that the operation worker has already adjusted the current operation's state appropriately.
1497    *
1498    * @param workflow
1499    *          the workflow instance
1500    * @param result
1501    *          the workflow operation result
1502    * @return the workflow instance
1503    * @throws WorkflowDatabaseException
1504    *           if updating the workflow fails
1505    */
1506   protected WorkflowInstance handleOperationResult(WorkflowInstance workflow, WorkflowOperationResult result)
1507           throws WorkflowDatabaseException {
1508 
1509     // Get the operation and its handler
1510     WorkflowOperationInstance currentOperation = workflow.getCurrentOperation();
1511     WorkflowOperationHandler handler = getWorkflowOperationHandler(currentOperation.getTemplate());
1512 
1513     // Create an operation result for the lazy or else update the workflow's media package
1514     if (result == null) {
1515       logger.warn("Handling a null operation result for workflow {} in operation {}", workflow.getId(),
1516               currentOperation.getTemplate());
1517       result = new WorkflowOperationResultImpl(workflow.getMediaPackage(), null, Action.CONTINUE, 0);
1518     } else {
1519       MediaPackage mp = result.getMediaPackage();
1520       if (mp != null) {
1521         workflow.setMediaPackage(mp);
1522       }
1523     }
1524 
1525     // The action to take
1526     Action action = result.getAction();
1527 
1528     // Update the workflow configuration.
1529     workflow = updateConfiguration(workflow, result.getProperties());
1530 
1531     // Adjust workflow statistics
1532     currentOperation.setTimeInQueue(result.getTimeInQueue());
1533 
1534     // Adjust the operation state
1535     switch (action) {
1536       case CONTINUE:
1537         currentOperation.setState(OperationState.SUCCEEDED);
1538         break;
1539       case PAUSE:
1540         if (!(handler instanceof ResumableWorkflowOperationHandler)) {
1541           throw new IllegalStateException("Operation " + currentOperation.getTemplate() + " is not resumable");
1542         }
1543 
1544         // Set abortable and continuable to default values
1545         currentOperation.setContinuable(result.allowsContinue());
1546         currentOperation.setAbortable(result.allowsAbort());
1547 
1548         workflow.setState(PAUSED);
1549         currentOperation.setState(OperationState.PAUSED);
1550         break;
1551       case SKIP:
1552         currentOperation.setState(OperationState.SKIPPED);
1553         break;
1554       default:
1555         throw new IllegalStateException("Unknown action '" + action + "' returned");
1556     }
1557 
1558     if (ERROR_RESOLUTION_HANDLER_ID.equals(currentOperation.getTemplate()) && result.getAction() == Action.CONTINUE) {
1559 
1560       Map<String, String> resultProperties = result.getProperties();
1561       if (resultProperties == null || StringUtils.isBlank(resultProperties.get(RETRY_STRATEGY))) {
1562         throw new WorkflowDatabaseException("Retry strategy not present in properties!");
1563       }
1564 
1565       RetryStrategy retryStrategy = RetryStrategy.valueOf(resultProperties.get(RETRY_STRATEGY));
1566       switch (retryStrategy) {
1567         case NONE:
1568           handleFailedOperation(workflow, workflow.getCurrentOperation());
1569           break;
1570         case RETRY:
1571           break;
1572         default:
1573           throw new WorkflowDatabaseException("Retry strategy not implemented yet!");
1574       }
1575     }
1576 
1577     return workflow;
1578   }
1579 
1580   /**
1581    * Reads the available metadata from the dublin core catalog (if there is one) and updates the mediapackage.
1582    *
1583    * @param mp
1584    *          the media package
1585    */
1586   protected void populateMediaPackageMetadata(MediaPackage mp) {
1587     if (metadataServices.isEmpty()) {
1588       logger.warn("No metadata services are registered, so no media package metadata can be extracted from catalogs");
1589       return;
1590     }
1591     for (MediaPackageMetadataService metadataService : metadataServices) {
1592       MediaPackageMetadata metadata = metadataService.getMetadata(mp);
1593       MediaPackageMetadataSupport.populateMediaPackageMetadata(mp, metadata);
1594     }
1595   }
1596 
1597   /**
1598    * {@inheritDoc}
1599    *
1600    * @see org.opencastproject.job.api.JobProducer#isReadyToAcceptJobs(String)
1601    */
1602   @Override
1603   public boolean isReadyToAcceptJobs(String operation) {
1604     return true;
1605   }
1606 
1607   /**
1608    * {@inheritDoc}
1609    *
1610    * If we are already running the maximum number of workflows, don't accept another START_WORKFLOW job
1611    *
1612    * @see org.opencastproject.job.api.AbstractJobProducer#isReadyToAccept(org.opencastproject.job.api.Job)
1613    */
1614   @Override
1615   public boolean isReadyToAccept(Job job) throws UndispatchableJobException {
1616     String operation = job.getOperation();
1617 
1618     // Only restrict execution of new jobs
1619     if (!Operation.START_WORKFLOW.toString().equals(operation)) {
1620       return true;
1621     }
1622 
1623     // If the first operation is guaranteed to pause, run the job.
1624     if (job.getArguments().size() > 1 && job.getArguments().get(0) != null) {
1625       try {
1626         WorkflowDefinition workflowDef = XmlWorkflowParser.parseWorkflowDefinition(job.getArguments().get(0));
1627         if (workflowDef.getOperations().size() > 0) {
1628           String firstOperationId = workflowDef.getOperations().get(0).getId();
1629           WorkflowOperationHandler handler = getWorkflowOperationHandler(firstOperationId);
1630           if (handler instanceof ResumableWorkflowOperationHandler) {
1631             if (((ResumableWorkflowOperationHandler) handler).isAlwaysPause()) {
1632               return true;
1633             }
1634           }
1635         }
1636       } catch (WorkflowParsingException e) {
1637         throw new UndispatchableJobException(job + " is not a proper job to start a workflow", e);
1638       }
1639     }
1640 
1641     WorkflowInstance workflow;
1642     Optional<WorkflowInstance> workflowInstance;
1643     String mediaPackageId;
1644 
1645     // Fetch all workflows that are running with the current media package
1646     try {
1647       workflow = getWorkflowById(job.getId());
1648       mediaPackageId = workflow.getMediaPackage().getIdentifier().toString();
1649     } catch (NotFoundException e) {
1650       throw new UndispatchableJobException("Trying to start workflow with job id " + job.getId()
1651           + " but no corresponding instance is available from the workflow service", e);
1652     } catch (UnauthorizedException e) {
1653       throw new UndispatchableJobException(
1654           "Authorization denied while requesting to loading workflow instance. Job: " + job.getId(), e);
1655     }
1656 
1657     try {
1658       workflowInstance = getRunningWorkflowInstanceByMediaPackage(
1659               workflow.getMediaPackage().getIdentifier().toString(), Permissions.Action.READ.toString());
1660     } catch (UnauthorizedException e) {
1661       throw new UndispatchableJobException("Authorization denied while requesting to loading workflow instance "
1662           + workflow.getId(), e);
1663     } catch (WorkflowDatabaseException e) {
1664       throw new UndispatchableJobException("An database error occurred while checking if a workflow is already active "
1665           + "(job: " + job.getId() + ")", e);
1666     } catch (WorkflowException e) {
1667       // Avoid running multiple workflows with same media package id at the same time
1668       delayWorkflow(workflow, mediaPackageId);
1669       return false;
1670     }
1671 
1672     // Make sure we are not excluding ourselves
1673     if (workflowInstance.isPresent() && workflow.getId() != workflowInstance.get().getId()) {
1674       delayWorkflow(workflow, mediaPackageId);
1675       return false;
1676     }
1677 
1678     return true;
1679   }
1680 
1681   private void delayWorkflow(WorkflowInstance workflow, String mediaPackageId) {
1682     if (!delayedWorkflows.contains(workflow.getId())) {
1683       logger.info("Delaying start of workflow {}, another workflow on media package {} is still running",
1684               workflow.getId(), mediaPackageId);
1685       delayedWorkflows.add(workflow.getId());
1686     }
1687   }
1688 
1689   /**
1690    * {@inheritDoc}
1691    *
1692    * @see org.opencastproject.job.api.AbstractJobProducer#acceptJob(org.opencastproject.job.api.Job)
1693    */
1694   @Override
1695   public synchronized void acceptJob(Job job) throws ServiceRegistryException {
1696     User originalUser = securityService.getUser();
1697     Organization originalOrg = securityService.getOrganization();
1698     try {
1699       Organization organization = organizationDirectoryService.getOrganization(job.getOrganization());
1700       securityService.setOrganization(organization);
1701       User user = userDirectoryService.loadUser(job.getCreator());
1702       securityService.setUser(user);
1703       job.setStatus(Job.Status.RUNNING);
1704       job = serviceRegistry.updateJob(job);
1705 
1706       // Check if this workflow was initially delayed
1707       if (delayedWorkflows.contains(job.getId())) {
1708         delayedWorkflows.remove(job.getId());
1709         logger.info("Starting initially delayed workflow {}, {} more waiting", job.getId(), delayedWorkflows.size());
1710       }
1711 
1712       executorService.submit(new JobRunner(job, serviceRegistry.getCurrentJob()));
1713     } catch (Exception e) {
1714       if (e instanceof ServiceRegistryException) {
1715         throw (ServiceRegistryException) e;
1716       }
1717       throw new ServiceRegistryException(e);
1718     } finally {
1719       securityService.setUser(originalUser);
1720       securityService.setOrganization(originalOrg);
1721     }
1722   }
1723 
1724   /**
1725    * Processes the workflow job.
1726    *
1727    * @param job
1728    *          the job
1729    * @return the job payload
1730    * @throws Exception
1731    *           if job processing fails
1732    */
1733   protected String process(Job job) throws Exception {
1734     List<String> arguments = job.getArguments();
1735     Operation op = null;
1736     WorkflowInstance workflowInstance = null;
1737     WorkflowOperationInstance wfo;
1738     String operation = job.getOperation();
1739     try {
1740       try {
1741         op = Operation.valueOf(operation);
1742         switch (op) {
1743           case START_WORKFLOW:
1744             workflowInstance = persistence.getWorkflow(Long.parseLong(job.getPayload()));
1745             logger.debug("Starting new workflow {}", workflowInstance);
1746             runWorkflow(workflowInstance);
1747             break;
1748           case RESUME:
1749             workflowInstance = getWorkflowById(Long.parseLong(arguments.get(0)));
1750             Map<String, String> properties = null;
1751             if (arguments.size() > 1) {
1752               Properties props = new Properties();
1753               props.load(IOUtils.toInputStream(arguments.get(arguments.size() - 1), StandardCharsets.UTF_8));
1754               properties = new HashMap<>();
1755               for (Entry<Object, Object> entry : props.entrySet()) {
1756                 properties.put(entry.getKey().toString(), entry.getValue().toString());
1757               }
1758             }
1759             logger.debug("Resuming {} at {}", workflowInstance, workflowInstance.getCurrentOperation());
1760             workflowInstance.setState(RUNNING);
1761             update(workflowInstance);
1762             runWorkflowOperation(workflowInstance, properties);
1763             break;
1764           case START_OPERATION:
1765             workflowInstance = getWorkflowById(Long.parseLong(arguments.get(0)));
1766             wfo = workflowInstance.getCurrentOperation();
1767 
1768             if (OperationState.RUNNING.equals(wfo.getState()) || OperationState.PAUSED.equals(wfo.getState())) {
1769               logger.info("Reset operation state {} {} to INSTANTIATED due to job restart", workflowInstance, wfo);
1770               wfo.setState(OperationState.INSTANTIATED);
1771             }
1772 
1773             wfo.setExecutionHost(job.getProcessingHost());
1774             logger.debug("Running {} {}", workflowInstance, wfo);
1775             wfo = runWorkflowOperation(workflowInstance, null);
1776             updateOperationJob(job.getId(), wfo.getState());
1777             break;
1778           default:
1779             throw new IllegalStateException("Don't know how to handle operation '" + operation + "'");
1780         }
1781       } catch (IllegalArgumentException e) {
1782         throw new ServiceRegistryException("This service can't handle operations of type '" + op + "'", e);
1783       } catch (IndexOutOfBoundsException e) {
1784         throw new ServiceRegistryException(
1785             "The argument list for operation '" + op + "' (job: " + job.getId() + ") does not meet expectations", e);
1786       } catch (NotFoundException e) {
1787         logger.warn("Not found processing job {}", job, e);
1788         updateOperationJob(job.getId(), OperationState.FAILED);
1789       }
1790       return null;
1791     } catch (Exception e) {
1792       logger.warn("Exception while accepting job {}", job, e);
1793       try {
1794         if (workflowInstance != null) {
1795           logger.warn("Marking job {} and workflow instance {} as failed", job, workflowInstance);
1796           updateOperationJob(job.getId(), OperationState.FAILED);
1797           workflowInstance.setState(FAILED);
1798           update(workflowInstance);
1799         } else {
1800           logger.warn("Unable to parse workflow instance", e);
1801         }
1802       } catch (WorkflowDatabaseException e1) {
1803         throw new ServiceRegistryException(e1);
1804       }
1805       if (e instanceof ServiceRegistryException) {
1806         throw e;
1807       }
1808       throw new ServiceRegistryException("Error handling operation '" + op + "'", e);
1809     }
1810   }
1811 
1812   /**
1813    * Synchronizes the workflow operation's job with the operation status if the operation has a job associated with it,
1814    * which is determined by looking at the operation's job id.
1815    *
1816    * @param state
1817    *          the operation state
1818    * @param jobId
1819    *          the associated job
1820    * @return the updated job or <code>null</code> if there is no job for this operation
1821    * @throws ServiceRegistryException
1822    *           if the job can't be updated in the service registry
1823    * @throws NotFoundException
1824    *           if the job can't be found
1825    */
1826   private Job updateOperationJob(Long jobId, OperationState state) throws NotFoundException, ServiceRegistryException {
1827     if (jobId == null) {
1828       return null;
1829     }
1830     Job job = serviceRegistry.getJob(jobId);
1831     switch (state) {
1832       case FAILED:
1833       case RETRY:
1834         job.setStatus(Status.FAILED);
1835         break;
1836       case PAUSED:
1837         job.setStatus(Status.PAUSED);
1838         job.setOperation(Operation.RESUME.toString());
1839         break;
1840       case SKIPPED:
1841       case SUCCEEDED:
1842         job.setStatus(Status.FINISHED);
1843         break;
1844       default:
1845         throw new IllegalStateException("Unexpected state '" + state + "' found");
1846     }
1847     return serviceRegistry.updateJob(job);
1848   }
1849 
1850   /**
1851    * {@inheritDoc}
1852    *
1853    * @see org.opencastproject.job.api.JobProducer#countJobs(org.opencastproject.job.api.Job.Status)
1854    */
1855   @Override
1856   public long countJobs(Status status) throws ServiceRegistryException {
1857     return serviceRegistry.count(JOB_TYPE, status);
1858   }
1859 
1860   /**
1861    * Converts a Map<String, String> to s key=value\n string, suitable for the properties form parameter expected by the
1862    * workflow rest endpoint.
1863    *
1864    * @param props
1865    *          The map of strings
1866    * @return the string representation
1867    */
1868   private String mapToString(Map<String, String> props) {
1869     if (props == null) {
1870       return null;
1871     }
1872     StringBuilder sb = new StringBuilder();
1873     for (Entry<String, String> entry : props.entrySet()) {
1874       sb.append(entry.getKey());
1875       sb.append("=");
1876       sb.append(entry.getValue());
1877       sb.append("\n");
1878     }
1879     return sb.toString();
1880   }
1881 
1882   /**
1883    * Dummy callback for osgi
1884    *
1885    * @param unused
1886    *          the unused ReadinessIndicator
1887    */
1888   @Reference(target = "(artifact=workflowdefinition)")
1889   protected void setProfilesReadyIndicator(ReadinessIndicator unused) { }
1890 
1891   /**
1892    * Callback for the OSGi environment to register with the <code>Workspace</code>.
1893    *
1894    * @param workspace
1895    *          the workspace
1896    */
1897   @Reference
1898   protected void setWorkspace(Workspace workspace) {
1899     this.workspace = workspace;
1900   }
1901 
1902   /**
1903    * Callback for the OSGi environment to register with the <code>ServiceRegistry</code>.
1904    *
1905    * @param registry
1906    *          the service registry
1907    */
1908   @Reference
1909   protected void setServiceRegistry(ServiceRegistry registry) {
1910     this.serviceRegistry = registry;
1911   }
1912 
1913   public ServiceRegistry getServiceRegistry() {
1914     return serviceRegistry;
1915   }
1916 
1917   /**
1918    * Callback for setting the security service.
1919    *
1920    * @param securityService
1921    *          the securityService to set
1922    */
1923   @Reference
1924   public void setSecurityService(SecurityService securityService) {
1925     this.securityService = securityService;
1926   }
1927 
1928   /**
1929    * Callback for setting the authorization service.
1930    *
1931    * @param authorizationService
1932    *          the authorizationService to set
1933    */
1934   @Reference
1935   public void setAuthorizationService(AuthorizationService authorizationService) {
1936     this.authorizationService = authorizationService;
1937   }
1938 
1939   /**
1940    * Callback for setting the user directory service
1941    *
1942    * @param userDirectoryService
1943    *          the userDirectoryService to set
1944    */
1945   @Reference
1946   public void setUserDirectoryService(UserDirectoryService userDirectoryService) {
1947     this.userDirectoryService = userDirectoryService;
1948   }
1949 
1950   /**
1951    * Sets a reference to the organization directory service.
1952    *
1953    * @param organizationDirectory
1954    *          the organization directory
1955    */
1956   @Reference
1957   public void setOrganizationDirectoryService(OrganizationDirectoryService organizationDirectory) {
1958     this.organizationDirectoryService = organizationDirectory;
1959   }
1960 
1961   /**
1962    * Sets the series service
1963    *
1964    * @param seriesService
1965    *          the seriesService to set
1966    */
1967   @Reference
1968   public void setSeriesService(SeriesService seriesService) {
1969     this.seriesService = seriesService;
1970   }
1971 
1972   /**
1973    * Sets the asset manager
1974    *
1975    * @param assetManager
1976    *          the assetManager to set
1977    */
1978   @Reference
1979   public void setAssetManager(AssetManager assetManager) {
1980     this.assetManager = assetManager;
1981   }
1982 
1983   /**
1984    * Callback to set the metadata service
1985    *
1986    * @param service
1987    *          the metadata service
1988    */
1989   @Reference(
1990       cardinality = ReferenceCardinality.AT_LEAST_ONE,
1991       policy = ReferencePolicy.DYNAMIC,
1992       unbind = "removeMetadataService"
1993   )
1994   protected void addMetadataService(MediaPackageMetadataService service) {
1995     metadataServices.add(service);
1996   }
1997 
1998   /**
1999    * Callback to remove a mediapackage metadata service.
2000    *
2001    * @param service
2002    *          the mediapackage metadata service to remove
2003    */
2004   protected void removeMetadataService(MediaPackageMetadataService service) {
2005     metadataServices.remove(service);
2006   }
2007 
2008   /**
2009    * Callback to set the workflow definition scanner
2010    *
2011    * @param scanner
2012    *          the workflow definition scanner
2013    */
2014   @Reference
2015   protected void addWorkflowDefinitionScanner(WorkflowDefinitionScanner scanner) {
2016     workflowDefinitionScanner = scanner;
2017   }
2018 
2019   /**
2020    * Callback to set the Admin UI index.
2021    *
2022    * @param index
2023    *          the admin UI index.
2024    */
2025   @Reference
2026   public void setIndex(ElasticsearchIndex index) {
2027     this.index = index;
2028   }
2029 
2030   /**
2031    * Callback to set the workflow database
2032    *
2033    * @param persistence
2034    *          the workflow database
2035    */
2036   @Reference(name = "workflow-persistence")
2037   public void setPersistence(WorkflowServiceDatabase persistence) {
2038     this.persistence = persistence;
2039   }
2040 
2041   /**
2042    * {@inheritDoc}
2043    *
2044    * @see org.opencastproject.job.api.JobProducer#getJobType()
2045    */
2046   @Override
2047   public String getJobType() {
2048     return JOB_TYPE;
2049   }
2050 
2051   /**
2052    * A tuple of a workflow operation handler and the name of the operation it handles
2053    */
2054   public static class HandlerRegistration {
2055 
2056     protected WorkflowOperationHandler handler;
2057     protected String operationName;
2058 
2059     public HandlerRegistration(String operationName, WorkflowOperationHandler handler) {
2060       if (operationName == null) {
2061         throw new IllegalArgumentException("Operation name cannot be null");
2062       }
2063       if (handler == null) {
2064         throw new IllegalArgumentException("Handler cannot be null");
2065       }
2066       this.operationName = operationName;
2067       this.handler = handler;
2068     }
2069 
2070     public WorkflowOperationHandler getHandler() {
2071       return handler;
2072     }
2073 
2074     /**
2075      * {@inheritDoc}
2076      *
2077      * @see java.lang.Object#hashCode()
2078      */
2079     @Override
2080     public int hashCode() {
2081       final int prime = 31;
2082       int result = 1;
2083       result = prime * result + handler.hashCode();
2084       result = prime * result + operationName.hashCode();
2085       return result;
2086     }
2087 
2088     /**
2089      * {@inheritDoc}
2090      *
2091      * @see java.lang.Object#equals(java.lang.Object)
2092      */
2093     @Override
2094     public boolean equals(Object obj) {
2095       if (this == obj) {
2096         return true;
2097       }
2098       if (obj == null) {
2099         return false;
2100       }
2101       if (getClass() != obj.getClass()) {
2102         return false;
2103       }
2104       HandlerRegistration other = (HandlerRegistration) obj;
2105       if (!handler.equals(other.handler)) {
2106         return false;
2107       }
2108       return operationName.equals(other.operationName);
2109     }
2110   }
2111 
2112   /**
2113    * A utility class to run jobs
2114    */
2115   class JobRunner implements Callable<Void> {
2116 
2117     /** The job */
2118     private Job job = null;
2119 
2120     /** The current job */
2121     private final Job currentJob;
2122 
2123     /**
2124      * Constructs a new job runner
2125      *
2126      * @param job
2127      *          the job to run
2128      * @param currentJob
2129      *          the current running job
2130      */
2131     JobRunner(Job job, Job currentJob) {
2132       this.job = job;
2133       this.currentJob = currentJob;
2134     }
2135 
2136     /**
2137      * {@inheritDoc}
2138      *
2139      * @see java.util.concurrent.Callable#call()
2140      */
2141     @Override
2142     public Void call() throws Exception {
2143       Organization jobOrganization = organizationDirectoryService.getOrganization(job.getOrganization());
2144       try {
2145         serviceRegistry.setCurrentJob(currentJob);
2146         securityService.setOrganization(jobOrganization);
2147         User jobUser = userDirectoryService.loadUser(job.getCreator());
2148         securityService.setUser(jobUser);
2149         process(job);
2150       } finally {
2151         serviceRegistry.setCurrentJob(null);
2152         securityService.setUser(null);
2153         securityService.setOrganization(null);
2154       }
2155       return null;
2156     }
2157   }
2158 
2159   @Override
2160   public synchronized void cleanupWorkflowInstances(int buffer, WorkflowState state) throws UnauthorizedException,
2161           WorkflowDatabaseException {
2162     logger.info("Start cleaning up workflow instances older than {} days with status '{}'", buffer, state);
2163 
2164     int instancesCleaned = 0;
2165     int cleaningFailed = 0;
2166 
2167     Date priorTo = DateUtils.addDays(new Date(), -buffer);
2168 
2169     try {
2170       for (WorkflowInstance workflowInstance : persistence.getWorkflowInstancesForCleanup(state, priorTo)) {
2171         try {
2172           logger.debug("Deleting workflow instance {}", workflowInstance.getId());
2173           remove(workflowInstance.getId());
2174           instancesCleaned++;
2175         } catch (WorkflowDatabaseException | UnauthorizedException e) {
2176           throw e;
2177         } catch (NotFoundException e) {
2178           // Since we are in a cleanup operation, we don't have to care about NotFoundExceptions
2179           logger.debug("Workflow instance '{}' could not be removed", workflowInstance.getId(), e);
2180         } catch (WorkflowParsingException | WorkflowStateException e) {
2181           logger.warn("Workflow instance '{}' could not be removed", workflowInstance.getId(), e);
2182           cleaningFailed++;
2183         }
2184       }
2185     } catch (WorkflowDatabaseException e) {
2186       throw new WorkflowDatabaseException(e);
2187     }
2188 
2189     if (instancesCleaned == 0 && cleaningFailed == 0) {
2190       logger.info("No workflow instances found to clean up");
2191       return;
2192     }
2193 
2194     if (instancesCleaned > 0) {
2195       logger.info("Cleaned up '{}' workflow instances", instancesCleaned);
2196     }
2197     if (cleaningFailed > 0) {
2198       logger.warn("Cleaning failed for '{}' workflow instances", cleaningFailed);
2199       throw new WorkflowDatabaseException("Unable to clean all workflow instances, see logs!");
2200     }
2201   }
2202 
2203   @Override
2204   public Map<String, Map<String, String>> getWorkflowStateMappings() {
2205     return workflowDefinitionScanner.workflowStateMappings.entrySet().stream().collect(Collectors.toMap(
2206             Entry::getKey, e -> e.getValue().stream()
2207                     .collect(Collectors.toMap(m -> m.getState().name(), WorkflowStateMapping::getValue))
2208     ));
2209   }
2210 
2211   @Override
2212   public void repopulate(IndexRebuildService.DataType type) throws IndexRebuildException {
2213     try {
2214       final int total;
2215       try {
2216         total = persistence.countMediaPackages();
2217       } catch (WorkflowDatabaseException e) {
2218         logIndexRebuildError(logger, e);
2219         throw new IndexRebuildException(getService(), e);
2220       }
2221 
2222       if (total > 0) {
2223         logIndexRebuildBegin(logger, total, "workflows");
2224         int current = 0;
2225         int n = 20;
2226         List<WorkflowIndexData> workflowIndexData;
2227 
2228         int limit = 1000;
2229         int offset = 0;
2230         String currentMediapackageId;
2231         String lastMediapackageId = "";
2232         var updatedWorkflowRange = new ArrayList<Event>();
2233         do {
2234           try {
2235             workflowIndexData = persistence.getWorkflowIndexData(limit, offset);
2236           } catch (WorkflowDatabaseException e) {
2237             logIndexRebuildError(logger, e);
2238             throw new IndexRebuildException(getService(), e);
2239           }
2240           if (!workflowIndexData.isEmpty()) {
2241             offset += limit;
2242             logger.debug("Got {} workflows for re-indexing", workflowIndexData.size());
2243 
2244             for (WorkflowIndexData indexData : workflowIndexData) {
2245               currentMediapackageId = indexData.getMediaPackageId();
2246               if (currentMediapackageId.equals(lastMediapackageId)) {
2247                 continue;
2248               }
2249               current++;
2250 
2251               // Include PAUSED; otherwise, paused workflows will show up as "Finished"
2252               if (!WorkflowUtil.isActive(WorkflowInstance.WorkflowState.values()[indexData.getState()].toString())
2253                       || WorkflowState.PAUSED == WorkflowInstance.WorkflowState.values()[indexData.getState()]) {
2254                 String orgid = indexData.getOrganizationId();
2255                 if (null == orgid) {
2256                   String mpId = indexData.getMediaPackageId();
2257                   //We're assuming here that mediapackages don't change orgs
2258                   RichAResult results = assetManager.getSnapshotsById(mpId);
2259                   if (results.getSize() == 0) {
2260                     logger.debug("Dropping {} from the index since it is missing from the database", mpId);
2261                     continue;
2262                   }
2263                   orgid = results.getSnapshots().stream().findFirst().get().getOrganizationId();
2264                   //We try-catch here since it's possible for the WF to exist in the *index* but not in the *DB*
2265                   // It probably shouldn't be, but that won't keep it from happening anyway.
2266                   try {
2267                     //NB: This version of getWorkflow takes the org id, which in this case is null
2268                     // Using the normal version filters by org, and since this workflow has a NULL org it can't be found
2269                     WorkflowInstance instance = persistence.getWorkflow(indexData.getId(), null);
2270                     instance.setOrganizationId(orgid);
2271                     persistence.updateInDatabase(instance);
2272                   } catch (NotFoundException e) {
2273                     //Technically this should never happen, but getWorkflow throws it.
2274                   }
2275                 }
2276                 var updatedWorkflowData = index.getEvent(indexData.getMediaPackageId(), orgid,
2277                     securityService.getUser());
2278                 updatedWorkflowData = getStateUpdateFunction(indexData.getId(),indexData.getState(),
2279                         indexData.getMediaPackageId(), indexData.getTemplate(), indexData.getOrganizationId())
2280                         .apply(updatedWorkflowData);
2281                 updatedWorkflowRange.add(updatedWorkflowData.get());
2282 
2283                 if (updatedWorkflowRange.size() >= n || current >= total) {
2284                   index.bulkEventUpdate(updatedWorkflowRange);
2285                   logIndexRebuildProgress(logger, total, current, n);
2286                   updatedWorkflowRange.clear();
2287                 }
2288               }
2289               else {
2290                 logger.info("Skipping. Workflow {} is currently active.", indexData.getId());
2291               }
2292 
2293               lastMediapackageId = currentMediapackageId;
2294             }
2295           }
2296         } while (!workflowIndexData.isEmpty());
2297       }
2298     } catch (Exception e) {
2299       logIndexRebuildError(logger, e);
2300       throw new IndexRebuildException(getService(), e);
2301     }
2302   }
2303 
2304   @Override
2305   public IndexRebuildService.Service getService() {
2306     return IndexRebuildService.Service.Workflow;
2307   }
2308 
2309   /**
2310    * Remove a workflow instance from the Elasticsearch index.
2311    *
2312    * @param workflowInstanceId
2313    *         the identifier of the workflow instance to remove
2314    */
2315   private void removeWorkflowInstanceFromIndex(long workflowInstanceId) {
2316     final String orgId = securityService.getOrganization().getId();
2317     final User user = securityService.getUser();
2318 
2319     // find events
2320     SearchResult<Event> results;
2321     try {
2322       results = index.getByQuery(new EventSearchQuery(orgId, user).withWorkflowId(workflowInstanceId));
2323     } catch (SearchIndexException e) {
2324       logger.error("Error retrieving the events for workflow instance {} from the {} index.", workflowInstanceId,
2325               index.getIndexName(), e);
2326       return;
2327     }
2328 
2329     if (results.getItems().length == 0) {
2330       logger.warn("No events for workflow instance {} found in the {} index.", workflowInstanceId,
2331               index.getIndexName());
2332       return;
2333     }
2334 
2335     // should be only one event, but better safe than sorry
2336     for (SearchResultItem<Event> item: results.getItems()) {
2337       String eventId = item.getSource().getIdentifier();
2338       logger.debug("Removing workflow instance {} of event {} from the {} index.", workflowInstanceId, eventId,
2339               index.getIndexName());
2340 
2341       Function<Optional<Event>, Optional<Event>> updateFunction = (Optional<Event> eventOpt) -> {
2342         if (!eventOpt.isPresent()) {
2343           logger.warn("Event {} of workflow instance {} not found in the {} index.", workflowInstanceId,
2344                   eventId, index.getIndexName());
2345           return Optional.empty();
2346         }
2347         Event event = eventOpt.get();
2348         if (event.getWorkflowId() != null && event.getWorkflowId().equals(workflowInstanceId)) {
2349           logger.debug("Workflow {} is the current workflow of event {}. Removing it from event.", eventId,
2350                   workflowInstanceId);
2351           event.setWorkflowId(null);
2352           event.setWorkflowDefinitionId(null);
2353           event.setWorkflowState(null);
2354           return Optional.of(event);
2355         }
2356         return Optional.empty();
2357       };
2358 
2359       try {
2360         index.addOrUpdateEvent(eventId, updateFunction, orgId, user);
2361         logger.debug("Workflow instance {} of event {} removed from the {} index.", workflowInstanceId, eventId,
2362                 index.getIndexName());
2363       } catch (SearchIndexException e) {
2364         logger.error("Error removing the workflow instance {} of event {} from the {} index.", workflowInstanceId,
2365                 eventId, index.getIndexName(), e);
2366       }
2367     }
2368   }
2369 
2370   /**
2371    * Update a workflow instance in the Elasticsearch index.
2372    *
2373    * @param id
2374    *         workflow id
2375    * @param state
2376    *         workflow state as int
2377    * @param mpId
2378    *         corresponding mediapackage id
2379    * @param orgId
2380    *         workflow organization id
2381    */
2382   private void updateWorkflowInstanceInIndex(long id, int state, String wfDefId, String mpId, String orgId) {
2383     final User user = securityService.getUser();
2384 
2385     logger.debug("Updating workflow instance {} of event {} in the {} index.", id, mpId,
2386             index.getIndexName());
2387     Function<Optional<Event>, Optional<Event>> updateFunction = getStateUpdateFunction(id, state, wfDefId, mpId, orgId);
2388 
2389     try {
2390       index.addOrUpdateEvent(mpId, updateFunction, orgId, user);
2391       logger.debug("Workflow instance {} of event {} updated in the {} index.", id, mpId,
2392               index.getIndexName());
2393     } catch (SearchIndexException e) {
2394       logger.error("Error updating the workflow instance {} of event {} in the {} index.", id, mpId,
2395               index.getIndexName(), e);
2396     }
2397   }
2398 
2399   /**
2400    * Get the function to update the workflow state for an event in the Elasticsearch index.
2401    *
2402    * @return the function to do the update
2403    */
2404   private Function<Optional<Event>, Optional<Event>> getStateUpdateFunction(long workflowId,
2405           int workflowState, String workflowDefinitionId, String mediaPackageId,
2406           String orgId) {
2407     return (Optional<Event> eventOpt) -> {
2408       Event event = eventOpt.orElse(new Event(mediaPackageId, orgId));
2409       event.setWorkflowId(workflowId);
2410       event.setWorkflowState(WorkflowState.values()[workflowState]);
2411       event.setWorkflowDefinitionId(workflowDefinitionId);
2412       return Optional.of(event);
2413     };
2414   }
2415 }