View Javadoc
1   /*
2    * Licensed to The Apereo Foundation under one or more contributor license
3    * agreements. See the NOTICE file distributed with this work for additional
4    * information regarding copyright ownership.
5    *
6    *
7    * The Apereo Foundation licenses this file to you under the Educational
8    * Community License, Version 2.0 (the "License"); you may not use this file
9    * except in compliance with the License. You may obtain a copy of the License
10   * at:
11   *
12   *   http://opensource.org/licenses/ecl2.txt
13   *
14   * Unless required by applicable law or agreed to in writing, software
15   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
16   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the
17   * License for the specific language governing permissions and limitations under
18   * the License.
19   *
20   */
21  
22  package org.opencastproject.workflow.impl;
23  
24  import static org.opencastproject.security.api.SecurityConstants.GLOBAL_ADMIN_ROLE;
25  import static org.opencastproject.workflow.api.WorkflowInstance.WorkflowState.FAILED;
26  import static org.opencastproject.workflow.api.WorkflowInstance.WorkflowState.FAILING;
27  import static org.opencastproject.workflow.api.WorkflowInstance.WorkflowState.INSTANTIATED;
28  import static org.opencastproject.workflow.api.WorkflowInstance.WorkflowState.PAUSED;
29  import static org.opencastproject.workflow.api.WorkflowInstance.WorkflowState.RUNNING;
30  import static org.opencastproject.workflow.api.WorkflowInstance.WorkflowState.STOPPED;
31  import static org.opencastproject.workflow.api.WorkflowInstance.WorkflowState.SUCCEEDED;
32  
33  import org.opencastproject.assetmanager.api.AssetManager;
34  import org.opencastproject.assetmanager.api.Snapshot;
35  import org.opencastproject.assetmanager.util.WorkflowPropertiesUtil;
36  import org.opencastproject.elasticsearch.api.SearchIndexException;
37  import org.opencastproject.elasticsearch.api.SearchResult;
38  import org.opencastproject.elasticsearch.api.SearchResultItem;
39  import org.opencastproject.elasticsearch.index.ElasticsearchIndex;
40  import org.opencastproject.elasticsearch.index.objects.event.Event;
41  import org.opencastproject.elasticsearch.index.objects.event.EventSearchQuery;
42  import org.opencastproject.elasticsearch.index.rebuild.AbstractIndexProducer;
43  import org.opencastproject.elasticsearch.index.rebuild.IndexProducer;
44  import org.opencastproject.elasticsearch.index.rebuild.IndexRebuildException;
45  import org.opencastproject.elasticsearch.index.rebuild.IndexRebuildService;
46  import org.opencastproject.job.api.Job;
47  import org.opencastproject.job.api.Job.Status;
48  import org.opencastproject.job.api.JobProducer;
49  import org.opencastproject.mediapackage.MediaPackage;
50  import org.opencastproject.mediapackage.MediaPackageElement;
51  import org.opencastproject.mediapackage.MediaPackageParser;
52  import org.opencastproject.mediapackage.MediaPackageSupport;
53  import org.opencastproject.mediapackage.Publication;
54  import org.opencastproject.metadata.api.MediaPackageMetadata;
55  import org.opencastproject.metadata.api.MediaPackageMetadataService;
56  import org.opencastproject.metadata.api.MetadataService;
57  import org.opencastproject.metadata.api.util.MediaPackageMetadataSupport;
58  import org.opencastproject.security.api.AccessControlList;
59  import org.opencastproject.security.api.AccessControlUtil;
60  import org.opencastproject.security.api.AclScope;
61  import org.opencastproject.security.api.AuthorizationService;
62  import org.opencastproject.security.api.Organization;
63  import org.opencastproject.security.api.OrganizationDirectoryService;
64  import org.opencastproject.security.api.Permissions;
65  import org.opencastproject.security.api.SecurityService;
66  import org.opencastproject.security.api.UnauthorizedException;
67  import org.opencastproject.security.api.User;
68  import org.opencastproject.security.api.UserDirectoryService;
69  import org.opencastproject.series.api.SeriesException;
70  import org.opencastproject.series.api.SeriesService;
71  import org.opencastproject.serviceregistry.api.ServiceRegistry;
72  import org.opencastproject.serviceregistry.api.ServiceRegistryException;
73  import org.opencastproject.serviceregistry.api.UndispatchableJobException;
74  import org.opencastproject.util.NotFoundException;
75  import org.opencastproject.util.ReadinessIndicator;
76  import org.opencastproject.util.data.Tuple;
77  import org.opencastproject.workflow.api.ResumableWorkflowOperationHandler;
78  import org.opencastproject.workflow.api.RetryStrategy;
79  import org.opencastproject.workflow.api.WorkflowDatabaseException;
80  import org.opencastproject.workflow.api.WorkflowDefinition;
81  import org.opencastproject.workflow.api.WorkflowException;
82  import org.opencastproject.workflow.api.WorkflowIdentifier;
83  import org.opencastproject.workflow.api.WorkflowIndexData;
84  import org.opencastproject.workflow.api.WorkflowInstance;
85  import org.opencastproject.workflow.api.WorkflowInstance.WorkflowState;
86  import org.opencastproject.workflow.api.WorkflowListener;
87  import org.opencastproject.workflow.api.WorkflowOperationDefinition;
88  import org.opencastproject.workflow.api.WorkflowOperationDefinitionImpl;
89  import org.opencastproject.workflow.api.WorkflowOperationHandler;
90  import org.opencastproject.workflow.api.WorkflowOperationInstance;
91  import org.opencastproject.workflow.api.WorkflowOperationInstance.OperationState;
92  import org.opencastproject.workflow.api.WorkflowOperationResult;
93  import org.opencastproject.workflow.api.WorkflowOperationResult.Action;
94  import org.opencastproject.workflow.api.WorkflowOperationResultImpl;
95  import org.opencastproject.workflow.api.WorkflowParsingException;
96  import org.opencastproject.workflow.api.WorkflowService;
97  import org.opencastproject.workflow.api.WorkflowServiceDatabase;
98  import org.opencastproject.workflow.api.WorkflowStateException;
99  import org.opencastproject.workflow.api.WorkflowStateMapping;
100 import org.opencastproject.workflow.api.WorkflowUtil;
101 import org.opencastproject.workflow.api.XmlWorkflowParser;
102 import org.opencastproject.workspace.api.Workspace;
103 
104 import com.google.common.util.concurrent.Striped;
105 
106 import org.apache.commons.io.IOUtils;
107 import org.apache.commons.lang3.StringUtils;
108 import org.apache.commons.lang3.time.DateUtils;
109 import org.osgi.framework.InvalidSyntaxException;
110 import org.osgi.framework.ServiceReference;
111 import org.osgi.service.component.ComponentContext;
112 import org.osgi.service.component.annotations.Activate;
113 import org.osgi.service.component.annotations.Component;
114 import org.osgi.service.component.annotations.Reference;
115 import org.osgi.service.component.annotations.ReferenceCardinality;
116 import org.osgi.service.component.annotations.ReferencePolicy;
117 import org.slf4j.Logger;
118 import org.slf4j.LoggerFactory;
119 
120 import java.io.ByteArrayOutputStream;
121 import java.io.IOException;
122 import java.nio.charset.StandardCharsets;
123 import java.util.ArrayList;
124 import java.util.Collections;
125 import java.util.Comparator;
126 import java.util.Date;
127 import java.util.HashMap;
128 import java.util.HashSet;
129 import java.util.List;
130 import java.util.Map;
131 import java.util.Map.Entry;
132 import java.util.Optional;
133 import java.util.Properties;
134 import java.util.Set;
135 import java.util.SortedSet;
136 import java.util.TreeSet;
137 import java.util.concurrent.Callable;
138 import java.util.concurrent.CopyOnWriteArrayList;
139 import java.util.concurrent.Executors;
140 import java.util.concurrent.ThreadPoolExecutor;
141 import java.util.concurrent.locks.Lock;
142 import java.util.function.Function;
143 import java.util.stream.Collectors;
144 
145 /**
146  * Implements WorkflowService with in-memory data structures to hold WorkflowOperations and WorkflowInstances.
147  * WorkflowOperationHandlers are looked up in the OSGi service registry based on the "workflow.operation" property. If
148  * the WorkflowOperationHandler's "workflow.operation" service registration property matches
149  * WorkflowOperation.getName(), then the factory returns a WorkflowOperationRunner to handle that operation. This allows
150  * for custom runners to be added or modified without affecting the workflow service itself.
151  */
152 @Component(
153   property = {
154     "service.description=Workflow Service",
155     "service.pid=org.opencastproject.workflow.impl.WorkflowServiceImpl"
156   },
157   immediate = true,
158   service = { WorkflowService.class, WorkflowServiceImpl.class, IndexProducer.class }
159 )
160 public class WorkflowServiceImpl extends AbstractIndexProducer implements WorkflowService, JobProducer {
161 
162   /** Retry strategy property name */
163   private static final String RETRY_STRATEGY = "retryStrategy";
164 
165   /** Logging facility */
166   private static final Logger logger = LoggerFactory.getLogger(WorkflowServiceImpl.class);
167 
168   /** List of available operations on jobs */
169   enum Operation {
170     START_WORKFLOW, RESUME, START_OPERATION
171   }
172 
173   /** Constant value indicating a <code>null</code> parent id */
174   private static final String NULL_PARENT_ID = "-";
175 
176   /** The load imposed on the system by a workflow job.
177    *  We are keeping this hardcoded because otherwise bad things will likely happen,
178    *  like an inability to process a workflow past a certain point in high-load conditions
179    */
180   private static final float WORKFLOW_JOB_LOAD = 0.0f;
181 
182   /** Error resolution handler id constant */
183   public static final String ERROR_RESOLUTION_HANDLER_ID = "error-resolution";
184 
185   /** Remove references to the component context once felix scr 1.2 becomes available */
186   protected ComponentContext componentContext = null;
187 
188   /** The metadata services */
189   private SortedSet<MediaPackageMetadataService> metadataServices;
190 
191   /** Persistent storage */
192   protected WorkflowServiceDatabase persistence;
193 
194   /** The list of workflow listeners */
195   private final List<WorkflowListener> listeners = new CopyOnWriteArrayList<WorkflowListener>();
196 
197   /** The thread pool to use for firing listeners and handling dispatched jobs */
198   protected ThreadPoolExecutor executorService;
199 
200   /** The workspace */
201   protected Workspace workspace = null;
202 
203   /** The service registry */
204   protected ServiceRegistry serviceRegistry = null;
205 
206   /** The security service */
207   protected SecurityService securityService = null;
208 
209   /** The authorization service */
210   protected AuthorizationService authorizationService = null;
211 
212   /** The user directory service */
213   protected UserDirectoryService userDirectoryService = null;
214 
215   /** The organization directory service */
216   protected OrganizationDirectoryService organizationDirectoryService = null;
217 
218   /** The series service */
219   protected SeriesService seriesService;
220 
221   /** The asset manager */
222   protected AssetManager assetManager = null;
223 
224   /** The workflow definition scanner */
225   private WorkflowDefinitionScanner workflowDefinitionScanner;
226 
227   /** List of initially delayed workflows */
228   private final List<Long> delayedWorkflows = new ArrayList<Long>();
229 
230   /** Striped locks for synchronization */
231   private final Striped<Lock> lock = Striped.lazyWeakLock(1024);
232   private final Striped<Lock> updateLock = Striped.lazyWeakLock(1024);
233   private final Striped<Lock> mediaPackageLocks = Striped.lazyWeakLock(1024);
234 
235   /** The Elasticsearch indices */
236   private ElasticsearchIndex index;
237 
238   /**
239    * Constructs a new workflow service impl, with a priority-sorted map of metadata services
240    */
241   public WorkflowServiceImpl() {
242     metadataServices = new TreeSet<>(Comparator.comparingInt(MetadataService::getPriority));
243   }
244 
245   /**
246    * Activate this service implementation via the OSGI service component runtime.
247    *
248    * @param componentContext
249    *          the component context
250    */
251   @Activate
252   public void activate(ComponentContext componentContext) {
253     this.componentContext = componentContext;
254     executorService = (ThreadPoolExecutor) Executors.newCachedThreadPool();
255     logger.info("Activate Workflow service");
256   }
257 
258   /**
259    * {@inheritDoc}
260    *
261    * @see org.opencastproject.workflow.api.WorkflowService#addWorkflowListener(org.opencastproject.workflow.api.WorkflowListener)
262    */
263   @Override
264   public void addWorkflowListener(WorkflowListener listener) {
265     listeners.add(listener);
266   }
267 
268   /**
269    * {@inheritDoc}
270    *
271    * @see org.opencastproject.workflow.api.WorkflowService#removeWorkflowListener(org.opencastproject.workflow.api.WorkflowListener)
272    */
273   @Override
274   public void removeWorkflowListener(WorkflowListener listener) {
275     listeners.remove(listener);
276   }
277 
278   /**
279    * Fires the workflow listeners on workflow updates.
280    */
281   protected void fireListeners(final WorkflowInstance oldWorkflowInstance, final WorkflowInstance newWorkflowInstance) {
282     final User currentUser = securityService.getUser();
283     final Organization currentOrganization = securityService.getOrganization();
284     for (final WorkflowListener listener : listeners) {
285       if (oldWorkflowInstance == null || !oldWorkflowInstance.getState().equals(newWorkflowInstance.getState())) {
286         Runnable runnable = () -> {
287           try {
288             securityService.setUser(currentUser);
289             securityService.setOrganization(currentOrganization);
290             listener.stateChanged(newWorkflowInstance);
291           } finally {
292             securityService.setUser(null);
293             securityService.setOrganization(null);
294           }
295         };
296         executorService.execute(runnable);
297       } else {
298         logger.debug("Not notifying {} because the workflow state has not changed", listener);
299       }
300 
301       if (newWorkflowInstance.getCurrentOperation() != null) {
302         if (oldWorkflowInstance == null || oldWorkflowInstance.getCurrentOperation() == null
303                 || !oldWorkflowInstance.getCurrentOperation().equals(newWorkflowInstance.getCurrentOperation())) {
304           Runnable runnable = () -> {
305             try {
306               securityService.setUser(currentUser);
307               securityService.setOrganization(currentOrganization);
308               listener.operationChanged(newWorkflowInstance);
309             } finally {
310               securityService.setUser(null);
311               securityService.setOrganization(null);
312             }
313           };
314           executorService.execute(runnable);
315         }
316       } else {
317         logger.debug("Not notifying {} because the workflow operation has not changed", listener);
318       }
319     }
320   }
321 
322   /**
323    * {@inheritDoc}
324    *
325    * @see org.opencastproject.workflow.api.WorkflowService#listAvailableWorkflowDefinitions()
326    */
327   @Override
328   public List<WorkflowDefinition> listAvailableWorkflowDefinitions() {
329     return workflowDefinitionScanner
330             .getAvailableWorkflowDefinitions(securityService.getOrganization(), securityService.getUser())
331             .sorted()
332             .collect(Collectors.toList());
333   }
334 
335   /**
336    * {@inheritDoc}
337    */
338   public boolean isRunnable(WorkflowDefinition workflowDefinition) {
339     List<String> availableOperations = listAvailableOperationNames();
340     List<WorkflowDefinition> checkedWorkflows = new ArrayList<>();
341     boolean runnable = isRunnable(workflowDefinition, availableOperations, checkedWorkflows);
342     int wfCount = checkedWorkflows.size() - 1;
343     if (runnable)
344       logger.info("Workflow {}, containing {} derived workflows, is runnable", workflowDefinition, wfCount);
345     else
346       logger.warn("Workflow {}, containing {} derived workflows, is not runnable", workflowDefinition, wfCount);
347     return runnable;
348   }
349 
350   /**
351    * Tests the workflow definition for its runnability. This method is a helper for
352    * {@link #isRunnable(WorkflowDefinition)} that is suited for recursive calling.
353    *
354    * @param workflowDefinition
355    *          the definition to test
356    * @param availableOperations
357    *          list of currently available operation handlers
358    * @param checkedWorkflows
359    *          list of checked workflows, used to avoid circular checking
360    * @return <code>true</code> if all bits and pieces used for executing <code>workflowDefinition</code> are in place
361    */
362   private boolean isRunnable(WorkflowDefinition workflowDefinition, List<String> availableOperations,
363           List<WorkflowDefinition> checkedWorkflows) {
364     if (checkedWorkflows.contains(workflowDefinition))
365       return true;
366 
367     // Test availability of operation handler and catch workflows
368     for (WorkflowOperationDefinition op : workflowDefinition.getOperations()) {
369       if (!availableOperations.contains(op.getId())) {
370         logger.info("{} is not runnable due to missing operation {}", workflowDefinition, op);
371         return false;
372       }
373       String catchWorkflow = op.getExceptionHandlingWorkflow();
374       if (catchWorkflow != null) {
375         WorkflowDefinition catchWorkflowDefinition;
376         try {
377           catchWorkflowDefinition = getWorkflowDefinitionById(catchWorkflow);
378         } catch (NotFoundException e) {
379           logger.info("{} is not runnable due to missing catch workflow {} on operation {}", workflowDefinition,
380                   catchWorkflow, op);
381           return false;
382         }
383         if (!isRunnable(catchWorkflowDefinition, availableOperations, checkedWorkflows))
384           return false;
385       }
386     }
387 
388     // Add the workflow to the list of checked workflows
389     if (!checkedWorkflows.contains(workflowDefinition))
390       checkedWorkflows.add(workflowDefinition);
391     return true;
392   }
393 
394   /**
395    * Gets the currently registered workflow operation handlers.
396    *
397    * @return All currently registered handlers
398    */
399   public Set<HandlerRegistration> getRegisteredHandlers() {
400     Set<HandlerRegistration> set = new HashSet<>();
401     ServiceReference[] refs;
402     try {
403       refs = componentContext.getBundleContext().getServiceReferences(WorkflowOperationHandler.class.getName(), null);
404     } catch (InvalidSyntaxException e) {
405       throw new IllegalStateException(e);
406     }
407     if (refs != null) {
408       for (ServiceReference ref : refs) {
409         WorkflowOperationHandler handler = (WorkflowOperationHandler) componentContext.getBundleContext().getService(
410                 ref);
411         set.add(new HandlerRegistration((String) ref.getProperty(WORKFLOW_OPERATION_PROPERTY), handler));
412       }
413     } else {
414       logger.warn("No registered workflow operation handlers found");
415     }
416     return set;
417   }
418 
419   protected WorkflowOperationHandler getWorkflowOperationHandler(String operationId) {
420     for (HandlerRegistration reg : getRegisteredHandlers()) {
421       if (reg.operationName.equals(operationId))
422         return reg.handler;
423     }
424     return null;
425   }
426 
427   /**
428    * Lists the names of each workflow operation. Operation names are availalbe for use if there is a registered
429    * {@link WorkflowOperationHandler} with an equal {@link WorkflowServiceImpl#WORKFLOW_OPERATION_PROPERTY} property.
430    *
431    * @return The {@link List} of available workflow operation names
432    */
433   protected List<String> listAvailableOperationNames() {
434     return getRegisteredHandlers().parallelStream().map(op -> op.operationName).collect(Collectors.toList());
435   }
436 
437   /**
438    * {@inheritDoc}
439    *
440    * @see org.opencastproject.workflow.api.WorkflowService#getWorkflowById(long)
441    */
442   @Override
443   public WorkflowInstance getWorkflowById(long id) throws NotFoundException,
444           UnauthorizedException {
445     try {
446       WorkflowInstance workflow = persistence.getWorkflow(id);
447       assertPermission(workflow, Permissions.Action.READ.toString(), workflow.getOrganizationId());
448       return workflow;
449     } catch (WorkflowDatabaseException e) {
450       throw new IllegalStateException("Got not get workflow from database with id ");
451     }
452   }
453 
454   /**
455    * {@inheritDoc}
456    *
457    * @see org.opencastproject.workflow.api.WorkflowService#start(org.opencastproject.workflow.api.WorkflowDefinition,
458    *      org.opencastproject.mediapackage.MediaPackage)
459    */
460   @Override
461   public WorkflowInstance start(WorkflowDefinition workflowDefinition, MediaPackage mediaPackage)
462           throws WorkflowDatabaseException, UnauthorizedException, WorkflowParsingException {
463     return start(workflowDefinition, mediaPackage, new HashMap<>());
464   }
465 
466   /**
467    * {@inheritDoc}
468    *
469    * @see org.opencastproject.workflow.api.WorkflowService#start(org.opencastproject.workflow.api.WorkflowDefinition,
470    *      org.opencastproject.mediapackage.MediaPackage)
471    */
472   @Override
473   public WorkflowInstance start(WorkflowDefinition workflowDefinition, MediaPackage mediaPackage,
474           Map<String, String> properties)
475           throws WorkflowDatabaseException, UnauthorizedException, WorkflowParsingException {
476     try {
477       return start(workflowDefinition, mediaPackage, null, properties);
478     } catch (NotFoundException e) {
479       // should never happen
480       throw new IllegalStateException("a null workflow ID caused a NotFoundException.  This is a programming error.");
481     }
482   }
483 
484   /**
485    * {@inheritDoc}
486    *
487    * @see org.opencastproject.workflow.api.WorkflowService#start(org.opencastproject.workflow.api.WorkflowDefinition,
488    *      org.opencastproject.mediapackage.MediaPackage, Long, java.util.Map)
489    */
490   @Override
491   public WorkflowInstance start(WorkflowDefinition workflowDefinition, MediaPackage sourceMediaPackage,
492           Long parentWorkflowId, Map<String, String> originalProperties) throws WorkflowDatabaseException,
493           NotFoundException, UnauthorizedException, WorkflowParsingException, IllegalStateException {
494     final String mediaPackageId = sourceMediaPackage.getIdentifier().toString();
495     Map<String, String> properties = null;
496 
497     // WorkflowPropertiesUtil.storeProperties will take a snapshot if there isn't one
498     // and we want the mp in the snapshot to have all the metadata populated.
499     populateMediaPackageMetadata(sourceMediaPackage);
500 
501     if (originalProperties != null) {
502       WorkflowPropertiesUtil.storeProperties(assetManager, sourceMediaPackage, originalProperties);
503       properties = WorkflowPropertiesUtil.getLatestWorkflowProperties(assetManager, mediaPackageId);
504     }
505 
506     // We have to synchronize per media package to avoid starting multiple simultaneous workflows for one media package.
507     final Lock lock = mediaPackageLocks.get(mediaPackageId);
508     lock.lock();
509 
510     try {
511       if (workflowDefinition == null)
512         throw new IllegalArgumentException("workflow definition must not be null");
513       Optional<List<String>> errors = MediaPackageSupport.sanityCheck(sourceMediaPackage);
514       if (errors.isPresent()) {
515         throw new IllegalArgumentException("Insane media package cannot be processed: " + String.join("; ", errors.get()));
516       }
517       if (parentWorkflowId != null) {
518         try {
519           getWorkflowById(parentWorkflowId); // Let NotFoundException bubble up
520         } catch (UnauthorizedException e) {
521           throw new IllegalArgumentException("Parent workflow " + parentWorkflowId + " not visible to this user");
522         }
523       } else {
524         if (persistence.mediaPackageHasActiveWorkflows(mediaPackageId)) {
525           throw new IllegalStateException(String.format(
526                   "Can't start workflow '%s' for media package '%s' because another workflow is currently active.",
527                   workflowDefinition.getTitle(),
528                   sourceMediaPackage.getIdentifier().toString()));
529         }
530       }
531 
532       // Get the current user
533       User currentUser = securityService.getUser();
534       validUserOrThrow(currentUser);
535 
536       // Get the current organization
537       Organization organization = securityService.getOrganization();
538       if (organization == null)
539         throw new SecurityException("Current organization is unknown");
540 
541       WorkflowInstance workflowInstance = new WorkflowInstance(workflowDefinition, sourceMediaPackage,
542               currentUser, organization, properties);
543       workflowInstance = updateConfiguration(workflowInstance, properties);
544 
545       // Create and configure the workflow instance
546       try {
547         // Create a new job for this workflow instance
548         String workflowDefinitionXml = XmlWorkflowParser.toXml(workflowDefinition);
549         String mediaPackageXml = MediaPackageParser.getAsXml(sourceMediaPackage);
550 
551         List<String> arguments = new ArrayList<>();
552         arguments.add(workflowDefinitionXml);
553         arguments.add(mediaPackageXml);
554         if (parentWorkflowId != null || properties != null) {
555           String parentWorkflowIdString = (parentWorkflowId != null) ? parentWorkflowId.toString() : NULL_PARENT_ID;
556           arguments.add(parentWorkflowIdString);
557         }
558         if (properties != null) {
559           arguments.add(mapToString(properties));
560         }
561 
562         Job job = serviceRegistry.createJob(JOB_TYPE, Operation.START_WORKFLOW.toString(), arguments,
563                 null, false, null, WORKFLOW_JOB_LOAD);
564 
565         // Have the workflow take on the job's identity
566         workflowInstance.setId(job.getId());
567 
568         // Add the workflow to the search index and have the job enqueued for dispatch.
569         // Update also sets ACL and mediapackage metadata
570         update(workflowInstance);
571 
572         return workflowInstance;
573       } catch (Throwable t) {
574         try {
575           workflowInstance.setState(FAILED);
576           update(workflowInstance);
577         } catch (Exception failureToFail) {
578           logger.warn("Unable to update workflow to failed state", failureToFail);
579         }
580         try {
581           throw t;
582         } catch (ServiceRegistryException e) {
583           throw new WorkflowDatabaseException(e);
584         }
585       }
586     } finally {
587       lock.unlock();
588     }
589   }
590 
591   protected WorkflowInstance updateConfiguration(WorkflowInstance instance, Map<String, String> properties) {
592     if (properties != null) {
593       for (Entry<String, String> entry : properties.entrySet()) {
594         instance.setConfiguration(entry.getKey(), entry.getValue());
595       }
596     }
597     return instance;
598   }
599 
600   /**
601    * Does a lookup of available operation handlers for the given workflow operation.
602    *
603    * @param operation
604    *          the operation definition
605    * @return the handler or <code>null</code>
606    */
607   protected WorkflowOperationHandler selectOperationHandler(WorkflowOperationInstance operation) {
608     List<WorkflowOperationHandler> handlerList = new ArrayList<>();
609     for (HandlerRegistration handlerReg : getRegisteredHandlers()) {
610       if (handlerReg.operationName != null && handlerReg.operationName.equals(operation.getTemplate())) {
611         handlerList.add(handlerReg.handler);
612       }
613     }
614     if (handlerList.size() > 1) {
615       throw new IllegalStateException("Multiple operation handlers found for operation '" + operation.getTemplate()
616               + "'");
617     } else if (handlerList.size() == 1) {
618       return handlerList.get(0);
619     }
620     logger.warn("No workflow operation handlers found for operation '{}'", operation.getTemplate());
621     return null;
622   }
623 
624   /**
625    * Executes the workflow.
626    *
627    * @param workflow
628    *          the workflow instance
629    * @throws WorkflowException
630    *           if there is a problem processing the workflow
631    * @throws UnauthorizedException
632    */
633   protected Job runWorkflow(WorkflowInstance workflow) throws WorkflowException, UnauthorizedException {
634     if (INSTANTIATED != workflow.getState()) {
635 
636       // If the workflow is "running", we need to determine if there is an operation being executed or not.
637       // When a workflow has been restarted, this might not be the case and the status might not have been
638       // updated accordingly.
639       if (RUNNING == workflow.getState()) {
640         WorkflowOperationInstance currentOperation = workflow.getCurrentOperation();
641         if (currentOperation != null) {
642           if (currentOperation.getId() != null) {
643             try {
644               Job operationJob = serviceRegistry.getJob(currentOperation.getId());
645               if (Job.Status.RUNNING.equals(operationJob.getStatus())) {
646                 logger.debug("Not starting workflow {}, it is already in running state", workflow);
647                 return null;
648               } else {
649                 logger.info("Scheduling next operation of workflow {}", workflow);
650                 operationJob.setStatus(Status.QUEUED);
651                 operationJob.setDispatchable(true);
652                 return serviceRegistry.updateJob(operationJob);
653               }
654             } catch (Exception e) {
655               logger.warn("Error determining status of current workflow operation in {}", workflow, e);
656               return null;
657             }
658           }
659         } else {
660           throw new IllegalStateException("Cannot start a workflow '" + workflow + "' with no current operation");
661         }
662       } else {
663         throw new IllegalStateException("Cannot start a workflow in state '" + workflow.getState() + "'");
664       }
665     }
666 
667     // If this is a new workflow, move to the first operation
668     workflow.setState(RUNNING);
669     update(workflow);
670 
671     WorkflowOperationInstance operation = workflow.getCurrentOperation();
672 
673     if (operation == null) {
674       throw new IllegalStateException("Cannot start a workflow without a current operation");
675     }
676 
677     if (!operation.equals(workflow.getOperations().get(0))) {
678       throw new IllegalStateException("Current operation expected to be first");
679     }
680 
681     try {
682       logger.info("Scheduling workflow {} for execution", workflow.getId());
683       Job job = serviceRegistry.createJob(JOB_TYPE, Operation.START_OPERATION.toString(),
684               Collections.singletonList(Long.toString(workflow.getId())), null, false, null, WORKFLOW_JOB_LOAD);
685       operation.setId(job.getId());
686       update(workflow);
687       job.setStatus(Status.QUEUED);
688       job.setDispatchable(true);
689       return serviceRegistry.updateJob(job);
690     } catch (ServiceRegistryException e) {
691       throw new WorkflowDatabaseException(e);
692     } catch (NotFoundException e) {
693       // this should be impossible
694       throw new IllegalStateException("Unable to find a job that was just created");
695     }
696 
697   }
698 
699   /**
700    * Executes the workflow's current operation.
701    *
702    * @param workflow
703    *          the workflow
704    * @param properties
705    *          the properties that are passed in on resume
706    * @return the processed workflow operation
707    * @throws WorkflowException
708    *           if there is a problem processing the workflow
709    * @throws UnauthorizedException
710    */
711   protected WorkflowOperationInstance runWorkflowOperation(WorkflowInstance workflow, Map<String, String> properties)
712           throws WorkflowException, UnauthorizedException {
713     WorkflowOperationInstance processingOperation = workflow.getCurrentOperation();
714     if (processingOperation == null)
715       throw new IllegalStateException("Workflow '" + workflow + "' has no operation to run");
716 
717     // Keep the current state for later reference, it might have been changed from the outside
718     WorkflowState initialState = workflow.getState();
719 
720     // Execute the operation handler
721     WorkflowOperationHandler operationHandler = selectOperationHandler(processingOperation);
722     WorkflowOperationWorker worker = new WorkflowOperationWorker(operationHandler, workflow, properties, this);
723     workflow = worker.execute();
724 
725     Long currentOperationJobId = processingOperation.getId();
726     try {
727       updateOperationJob(currentOperationJobId, processingOperation.getState());
728     } catch (NotFoundException e) {
729       throw new IllegalStateException("Unable to find a job that has already been running");
730     } catch (ServiceRegistryException e) {
731       throw new WorkflowDatabaseException(e);
732     }
733 
734     // Move on to the next workflow operation
735     WorkflowOperationInstance currentOperation = workflow.getCurrentOperation();
736 
737     // Is the workflow done?
738     if (currentOperation == null) {
739 
740       // If we are in failing mode, we were simply working off an error handling workflow
741       if (FAILING.equals(workflow.getState())) {
742         workflow.setState(FAILED);
743       }
744 
745       // Otherwise, let's make sure we didn't miss any failed operation, since the workflow state could have been
746       // switched to paused while processing the error handling workflow extension
747       else if (!FAILED.equals(workflow.getState())) {
748         workflow.setState(SUCCEEDED);
749         for (WorkflowOperationInstance op : workflow.getOperations()) {
750           if (op.getState().equals(WorkflowOperationInstance.OperationState.FAILED)) {
751             if (op.isFailOnError()) {
752               workflow.setState(FAILED);
753               break;
754             }
755           }
756         }
757       }
758 
759       // Save the updated workflow to the database
760       logger.debug("{} has {}", workflow, workflow.getState());
761       update(workflow);
762 
763     } else {
764 
765       // Somebody might have set the workflow to "paused" from the outside, so take a look a the database first
766       WorkflowState dbWorkflowState;
767       try {
768         dbWorkflowState = getWorkflowById(workflow.getId()).getState();
769       } catch (NotFoundException e) {
770         throw new IllegalStateException("The workflow with ID " + workflow.getId()
771                 + " can not be found in the database", e);
772       } catch (UnauthorizedException e) {
773         throw new IllegalStateException("The workflow with ID " + workflow.getId() + " can not be read", e);
774       }
775 
776       // If somebody changed the workflow state from the outside, that state should take precedence
777       if (!dbWorkflowState.equals(initialState)) {
778         logger.info("Workflow state for {} was changed to '{}' from the outside", workflow, dbWorkflowState);
779         workflow.setState(dbWorkflowState);
780       }
781 
782       // Save the updated workflow to the database
783 
784       Job job;
785       switch (workflow.getState()) {
786         case FAILED:
787           update(workflow);
788           break;
789         case FAILING:
790         case RUNNING:
791           try {
792             job = serviceRegistry.createJob(JOB_TYPE, Operation.START_OPERATION.toString(),
793                     Collections.singletonList(Long.toString(workflow.getId())), null, false, null, WORKFLOW_JOB_LOAD);
794             currentOperation.setId(job.getId());
795             update(workflow);
796             job.setStatus(Status.QUEUED);
797             job.setDispatchable(true);
798             serviceRegistry.updateJob(job);
799           } catch (ServiceRegistryException e) {
800             throw new WorkflowDatabaseException(e);
801           } catch (NotFoundException e) {
802             // this should be impossible
803             throw new IllegalStateException("Unable to find a job that was just created");
804           }
805           break;
806         case PAUSED:
807         case STOPPED:
808         case SUCCEEDED:
809           update(workflow);
810           break;
811         case INSTANTIATED:
812           update(workflow);
813           throw new IllegalStateException("Impossible workflow state found during processing");
814         default:
815           throw new IllegalStateException("Unknown workflow state found during processing");
816       }
817 
818     }
819 
820     return processingOperation;
821   }
822 
823   /**
824    * {@inheritDoc}
825    *
826    * @see org.opencastproject.workflow.api.WorkflowService#getWorkflowDefinitionById(String)
827    */
828   @Override
829   public WorkflowDefinition getWorkflowDefinitionById(String id) throws NotFoundException {
830     final WorkflowIdentifier workflowIdentifier = new WorkflowIdentifier(id, securityService.getOrganization().getId());
831     final WorkflowDefinition def = workflowDefinitionScanner
832             .getWorkflowDefinition(securityService.getUser(), workflowIdentifier);
833     if (def == null) {
834       throw new NotFoundException("Workflow definition '" + workflowIdentifier + "' not found or inaccessible");
835     }
836     return def;
837   }
838 
839   /**
840    * {@inheritDoc}
841    *
842    * @see org.opencastproject.workflow.api.WorkflowService#stop(long)
843    */
844   @Override
845   public WorkflowInstance stop(long workflowInstanceId) throws WorkflowException, NotFoundException,
846           UnauthorizedException {
847     final Lock lock = this.lock.get(workflowInstanceId);
848     lock.lock();
849     try {
850       WorkflowInstance instance = getWorkflowById(workflowInstanceId);
851 
852       if (instance.getState() != STOPPED) {
853         // Update the workflow instance
854         instance.setState(STOPPED);
855         update(instance);
856       }
857 
858       try {
859         removeTempFiles(instance);
860       } catch (Exception e) {
861         logger.warn("Cannot remove temp files for workflow instance {}", workflowInstanceId, e);
862       }
863 
864       return instance;
865     } finally {
866       lock.unlock();
867     }
868   }
869 
870   /**
871    * Checks whether user is set and is known to the userDirectoryService
872    */
873   private void validUserOrThrow(User user) {
874       if (user == null)
875         throw new SecurityException("Current user is unknown");
876 
877       if (userDirectoryService.loadUser(user.getUsername()) == null)
878         throw new SecurityException(String.format("Current user '%s' can not be loaded", user.getUsername()));
879   }
880 
881   private void removeTempFiles(WorkflowInstance workflowInstance) {
882     logger.info("Removing temporary files for workflow {}", workflowInstance.getId());
883     MediaPackage mp = workflowInstance.getMediaPackage();
884     if (null == mp) {
885       logger.warn("Workflow instance {} does not have an media package set", workflowInstance.getId());
886       return;
887     }
888     for (MediaPackageElement elem : mp.getElements()) {
889       // Publications should not link to temporary files and can be skipped
890       if (elem instanceof Publication) {
891         continue;
892       }
893       if (null == elem.getURI()) {
894         logger.warn("Media package element {} from the media package {} does not have an URI set",
895                 elem.getIdentifier(), mp.getIdentifier());
896         continue;
897       }
898       try {
899         logger.debug("Removing temporary file {} for workflow {}", elem.getURI(), workflowInstance);
900         workspace.delete(elem.getURI());
901       } catch (IOException e) {
902         logger.warn("Unable to delete mediapackage element", e);
903       } catch (NotFoundException e) {
904         // File was probably already deleted before...
905       }
906     }
907   }
908 
909 
910   /**
911    * {@inheritDoc}
912    *
913    * @see org.opencastproject.workflow.api.WorkflowService#remove(long)
914    */
915   @Override
916   public void remove(long workflowInstanceId) throws WorkflowDatabaseException, NotFoundException,
917           UnauthorizedException, WorkflowParsingException, WorkflowStateException {
918     remove(workflowInstanceId, false);
919   }
920 
921   /**
922    * {@inheritDoc}
923    *
924    * @see org.opencastproject.workflow.api.WorkflowService#remove(long,boolean)
925    */
926   @Override
927   public void remove(long workflowInstanceId, boolean force) throws WorkflowDatabaseException, NotFoundException,
928           UnauthorizedException, WorkflowStateException {
929     final Lock lock = this.lock.get(workflowInstanceId);
930     lock.lock();
931     try {
932       WorkflowInstance instance = getWorkflowById(workflowInstanceId);
933       WorkflowInstance.WorkflowState state = instance.getState();
934       if (!state.isTerminated() && !force) {
935         throw new WorkflowStateException("Workflow instance with state '" + state + "' cannot be removed "
936             + "since it is not yet terminated.");
937       }
938 
939       assertPermission(instance, Permissions.Action.WRITE.toString(), instance.getOrganizationId());
940 
941       // First, remove temporary files
942       removeTempFiles(instance);
943 
944       // Second, remove jobs related to operations which belong to the workflow instance
945       List<WorkflowOperationInstance> operations = instance.getOperations();
946       List<Long> jobsToDelete = new ArrayList<>();
947       for (WorkflowOperationInstance op : operations) {
948         if (op.getId() != null) {
949           long workflowOpId = op.getId();
950           if (workflowOpId != workflowInstanceId) {
951             jobsToDelete.add(workflowOpId);
952           }
953         }
954       }
955       try {
956         serviceRegistry.removeJobs(jobsToDelete);
957       } catch (ServiceRegistryException e) {
958         logger.warn("Problems while removing jobs related to workflow operations '{}'", jobsToDelete, e);
959       } catch (NotFoundException e) {
960         logger.debug("No jobs related to one of the workflow operation '{}' found in service registry", jobsToDelete);
961       }
962 
963       // Third, remove workflow instance job itself
964       try {
965         serviceRegistry.removeJobs(Collections.singletonList(workflowInstanceId));
966         removeWorkflowInstanceFromIndex(instance.getId());
967       } catch (ServiceRegistryException e) {
968         logger.warn("Problems while removing workflow instance job '{}'", workflowInstanceId, e);
969       } catch (NotFoundException e) {
970         logger.info("No workflow instance job '{}' found in the service registry", workflowInstanceId);
971       }
972 
973       // Remove workflow from database
974       persistence.removeFromDatabase(instance);
975     } finally {
976       lock.unlock();
977     }
978   }
979 
980   /**
981    * {@inheritDoc}
982    *
983    * @see org.opencastproject.workflow.api.WorkflowService#suspend(long)
984    */
985   @Override
986   public WorkflowInstance suspend(long workflowInstanceId) throws WorkflowException, NotFoundException,
987           UnauthorizedException {
988     final Lock lock = this.lock.get(workflowInstanceId);
989     lock.lock();
990     try {
991       WorkflowInstance instance = getWorkflowById(workflowInstanceId);
992       instance.setState(PAUSED);
993       update(instance);
994       return instance;
995     } finally {
996       lock.unlock();
997     }
998   }
999 
1000   /**
1001    * {@inheritDoc}
1002    *
1003    * @see org.opencastproject.workflow.api.WorkflowService#resume(long)
1004    */
1005   @Override
1006   public WorkflowInstance resume(long id) throws WorkflowException, NotFoundException, IllegalStateException,
1007           UnauthorizedException {
1008     return resume(id, null);
1009   }
1010 
1011   /**
1012    * {@inheritDoc}
1013    *
1014    * @see org.opencastproject.workflow.api.WorkflowService#resume(long, Map)
1015    */
1016   @Override
1017   public WorkflowInstance resume(long workflowInstanceId, Map<String, String> properties) throws WorkflowException,
1018           NotFoundException, IllegalStateException, UnauthorizedException {
1019     WorkflowInstance workflowInstance = getWorkflowById(workflowInstanceId);
1020     if (!WorkflowState.PAUSED.equals(workflowInstance.getState()))
1021       throw new IllegalStateException("Can not resume a workflow where the current state is not in paused");
1022 
1023     workflowInstance = updateConfiguration(workflowInstance, properties);
1024     update(workflowInstance);
1025 
1026     WorkflowOperationInstance currentOperation = workflowInstance.getCurrentOperation();
1027 
1028     // Is the workflow done?
1029     if (currentOperation == null) {
1030       // Let's make sure we didn't miss any failed operation, since the workflow state could have been
1031       // switched to paused while processing the error handling workflow extension
1032       workflowInstance.setState(SUCCEEDED);
1033       for (WorkflowOperationInstance op : workflowInstance.getOperations()) {
1034         if (op.getState().equals(WorkflowOperationInstance.OperationState.FAILED)) {
1035           if (op.isFailOnError()) {
1036             workflowInstance.setState(FAILED);
1037             break;
1038           }
1039         }
1040       }
1041 
1042       // Save the resumed workflow to the database
1043       logger.debug("{} has {}", workflowInstance, workflowInstance.getState());
1044       update(workflowInstance);
1045       return workflowInstance;
1046     }
1047 
1048     // We can resume workflows when they are in either the paused state, or they are being advanced manually passed
1049     // certain operations. In the latter case, there is no current paused operation.
1050     if (OperationState.INSTANTIATED.equals(currentOperation.getState())) {
1051       try {
1052         // the operation has its own job. Update that too.
1053         Job operationJob = serviceRegistry.createJob(JOB_TYPE, Operation.START_OPERATION.toString(),
1054                 Collections.singletonList(Long.toString(workflowInstanceId)), null, false, null, WORKFLOW_JOB_LOAD);
1055 
1056         // this method call is publicly visible, so it doesn't necessarily go through the accept method. Set the
1057         // workflow state manually.
1058         workflowInstance.setState(RUNNING);
1059         currentOperation.setId(operationJob.getId());
1060 
1061         // update the workflow and its associated job
1062         update(workflowInstance);
1063 
1064         // Now set this job to be queued so it can be dispatched
1065         operationJob.setStatus(Status.QUEUED);
1066         operationJob.setDispatchable(true);
1067         serviceRegistry.updateJob(operationJob);
1068 
1069         return workflowInstance;
1070       } catch (ServiceRegistryException e) {
1071         throw new WorkflowDatabaseException(e);
1072       }
1073     }
1074 
1075     Long operationJobId = workflowInstance.getCurrentOperation().getId();
1076     if (operationJobId == null)
1077       throw new IllegalStateException("Can not resume a workflow where the current operation has no associated id");
1078 
1079     // Set the current operation's job to queued, so it gets picked up again
1080     Job workflowJob;
1081     try {
1082       workflowJob = serviceRegistry.getJob(workflowInstanceId);
1083       workflowJob.setStatus(Status.RUNNING);
1084       persistence.updateInDatabase(workflowInstance);
1085       serviceRegistry.updateJob(workflowJob);
1086 
1087       Job operationJob = serviceRegistry.getJob(operationJobId);
1088       operationJob.setStatus(Status.QUEUED);
1089       operationJob.setDispatchable(true);
1090       if (properties != null) {
1091         Properties props = new Properties();
1092         props.putAll(properties);
1093         ByteArrayOutputStream out = new ByteArrayOutputStream();
1094         props.store(out, null);
1095         List<String> newArguments = new ArrayList<String>(operationJob.getArguments());
1096         newArguments.add(new String(out.toByteArray(), StandardCharsets.UTF_8));
1097         operationJob.setArguments(newArguments);
1098       }
1099       serviceRegistry.updateJob(operationJob);
1100     } catch (ServiceRegistryException e) {
1101       throw new WorkflowDatabaseException(e);
1102     } catch (IOException e) {
1103       throw new WorkflowParsingException("Unable to parse workflow and/or workflow properties");
1104     }
1105 
1106     return workflowInstance;
1107   }
1108 
1109   /**
1110    * Asserts that the current user has permission to take the provided action on a workflow instance.
1111    *
1112    * @param workflow
1113    *          the workflow instance
1114    * @param action
1115    *          the action to ensure is permitted
1116    * @throws UnauthorizedException
1117    *           if the action is not authorized
1118    */
1119   protected void assertPermission(WorkflowInstance workflow, String action, String workflowOrgId) throws UnauthorizedException {
1120     User currentUser = securityService.getUser();
1121     Organization currentOrg = securityService.getOrganization();
1122     String currentOrgAdminRole = currentOrg.getAdminRole();
1123     String currentOrgId = currentOrg.getId();
1124 
1125     MediaPackage mediapackage = workflow.getMediaPackage();
1126 
1127     WorkflowState state = workflow.getState();
1128     if (state != INSTANTIATED && state != RUNNING && workflow.getState() != FAILING) {
1129       Optional<MediaPackage> assetMediapackage = assetManager.getMediaPackage(mediapackage.getIdentifier().toString());
1130       if (assetMediapackage.isPresent()) {
1131         mediapackage = assetMediapackage.get();
1132       }
1133     }
1134 
1135     var creatorName = workflow.getCreatorName();
1136     var workflowCreator = creatorName == null ? null : userDirectoryService.loadUser(creatorName);
1137     boolean authorized = currentUser.hasRole(GLOBAL_ADMIN_ROLE)
1138             || (currentUser.hasRole(currentOrgAdminRole) && currentOrgId.equals(workflowOrgId))
1139             || (currentUser.equals(workflowCreator))
1140             || (authorizationService.hasPermission(mediapackage, action) && currentOrgId.equals(workflowOrgId));
1141 
1142     if (!authorized) {
1143       throw new UnauthorizedException(currentUser, action);
1144     }
1145   }
1146 
1147   protected boolean assertMediaPackagePermission(String mediaPackageId, String action) {
1148     var currentUser = securityService.getUser();
1149     Optional<MediaPackage> mp = assetManager.getMediaPackage(mediaPackageId);
1150 
1151     // asset manager already checks if user is admin, org admin for same org as mp, or has explicit read rights
1152     // global admins can still get workflow instances if mp is gone from asset manager
1153     // org admins can't because then we don't know if mp belonged to same org as user
1154     return currentUser.hasRole(GLOBAL_ADMIN_ROLE)
1155             || mp.isPresent() && currentUser.hasRole(securityService.getOrganization().getAdminRole())
1156             || mp.isPresent() && authorizationService.hasPermission(mp.get(), action);
1157   }
1158 
1159   /**
1160    * {@inheritDoc}
1161    *
1162    * @see org.opencastproject.workflow.api.WorkflowService#update(org.opencastproject.workflow.api.WorkflowInstance)
1163    */
1164   @Override
1165   public void update(final WorkflowInstance workflowInstance) throws WorkflowDatabaseException, UnauthorizedException {
1166     final Lock lock = updateLock.get(workflowInstance.getId());
1167     lock.lock();
1168 
1169     try {
1170       WorkflowInstance originalWorkflowInstance = null;
1171       try {
1172         // get workflow and assert permissions
1173         originalWorkflowInstance = getWorkflowById(workflowInstance.getId());
1174       } catch (NotFoundException e) {
1175         // That's fine, it's a new workflow instance
1176       }
1177 
1178       MediaPackage updatedMediaPackage = null;
1179       try {
1180 
1181         // Before we persist this, extract the metadata
1182         updatedMediaPackage = workflowInstance.getMediaPackage();
1183 
1184         populateMediaPackageMetadata(updatedMediaPackage);
1185 
1186         String seriesId = updatedMediaPackage.getSeries();
1187         if (seriesId != null && workflowInstance.getCurrentOperation() != null) {
1188           // If the mediapackage contains a series, find the series ACLs and add the security information to the
1189           // mediapackage
1190 
1191           try {
1192             AccessControlList acl = seriesService.getSeriesAccessControl(seriesId);
1193             Tuple<AccessControlList, AclScope> activeAcl = authorizationService.getAcl(
1194                 updatedMediaPackage, AclScope.Series);
1195             // Update series ACL if it differs from the active series ACL on the media package
1196             if (!AclScope.Series.equals(activeAcl.getB()) || !AccessControlUtil.equals(activeAcl.getA(), acl)) {
1197               authorizationService.setAcl(updatedMediaPackage, AclScope.Series, acl);
1198             }
1199           } catch (NotFoundException e) {
1200             logger.debug("Not updating series ACL on event {} since series {} has no ACL set",
1201                 updatedMediaPackage, seriesId, e);
1202           }
1203         }
1204 
1205         workflowInstance.setMediaPackage(updatedMediaPackage);
1206       } catch (SeriesException e) {
1207         throw new WorkflowDatabaseException(e);
1208       } catch (Exception e) {
1209         logger.error("Metadata for media package {} could not be updated", updatedMediaPackage, e);
1210       }
1211 
1212       // Synchronize the job status with the workflow
1213       WorkflowState workflowState = workflowInstance.getState();
1214 
1215       Job job;
1216       try {
1217         job = serviceRegistry.getJob(workflowInstance.getId());
1218         job.setPayload(Long.toString(workflowInstance.getId()));
1219 
1220         // Synchronize workflow and job state
1221         switch (workflowState) {
1222           case FAILED:
1223             job.setStatus(Status.FAILED);
1224             break;
1225           case FAILING:
1226             break;
1227           case INSTANTIATED:
1228             job.setDispatchable(true);
1229             job.setStatus(Status.QUEUED);
1230             break;
1231           case PAUSED:
1232             job.setStatus(Status.PAUSED);
1233             break;
1234           case RUNNING:
1235             job.setStatus(Status.RUNNING);
1236             break;
1237           case STOPPED:
1238             job.setStatus(Status.CANCELLED);
1239             break;
1240           case SUCCEEDED:
1241             job.setStatus(Status.FINISHED);
1242             break;
1243           default:
1244             throw new IllegalStateException("Found a workflow state that is not handled");
1245         }
1246       } catch (ServiceRegistryException e) {
1247         throw new WorkflowDatabaseException(
1248             "Unable to read workflow job " + workflowInstance.getId() + " from service registry", e);
1249       } catch (NotFoundException e) {
1250         throw new WorkflowDatabaseException(
1251             "Job for workflow " + workflowInstance.getId() + " not found in service registry", e);
1252       }
1253 
1254       // Update both workflow and workflow job
1255       try {
1256         //Update the database
1257         persistence.updateInDatabase(workflowInstance);
1258 
1259         job = serviceRegistry.updateJob(job);
1260 
1261         WorkflowOperationInstance op = workflowInstance.getCurrentOperation();
1262 
1263         // Update index used for UI. Note that we only need certain metadata and we can safely filter out workflow
1264         // updates for running operations since we updated the metadata right before these operations and will do so
1265         // again right after those operations.
1266         if (op == null || op.getState() != OperationState.RUNNING) {
1267           // Collect necessary information only for index update
1268           long id = workflowInstance.getId();
1269           int state = workflowInstance.getState().ordinal();
1270           var template = workflowInstance.getTemplate();
1271           String mpId = workflowInstance.getMediaPackage().getIdentifier().toString();
1272           String orgId = workflowInstance.getOrganizationId();
1273 
1274           updateWorkflowInstanceInIndex(id, state, template, mpId, orgId);
1275         }
1276       } catch (ServiceRegistryException e) {
1277         throw new WorkflowDatabaseException("Update of workflow job " + workflowInstance.getId()
1278             + " in the service registry failed, service registry and workflow table may be out of sync", e);
1279       } catch (NotFoundException e) {
1280         throw new WorkflowDatabaseException("Job for workflow " + workflowInstance.getId()
1281             + " not found in service registry", e);
1282       } catch (Exception e) {
1283         throw new WorkflowDatabaseException("Update of workflow job " + job.getId() + " in the service registry failed, "
1284             + "service registry and workflow table may be out of sync", e);
1285       }
1286 
1287       try {
1288         fireListeners(originalWorkflowInstance, workflowInstance);
1289       } catch (Exception e) {
1290         // Can't happen, since we are converting from an in-memory object
1291         throw new IllegalStateException("In-memory workflow instance could not be serialized", e);
1292       }
1293     } finally {
1294       lock.unlock();
1295     }
1296   }
1297 
1298   /**
1299    * {@inheritDoc}
1300    *
1301    * @see org.opencastproject.workflow.api.WorkflowService#countWorkflowInstances()
1302    */
1303   @Override
1304   public long countWorkflowInstances() throws WorkflowDatabaseException {
1305     return countWorkflowInstances(null);
1306   }
1307 
1308   /**
1309    * {@inheritDoc}
1310    *
1311    * @see org.opencastproject.workflow.api.WorkflowService#countWorkflowInstances(org.opencastproject.workflow.api.WorkflowInstance.WorkflowState)
1312    */
1313   @Override
1314   public long countWorkflowInstances(WorkflowState state) throws WorkflowDatabaseException {
1315     return persistence.countWorkflows(state);
1316   }
1317 
1318   /**
1319    * {@inheritDoc}
1320    *
1321    * @see org.opencastproject.workflow.api.WorkflowService#getWorkflowInstancesByMediaPackage(String)
1322    */
1323   @Override
1324   public List<WorkflowInstance> getWorkflowInstancesByMediaPackage(String mediaPackageId)
1325       throws WorkflowDatabaseException, UnauthorizedException {
1326     // If we have read permission to the media package, return all workflows
1327     if (!assertMediaPackagePermission(mediaPackageId, Permissions.Action.READ.toString())) {
1328       throw new UnauthorizedException("Not allowed to access event");
1329     }
1330     return persistence.getWorkflowInstancesByMediaPackage(mediaPackageId);
1331   }
1332 
1333   /**
1334    * {@inheritDoc}
1335    *
1336    * @see org.opencastproject.workflow.api.WorkflowService#getRunningWorkflowInstanceByMediaPackage(String, String)
1337    */
1338   @Override
1339   public Optional<WorkflowInstance> getRunningWorkflowInstanceByMediaPackage(String mediaPackageId, String action)
1340           throws WorkflowException, UnauthorizedException, WorkflowDatabaseException {
1341     List<WorkflowInstance> workflowInstances = persistence.getRunningWorkflowInstancesByMediaPackage(mediaPackageId);
1342 
1343     // If there is more than workflow running something is very wrong
1344     if (workflowInstances.size() > 1) {
1345       throw new WorkflowException("Multiple workflows are active on mediapackage " + mediaPackageId);
1346     }
1347 
1348     Optional<WorkflowInstance> optWorkflowInstance = Optional.empty();
1349     if (workflowInstances.size() == 1) {
1350       WorkflowInstance wfInstance = workflowInstances.get(0);
1351       optWorkflowInstance = Optional.of(wfInstance);
1352       assertPermission(wfInstance, action, wfInstance.getOrganizationId());
1353     }
1354 
1355     return optWorkflowInstance;
1356   }
1357 
1358   /**
1359    * {@inheritDoc}
1360    *
1361    * @see org.opencastproject.workflow.api.WorkflowService#mediaPackageHasActiveWorkflows(String)
1362    */
1363   @Override
1364   public boolean mediaPackageHasActiveWorkflows(String mediaPackageId) throws WorkflowDatabaseException {
1365     return persistence.mediaPackageHasActiveWorkflows(mediaPackageId);
1366   }
1367 
1368   /**
1369    * {@inheritDoc}
1370    *
1371    * @see org.opencastproject.workflow.api.WorkflowService#userHasActiveWorkflows(String)
1372    */
1373   @Override
1374   public boolean userHasActiveWorkflows(String userId) throws WorkflowDatabaseException {
1375     return persistence.userHasActiveWorkflows(userId);
1376   }
1377 
1378   /**
1379    * Callback for workflow operations that were throwing an exception. This implementation assumes that the operation
1380    * worker has already adjusted the current operation's state appropriately.
1381    *
1382    * @param workflow
1383    *          the workflow instance
1384    * @param currentOperation
1385    *          the current workflow operation
1386    * @return the workflow instance
1387    */
1388   protected WorkflowInstance handleOperationException(
1389       WorkflowInstance workflow,
1390       WorkflowOperationInstance currentOperation) {
1391     int failedAttempt = currentOperation.getFailedAttempts() + 1;
1392     currentOperation.setFailedAttempts(failedAttempt);
1393 
1394     // Operation was aborted by the user, after going into hold state
1395     if (ERROR_RESOLUTION_HANDLER_ID.equals(currentOperation.getTemplate())
1396             && OperationState.FAILED.equals(currentOperation.getState())) {
1397       int position = workflow.getOperations().indexOf(currentOperation);
1398       // Advance to operation that actually failed
1399       if (workflow.getOperations().size() > position + 1) { // This should always be true...
1400         currentOperation = workflow.getOperations().get(position + 1);
1401         // It's currently in RETRY state, change to FAILED
1402         currentOperation.setState(OperationState.FAILED);
1403       }
1404       handleFailedOperation(workflow, currentOperation);
1405     } else if (currentOperation.getMaxAttempts() != -1 && failedAttempt == currentOperation.getMaxAttempts()) {
1406       handleFailedOperation(workflow, currentOperation);
1407     } else {
1408       switch (currentOperation.getRetryStrategy()) {
1409         case NONE:
1410           handleFailedOperation(workflow, currentOperation);
1411           break;
1412         case RETRY:
1413           currentOperation.setState(OperationState.RETRY);
1414           break;
1415         case HOLD:
1416           currentOperation.setState(OperationState.RETRY);
1417           List<WorkflowOperationInstance> operations = workflow.getOperations();
1418           WorkflowOperationDefinitionImpl errorResolutionDefinition = new WorkflowOperationDefinitionImpl(
1419                   ERROR_RESOLUTION_HANDLER_ID, "Error Resolution Operation", "error", false);
1420           var errorResolutionInstance = new WorkflowOperationInstance(errorResolutionDefinition);
1421           errorResolutionInstance.setExceptionHandlingWorkflow(currentOperation.getExceptionHandlingWorkflow());
1422           var index = workflow.getOperations().indexOf(currentOperation);
1423           operations.add(index, errorResolutionInstance);
1424           workflow.setOperations(operations);
1425           break;
1426         default:
1427           break;
1428       }
1429     }
1430     return workflow;
1431   }
1432 
1433   /**
1434    * Handles the workflow for a failing operation.
1435    *
1436    * @param workflow
1437    *          the workflow
1438    * @param currentOperation
1439    *          the failing workflow operation instance
1440    */
1441   private void handleFailedOperation(WorkflowInstance workflow, WorkflowOperationInstance currentOperation) {
1442     String errorDefId = currentOperation.getExceptionHandlingWorkflow();
1443 
1444     // Adjust the workflow state according to the setting on the operation
1445     if (currentOperation.isFailOnError()) {
1446       if (StringUtils.isBlank(errorDefId)) {
1447         workflow.setState(FAILED);
1448       } else {
1449         workflow.setState(FAILING);
1450 
1451         // Remove the rest of the original workflow
1452         int currentOperationPosition = workflow.getOperations().indexOf(currentOperation);
1453         List<WorkflowOperationInstance> operations = new ArrayList<>(
1454                 workflow.getOperations().subList(0, currentOperationPosition + 1));
1455         workflow.setOperations(operations);
1456 
1457         // Determine the current workflow configuration
1458         Map<String, String> configuration = new HashMap<>();
1459         for (String configKey : workflow.getConfigurationKeys()) {
1460           configuration.put(configKey, workflow.getConfiguration(configKey));
1461         }
1462 
1463         // Append the operations
1464         WorkflowDefinition errorDef = null;
1465         try {
1466           errorDef = getWorkflowDefinitionById(errorDefId);
1467           workflow.extend(errorDef);
1468           workflow.setOperations(updateConfiguration(workflow, configuration).getOperations());
1469         } catch (NotFoundException notFoundException) {
1470           throw new IllegalStateException("Unable to find the error workflow definition '" + errorDefId + "'");
1471         }
1472       }
1473     }
1474 
1475     // Fail the current operation
1476     currentOperation.setState(OperationState.FAILED);
1477   }
1478 
1479   /**
1480    * Callback for workflow operation handlers that executed and finished without exception. This implementation assumes
1481    * that the operation worker has already adjusted the current operation's state appropriately.
1482    *
1483    * @param workflow
1484    *          the workflow instance
1485    * @param result
1486    *          the workflow operation result
1487    * @return the workflow instance
1488    * @throws WorkflowDatabaseException
1489    *           if updating the workflow fails
1490    */
1491   protected WorkflowInstance handleOperationResult(WorkflowInstance workflow, WorkflowOperationResult result)
1492           throws WorkflowDatabaseException {
1493 
1494     // Get the operation and its handler
1495     WorkflowOperationInstance currentOperation = workflow.getCurrentOperation();
1496     WorkflowOperationHandler handler = getWorkflowOperationHandler(currentOperation.getTemplate());
1497 
1498     // Create an operation result for the lazy or else update the workflow's media package
1499     if (result == null) {
1500       logger.warn("Handling a null operation result for workflow {} in operation {}", workflow.getId(),
1501               currentOperation.getTemplate());
1502       result = new WorkflowOperationResultImpl(workflow.getMediaPackage(), null, Action.CONTINUE, 0);
1503     } else {
1504       MediaPackage mp = result.getMediaPackage();
1505       if (mp != null) {
1506         workflow.setMediaPackage(mp);
1507       }
1508     }
1509 
1510     // The action to take
1511     Action action = result.getAction();
1512 
1513     // Update the workflow configuration.
1514     workflow = updateConfiguration(workflow, result.getProperties());
1515 
1516     // Adjust workflow statistics
1517     currentOperation.setTimeInQueue(result.getTimeInQueue());
1518 
1519     // Adjust the operation state
1520     switch (action) {
1521       case CONTINUE:
1522         currentOperation.setState(OperationState.SUCCEEDED);
1523         break;
1524       case PAUSE:
1525         if (!(handler instanceof ResumableWorkflowOperationHandler)) {
1526           throw new IllegalStateException("Operation " + currentOperation.getTemplate() + " is not resumable");
1527         }
1528 
1529         // Set abortable and continuable to default values
1530         currentOperation.setContinuable(result.allowsContinue());
1531         currentOperation.setAbortable(result.allowsAbort());
1532 
1533         workflow.setState(PAUSED);
1534         currentOperation.setState(OperationState.PAUSED);
1535         break;
1536       case SKIP:
1537         currentOperation.setState(OperationState.SKIPPED);
1538         break;
1539       default:
1540         throw new IllegalStateException("Unknown action '" + action + "' returned");
1541     }
1542 
1543     if (ERROR_RESOLUTION_HANDLER_ID.equals(currentOperation.getTemplate()) && result.getAction() == Action.CONTINUE) {
1544 
1545       Map<String, String> resultProperties = result.getProperties();
1546       if (resultProperties == null || StringUtils.isBlank(resultProperties.get(RETRY_STRATEGY)))
1547         throw new WorkflowDatabaseException("Retry strategy not present in properties!");
1548 
1549       RetryStrategy retryStrategy = RetryStrategy.valueOf(resultProperties.get(RETRY_STRATEGY));
1550       switch (retryStrategy) {
1551         case NONE:
1552           handleFailedOperation(workflow, workflow.getCurrentOperation());
1553           break;
1554         case RETRY:
1555           break;
1556         default:
1557           throw new WorkflowDatabaseException("Retry strategy not implemented yet!");
1558       }
1559     }
1560 
1561     return workflow;
1562   }
1563 
1564   /**
1565    * Reads the available metadata from the dublin core catalog (if there is one) and updates the mediapackage.
1566    *
1567    * @param mp
1568    *          the media package
1569    */
1570   protected void populateMediaPackageMetadata(MediaPackage mp) {
1571     if (metadataServices.isEmpty()) {
1572       logger.warn("No metadata services are registered, so no media package metadata can be extracted from catalogs");
1573       return;
1574     }
1575     for (MediaPackageMetadataService metadataService : metadataServices) {
1576       MediaPackageMetadata metadata = metadataService.getMetadata(mp);
1577       MediaPackageMetadataSupport.populateMediaPackageMetadata(mp, metadata);
1578     }
1579   }
1580 
1581   /**
1582    * {@inheritDoc}
1583    *
1584    * @see org.opencastproject.job.api.JobProducer#isReadyToAcceptJobs(String)
1585    */
1586   @Override
1587   public boolean isReadyToAcceptJobs(String operation) {
1588     return true;
1589   }
1590 
1591   /**
1592    * {@inheritDoc}
1593    *
1594    * If we are already running the maximum number of workflows, don't accept another START_WORKFLOW job
1595    *
1596    * @see org.opencastproject.job.api.AbstractJobProducer#isReadyToAccept(org.opencastproject.job.api.Job)
1597    */
1598   @Override
1599   public boolean isReadyToAccept(Job job) throws UndispatchableJobException {
1600     String operation = job.getOperation();
1601 
1602     // Only restrict execution of new jobs
1603     if (!Operation.START_WORKFLOW.toString().equals(operation))
1604       return true;
1605 
1606     // If the first operation is guaranteed to pause, run the job.
1607     if (job.getArguments().size() > 1 && job.getArguments().get(0) != null) {
1608       try {
1609         WorkflowDefinition workflowDef = XmlWorkflowParser.parseWorkflowDefinition(job.getArguments().get(0));
1610         if (workflowDef.getOperations().size() > 0) {
1611           String firstOperationId = workflowDef.getOperations().get(0).getId();
1612           WorkflowOperationHandler handler = getWorkflowOperationHandler(firstOperationId);
1613           if (handler instanceof ResumableWorkflowOperationHandler) {
1614             if (((ResumableWorkflowOperationHandler) handler).isAlwaysPause()) {
1615               return true;
1616             }
1617           }
1618         }
1619       } catch (WorkflowParsingException e) {
1620         throw new UndispatchableJobException(job + " is not a proper job to start a workflow", e);
1621       }
1622     }
1623 
1624     WorkflowInstance workflow;
1625     Optional<WorkflowInstance> workflowInstance;
1626     String mediaPackageId;
1627 
1628     // Fetch all workflows that are running with the current media package
1629     try {
1630       workflow = getWorkflowById(job.getId());
1631       mediaPackageId = workflow.getMediaPackage().getIdentifier().toString();
1632     } catch (NotFoundException e) {
1633       throw new UndispatchableJobException("Trying to start workflow with job id " + job.getId()
1634           + " but no corresponding instance is available from the workflow service", e);
1635     } catch (UnauthorizedException e) {
1636       throw new UndispatchableJobException(
1637           "Authorization denied while requesting to loading workflow instance. Job: " + job.getId(), e);
1638     }
1639 
1640     try {
1641       workflowInstance = getRunningWorkflowInstanceByMediaPackage(
1642               workflow.getMediaPackage().getIdentifier().toString(), Permissions.Action.READ.toString());
1643     } catch (UnauthorizedException e) {
1644       throw new UndispatchableJobException("Authorization denied while requesting to loading workflow instance " + workflow.getId(), e);
1645     } catch (WorkflowDatabaseException e) {
1646       throw new UndispatchableJobException("An database error occurred while checking if a workflow is already active "
1647           + "(job: " + job.getId() + ")", e);
1648     } catch (WorkflowException e) {
1649       // Avoid running multiple workflows with same media package id at the same time
1650       delayWorkflow(workflow, mediaPackageId);
1651       return false;
1652     }
1653 
1654     // Make sure we are not excluding ourselves
1655     if (workflowInstance.isPresent() && workflow.getId() != workflowInstance.get().getId()) {
1656       delayWorkflow(workflow, mediaPackageId);
1657       return false;
1658     }
1659 
1660     return true;
1661   }
1662 
1663   private void delayWorkflow(WorkflowInstance workflow, String mediaPackageId) {
1664     if (!delayedWorkflows.contains(workflow.getId())) {
1665       logger.info("Delaying start of workflow {}, another workflow on media package {} is still running",
1666               workflow.getId(), mediaPackageId);
1667       delayedWorkflows.add(workflow.getId());
1668     }
1669   }
1670 
1671   /**
1672    * {@inheritDoc}
1673    *
1674    * @see org.opencastproject.job.api.AbstractJobProducer#acceptJob(org.opencastproject.job.api.Job)
1675    */
1676   @Override
1677   public synchronized void acceptJob(Job job) throws ServiceRegistryException {
1678     User originalUser = securityService.getUser();
1679     Organization originalOrg = securityService.getOrganization();
1680     try {
1681       Organization organization = organizationDirectoryService.getOrganization(job.getOrganization());
1682       securityService.setOrganization(organization);
1683       User user = userDirectoryService.loadUser(job.getCreator());
1684       securityService.setUser(user);
1685       job.setStatus(Job.Status.RUNNING);
1686       job = serviceRegistry.updateJob(job);
1687 
1688       // Check if this workflow was initially delayed
1689       if (delayedWorkflows.contains(job.getId())) {
1690         delayedWorkflows.remove(job.getId());
1691         logger.info("Starting initially delayed workflow {}, {} more waiting", job.getId(), delayedWorkflows.size());
1692       }
1693 
1694       executorService.submit(new JobRunner(job, serviceRegistry.getCurrentJob()));
1695     } catch (Exception e) {
1696       if (e instanceof ServiceRegistryException)
1697         throw (ServiceRegistryException) e;
1698       throw new ServiceRegistryException(e);
1699     } finally {
1700       securityService.setUser(originalUser);
1701       securityService.setOrganization(originalOrg);
1702     }
1703   }
1704 
1705   /**
1706    * Processes the workflow job.
1707    *
1708    * @param job
1709    *          the job
1710    * @return the job payload
1711    * @throws Exception
1712    *           if job processing fails
1713    */
1714   protected String process(Job job) throws Exception {
1715     List<String> arguments = job.getArguments();
1716     Operation op = null;
1717     WorkflowInstance workflowInstance = null;
1718     WorkflowOperationInstance wfo;
1719     String operation = job.getOperation();
1720     try {
1721       try {
1722         op = Operation.valueOf(operation);
1723         switch (op) {
1724           case START_WORKFLOW:
1725             workflowInstance = persistence.getWorkflow(Long.parseLong(job.getPayload()));
1726             logger.debug("Starting new workflow {}", workflowInstance);
1727             runWorkflow(workflowInstance);
1728             break;
1729           case RESUME:
1730             workflowInstance = getWorkflowById(Long.parseLong(arguments.get(0)));
1731             Map<String, String> properties = null;
1732             if (arguments.size() > 1) {
1733               Properties props = new Properties();
1734               props.load(IOUtils.toInputStream(arguments.get(arguments.size() - 1), StandardCharsets.UTF_8));
1735               properties = new HashMap<>();
1736               for (Entry<Object, Object> entry : props.entrySet()) {
1737                 properties.put(entry.getKey().toString(), entry.getValue().toString());
1738               }
1739             }
1740             logger.debug("Resuming {} at {}", workflowInstance, workflowInstance.getCurrentOperation());
1741             workflowInstance.setState(RUNNING);
1742             update(workflowInstance);
1743             runWorkflowOperation(workflowInstance, properties);
1744             break;
1745           case START_OPERATION:
1746             workflowInstance = getWorkflowById(Long.parseLong(arguments.get(0)));
1747             wfo = workflowInstance.getCurrentOperation();
1748 
1749             if (OperationState.RUNNING.equals(wfo.getState()) || OperationState.PAUSED.equals(wfo.getState())) {
1750               logger.info("Reset operation state {} {} to INSTANTIATED due to job restart", workflowInstance, wfo);
1751               wfo.setState(OperationState.INSTANTIATED);
1752             }
1753 
1754             wfo.setExecutionHost(job.getProcessingHost());
1755             logger.debug("Running {} {}", workflowInstance, wfo);
1756             wfo = runWorkflowOperation(workflowInstance, null);
1757             updateOperationJob(job.getId(), wfo.getState());
1758             break;
1759           default:
1760             throw new IllegalStateException("Don't know how to handle operation '" + operation + "'");
1761         }
1762       } catch (IllegalArgumentException e) {
1763         throw new ServiceRegistryException("This service can't handle operations of type '" + op + "'", e);
1764       } catch (IndexOutOfBoundsException e) {
1765         throw new ServiceRegistryException(
1766             "The argument list for operation '" + op + "' (job: " + job.getId() + ") does not meet expectations", e);
1767       } catch (NotFoundException e) {
1768         logger.warn("Not found processing job {}", job, e);
1769         updateOperationJob(job.getId(), OperationState.FAILED);
1770       }
1771       return null;
1772     } catch (Exception e) {
1773       logger.warn("Exception while accepting job {}", job, e);
1774       try {
1775         if (workflowInstance != null) {
1776           logger.warn("Marking job {} and workflow instance {} as failed", job, workflowInstance);
1777           updateOperationJob(job.getId(), OperationState.FAILED);
1778           workflowInstance.setState(FAILED);
1779           update(workflowInstance);
1780         } else {
1781           logger.warn("Unable to parse workflow instance", e);
1782         }
1783       } catch (WorkflowDatabaseException e1) {
1784         throw new ServiceRegistryException(e1);
1785       }
1786       if (e instanceof ServiceRegistryException)
1787         throw e;
1788       throw new ServiceRegistryException("Error handling operation '" + op + "'", e);
1789     }
1790   }
1791 
1792   /**
1793    * Synchronizes the workflow operation's job with the operation status if the operation has a job associated with it,
1794    * which is determined by looking at the operation's job id.
1795    *
1796    * @param state
1797    *          the operation state
1798    * @param jobId
1799    *          the associated job
1800    * @return the updated job or <code>null</code> if there is no job for this operation
1801    * @throws ServiceRegistryException
1802    *           if the job can't be updated in the service registry
1803    * @throws NotFoundException
1804    *           if the job can't be found
1805    */
1806   private Job updateOperationJob(Long jobId, OperationState state) throws NotFoundException, ServiceRegistryException {
1807     if (jobId == null)
1808       return null;
1809     Job job = serviceRegistry.getJob(jobId);
1810     switch (state) {
1811       case FAILED:
1812       case RETRY:
1813         job.setStatus(Status.FAILED);
1814         break;
1815       case PAUSED:
1816         job.setStatus(Status.PAUSED);
1817         job.setOperation(Operation.RESUME.toString());
1818         break;
1819       case SKIPPED:
1820       case SUCCEEDED:
1821         job.setStatus(Status.FINISHED);
1822         break;
1823       default:
1824         throw new IllegalStateException("Unexpected state '" + state + "' found");
1825     }
1826     return serviceRegistry.updateJob(job);
1827   }
1828 
1829   /**
1830    * {@inheritDoc}
1831    *
1832    * @see org.opencastproject.job.api.JobProducer#countJobs(org.opencastproject.job.api.Job.Status)
1833    */
1834   @Override
1835   public long countJobs(Status status) throws ServiceRegistryException {
1836     return serviceRegistry.count(JOB_TYPE, status);
1837   }
1838 
1839   /**
1840    * Converts a Map<String, String> to s key=value\n string, suitable for the properties form parameter expected by the
1841    * workflow rest endpoint.
1842    *
1843    * @param props
1844    *          The map of strings
1845    * @return the string representation
1846    */
1847   private String mapToString(Map<String, String> props) {
1848     if (props == null)
1849       return null;
1850     StringBuilder sb = new StringBuilder();
1851     for (Entry<String, String> entry : props.entrySet()) {
1852       sb.append(entry.getKey());
1853       sb.append("=");
1854       sb.append(entry.getValue());
1855       sb.append("\n");
1856     }
1857     return sb.toString();
1858   }
1859 
1860   /**
1861    * Dummy callback for osgi
1862    *
1863    * @param unused
1864    *          the unused ReadinessIndicator
1865    */
1866   @Reference(target = "(artifact=workflowdefinition)")
1867   protected void setProfilesReadyIndicator(ReadinessIndicator unused) { }
1868 
1869   /**
1870    * Callback for the OSGi environment to register with the <code>Workspace</code>.
1871    *
1872    * @param workspace
1873    *          the workspace
1874    */
1875   @Reference
1876   protected void setWorkspace(Workspace workspace) {
1877     this.workspace = workspace;
1878   }
1879 
1880   /**
1881    * Callback for the OSGi environment to register with the <code>ServiceRegistry</code>.
1882    *
1883    * @param registry
1884    *          the service registry
1885    */
1886   @Reference
1887   protected void setServiceRegistry(ServiceRegistry registry) {
1888     this.serviceRegistry = registry;
1889   }
1890 
1891   public ServiceRegistry getServiceRegistry() {
1892     return serviceRegistry;
1893   }
1894 
1895   /**
1896    * Callback for setting the security service.
1897    *
1898    * @param securityService
1899    *          the securityService to set
1900    */
1901   @Reference
1902   public void setSecurityService(SecurityService securityService) {
1903     this.securityService = securityService;
1904   }
1905 
1906   /**
1907    * Callback for setting the authorization service.
1908    *
1909    * @param authorizationService
1910    *          the authorizationService to set
1911    */
1912   @Reference
1913   public void setAuthorizationService(AuthorizationService authorizationService) {
1914     this.authorizationService = authorizationService;
1915   }
1916 
1917   /**
1918    * Callback for setting the user directory service
1919    *
1920    * @param userDirectoryService
1921    *          the userDirectoryService to set
1922    */
1923   @Reference
1924   public void setUserDirectoryService(UserDirectoryService userDirectoryService) {
1925     this.userDirectoryService = userDirectoryService;
1926   }
1927 
1928   /**
1929    * Sets a reference to the organization directory service.
1930    *
1931    * @param organizationDirectory
1932    *          the organization directory
1933    */
1934   @Reference
1935   public void setOrganizationDirectoryService(OrganizationDirectoryService organizationDirectory) {
1936     this.organizationDirectoryService = organizationDirectory;
1937   }
1938 
1939   /**
1940    * Sets the series service
1941    *
1942    * @param seriesService
1943    *          the seriesService to set
1944    */
1945   @Reference
1946   public void setSeriesService(SeriesService seriesService) {
1947     this.seriesService = seriesService;
1948   }
1949 
1950   /**
1951    * Sets the asset manager
1952    *
1953    * @param assetManager
1954    *          the assetManager to set
1955    */
1956   @Reference
1957   public void setAssetManager(AssetManager assetManager) {
1958     this.assetManager = assetManager;
1959   }
1960 
1961   /**
1962    * Callback to set the metadata service
1963    *
1964    * @param service
1965    *          the metadata service
1966    */
1967   @Reference(cardinality = ReferenceCardinality.AT_LEAST_ONE, policy = ReferencePolicy.DYNAMIC, unbind = "removeMetadataService")
1968   protected void addMetadataService(MediaPackageMetadataService service) {
1969     metadataServices.add(service);
1970   }
1971 
1972   /**
1973    * Callback to remove a mediapackage metadata service.
1974    *
1975    * @param service
1976    *          the mediapackage metadata service to remove
1977    */
1978   protected void removeMetadataService(MediaPackageMetadataService service) {
1979     metadataServices.remove(service);
1980   }
1981 
1982   /**
1983    * Callback to set the workflow definition scanner
1984    *
1985    * @param scanner
1986    *          the workflow definition scanner
1987    */
1988   @Reference
1989   protected void addWorkflowDefinitionScanner(WorkflowDefinitionScanner scanner) {
1990     workflowDefinitionScanner = scanner;
1991   }
1992 
1993   /**
1994    * Callback to set the Admin UI index.
1995    *
1996    * @param index
1997    *          the admin UI index.
1998    */
1999   @Reference
2000   public void setIndex(ElasticsearchIndex index) {
2001     this.index = index;
2002   }
2003 
2004   /**
2005    * Callback to set the workflow database
2006    *
2007    * @param persistence
2008    *          the workflow database
2009    */
2010   @Reference(name = "workflow-persistence")
2011   public void setPersistence(WorkflowServiceDatabase persistence) {
2012     this.persistence = persistence;
2013   }
2014 
2015   /**
2016    * {@inheritDoc}
2017    *
2018    * @see org.opencastproject.job.api.JobProducer#getJobType()
2019    */
2020   @Override
2021   public String getJobType() {
2022     return JOB_TYPE;
2023   }
2024 
2025   /**
2026    * A tuple of a workflow operation handler and the name of the operation it handles
2027    */
2028   public static class HandlerRegistration {
2029 
2030     protected WorkflowOperationHandler handler;
2031     protected String operationName;
2032 
2033     public HandlerRegistration(String operationName, WorkflowOperationHandler handler) {
2034       if (operationName == null)
2035         throw new IllegalArgumentException("Operation name cannot be null");
2036       if (handler == null)
2037         throw new IllegalArgumentException("Handler cannot be null");
2038       this.operationName = operationName;
2039       this.handler = handler;
2040     }
2041 
2042     public WorkflowOperationHandler getHandler() {
2043       return handler;
2044     }
2045 
2046     /**
2047      * {@inheritDoc}
2048      *
2049      * @see java.lang.Object#hashCode()
2050      */
2051     @Override
2052     public int hashCode() {
2053       final int prime = 31;
2054       int result = 1;
2055       result = prime * result + handler.hashCode();
2056       result = prime * result + operationName.hashCode();
2057       return result;
2058     }
2059 
2060     /**
2061      * {@inheritDoc}
2062      *
2063      * @see java.lang.Object#equals(java.lang.Object)
2064      */
2065     @Override
2066     public boolean equals(Object obj) {
2067       if (this == obj)
2068         return true;
2069       if (obj == null)
2070         return false;
2071       if (getClass() != obj.getClass())
2072         return false;
2073       HandlerRegistration other = (HandlerRegistration) obj;
2074       if (!handler.equals(other.handler))
2075         return false;
2076       return operationName.equals(other.operationName);
2077     }
2078   }
2079 
2080   /**
2081    * A utility class to run jobs
2082    */
2083   class JobRunner implements Callable<Void> {
2084 
2085     /** The job */
2086     private Job job = null;
2087 
2088     /** The current job */
2089     private final Job currentJob;
2090 
2091     /**
2092      * Constructs a new job runner
2093      *
2094      * @param job
2095      *          the job to run
2096      * @param currentJob
2097      *          the current running job
2098      */
2099     JobRunner(Job job, Job currentJob) {
2100       this.job = job;
2101       this.currentJob = currentJob;
2102     }
2103 
2104     /**
2105      * {@inheritDoc}
2106      *
2107      * @see java.util.concurrent.Callable#call()
2108      */
2109     @Override
2110     public Void call() throws Exception {
2111       Organization jobOrganization = organizationDirectoryService.getOrganization(job.getOrganization());
2112       try {
2113         serviceRegistry.setCurrentJob(currentJob);
2114         securityService.setOrganization(jobOrganization);
2115         User jobUser = userDirectoryService.loadUser(job.getCreator());
2116         securityService.setUser(jobUser);
2117         process(job);
2118       } finally {
2119         serviceRegistry.setCurrentJob(null);
2120         securityService.setUser(null);
2121         securityService.setOrganization(null);
2122       }
2123       return null;
2124     }
2125   }
2126 
2127   @Override
2128   public synchronized void cleanupWorkflowInstances(int buffer, WorkflowState state) throws UnauthorizedException,
2129           WorkflowDatabaseException {
2130     logger.info("Start cleaning up workflow instances older than {} days with status '{}'", buffer, state);
2131 
2132     int instancesCleaned = 0;
2133     int cleaningFailed = 0;
2134 
2135     Date priorTo = DateUtils.addDays(new Date(), -buffer);
2136 
2137     try {
2138       for (WorkflowInstance workflowInstance : persistence.getWorkflowInstancesForCleanup(state, priorTo)) {
2139         try {
2140           logger.debug("Deleting workflow instance {}", workflowInstance.getId());
2141           remove(workflowInstance.getId());
2142           instancesCleaned++;
2143         } catch (WorkflowDatabaseException | UnauthorizedException e) {
2144           throw e;
2145         } catch (NotFoundException e) {
2146           // Since we are in a cleanup operation, we don't have to care about NotFoundExceptions
2147           logger.debug("Workflow instance '{}' could not be removed", workflowInstance.getId(), e);
2148         } catch (WorkflowParsingException | WorkflowStateException e) {
2149           logger.warn("Workflow instance '{}' could not be removed", workflowInstance.getId(), e);
2150           cleaningFailed++;
2151         }
2152       }
2153     } catch (WorkflowDatabaseException e) {
2154       throw new WorkflowDatabaseException(e);
2155     }
2156 
2157     if (instancesCleaned == 0 && cleaningFailed == 0) {
2158       logger.info("No workflow instances found to clean up");
2159       return;
2160     }
2161 
2162     if (instancesCleaned > 0)
2163       logger.info("Cleaned up '{}' workflow instances", instancesCleaned);
2164     if (cleaningFailed > 0) {
2165       logger.warn("Cleaning failed for '{}' workflow instances", cleaningFailed);
2166       throw new WorkflowDatabaseException("Unable to clean all workflow instances, see logs!");
2167     }
2168   }
2169 
2170   @Override
2171   public Map<String, Map<String, String>> getWorkflowStateMappings() {
2172     return workflowDefinitionScanner.workflowStateMappings.entrySet().stream().collect(Collectors.toMap(
2173             Entry::getKey, e -> e.getValue().stream()
2174                     .collect(Collectors.toMap(m -> m.getState().name(), WorkflowStateMapping::getValue))
2175     ));
2176   }
2177 
2178   @Override
2179   public void repopulate(IndexRebuildService.DataType type) throws IndexRebuildException {
2180     try {
2181       final int total;
2182       try {
2183         total = persistence.countMediaPackages();
2184       } catch (WorkflowDatabaseException e) {
2185         logIndexRebuildError(logger, e);
2186         throw new IndexRebuildException(getService(), e);
2187       }
2188 
2189       if (total > 0) {
2190         logIndexRebuildBegin(logger, total, "workflows");
2191         int current = 0;
2192         int n = 20;
2193         List<WorkflowIndexData> workflowIndexData;
2194 
2195         int limit = 1000;
2196         int offset = 0;
2197         String currentMediapackageId;
2198         String lastMediapackageId = "";
2199         var updatedWorkflowRange = new ArrayList<Event>();
2200         do {
2201           try {
2202             workflowIndexData = persistence.getWorkflowIndexData(limit, offset);
2203           } catch (WorkflowDatabaseException e) {
2204             logIndexRebuildError(logger, e);
2205             throw new IndexRebuildException(getService(), e);
2206           }
2207           if (!workflowIndexData.isEmpty()) {
2208             offset += limit;
2209             logger.debug("Got {} workflows for re-indexing", workflowIndexData.size());
2210 
2211             for (WorkflowIndexData indexData : workflowIndexData) {
2212               currentMediapackageId = indexData.getMediaPackageId();
2213               if (currentMediapackageId.equals(lastMediapackageId)) {
2214                 continue;
2215               }
2216               current++;
2217 
2218               // Include PAUSED; otherwise, paused workflows will show up as "Finished"
2219               if (!WorkflowUtil.isActive(WorkflowInstance.WorkflowState.values()[indexData.getState()].toString())
2220                       || WorkflowState.PAUSED == WorkflowInstance.WorkflowState.values()[indexData.getState()]) {
2221                 String orgid = indexData.getOrganizationId();
2222                 if (null == orgid) {
2223                   String mpId = indexData.getMediaPackageId();
2224                   //We're assuming here that mediapackages don't change orgs
2225                   List<Snapshot> snapshots = assetManager.getSnapshotsById(mpId);
2226                   if (snapshots.size() == 0) {
2227                     logger.debug("Dropping {} from the index since it is missing from the database", mpId);
2228                     continue;
2229                   }
2230                   orgid = snapshots.stream().findFirst().get().getOrganizationId();
2231                   //We try-catch here since it's possible for the WF to exist in the *index* but not in the *DB*
2232                   // It probably shouldn't be, but that won't keep it from happening anyway.
2233                   try {
2234                     //NB: This version of getWorkflow takes the org id, which in this case is null
2235                     // Using the normal version filters by org, and since this workflow has a NULL org it can't be found
2236                     WorkflowInstance instance = persistence.getWorkflow(indexData.getId(), null);
2237                     instance.setOrganizationId(orgid);
2238                     persistence.updateInDatabase(instance);
2239                   } catch (NotFoundException e) {
2240                     //Technically this should never happen, but getWorkflow throws it.
2241                   }
2242                 }
2243                 var updatedWorkflowData = index.getEvent(indexData.getMediaPackageId(), orgid, securityService.getUser());
2244                 updatedWorkflowData = getStateUpdateFunction(indexData.getId(),indexData.getState(),
2245                         indexData.getMediaPackageId(), indexData.getTemplate(), indexData.getOrganizationId())
2246                         .apply(updatedWorkflowData);
2247                 updatedWorkflowRange.add(updatedWorkflowData.get());
2248 
2249                 if (updatedWorkflowRange.size() >= n || current >= total) {
2250                   index.bulkEventUpdate(updatedWorkflowRange);
2251                   logIndexRebuildProgress(logger, total, current, n);
2252                   updatedWorkflowRange.clear();
2253                 }
2254               }
2255               else {
2256                 logger.info("Skipping. Workflow {} is currently active.", indexData.getId());
2257               }
2258 
2259               lastMediapackageId = currentMediapackageId;
2260             }
2261           }
2262         } while (!workflowIndexData.isEmpty());
2263       }
2264     } catch (Exception e) {
2265       logIndexRebuildError(logger, e);
2266       throw new IndexRebuildException(getService(), e);
2267     }
2268   }
2269 
2270   @Override
2271   public IndexRebuildService.Service getService() {
2272     return IndexRebuildService.Service.Workflow;
2273   }
2274 
2275   /**
2276    * Remove a workflow instance from the Elasticsearch index.
2277    *
2278    * @param workflowInstanceId
2279    *         the identifier of the workflow instance to remove
2280    */
2281   private void removeWorkflowInstanceFromIndex(long workflowInstanceId) {
2282     final String orgId = securityService.getOrganization().getId();
2283     final User user = securityService.getUser();
2284 
2285     // find events
2286     SearchResult<Event> results;
2287     try {
2288       results = index.getByQuery(new EventSearchQuery(orgId, user).withWorkflowId(workflowInstanceId));
2289     } catch (SearchIndexException e) {
2290       logger.error("Error retrieving the events for workflow instance {} from the {} index.", workflowInstanceId,
2291               index.getIndexName(), e);
2292       return;
2293     }
2294 
2295     if (results.getItems().length == 0) {
2296       logger.warn("No events for workflow instance {} found in the {} index.", workflowInstanceId,
2297               index.getIndexName());
2298       return;
2299     }
2300 
2301     // should be only one event, but better safe than sorry
2302     for (SearchResultItem<Event> item: results.getItems()) {
2303       String eventId = item.getSource().getIdentifier();
2304       logger.debug("Removing workflow instance {} of event {} from the {} index.", workflowInstanceId, eventId,
2305               index.getIndexName());
2306 
2307       Function<Optional<Event>, Optional<Event>> updateFunction = (Optional<Event> eventOpt) -> {
2308         if (!eventOpt.isPresent()) {
2309           logger.warn("Event {} of workflow instance {} not found in the {} index.", workflowInstanceId,
2310                   eventId, index.getIndexName());
2311           return Optional.empty();
2312         }
2313         Event event = eventOpt.get();
2314         if (event.getWorkflowId() != null && event.getWorkflowId().equals(workflowInstanceId)) {
2315           logger.debug("Workflow {} is the current workflow of event {}. Removing it from event.", eventId,
2316                   workflowInstanceId);
2317           event.setWorkflowId(null);
2318           event.setWorkflowDefinitionId(null);
2319           event.setWorkflowState(null);
2320           return Optional.of(event);
2321         }
2322         return Optional.empty();
2323       };
2324 
2325       try {
2326         index.addOrUpdateEvent(eventId, updateFunction, orgId, user);
2327         logger.debug("Workflow instance {} of event {} removed from the {} index.", workflowInstanceId, eventId,
2328                 index.getIndexName());
2329       } catch (SearchIndexException e) {
2330         logger.error("Error removing the workflow instance {} of event {} from the {} index.", workflowInstanceId,
2331                 eventId, index.getIndexName(), e);
2332       }
2333     }
2334   }
2335 
2336   /**
2337    * Update a workflow instance in the Elasticsearch index.
2338    *
2339    * @param id
2340    *         workflow id
2341    * @param state
2342    *         workflow state as int
2343    * @param mpId
2344    *         corresponding mediapackage id
2345    * @param orgId
2346    *         workflow organization id
2347    */
2348   private void updateWorkflowInstanceInIndex(long id, int state, String wfDefId, String mpId, String orgId) {
2349     final User user = securityService.getUser();
2350 
2351     logger.debug("Updating workflow instance {} of event {} in the {} index.", id, mpId,
2352             index.getIndexName());
2353     Function<Optional<Event>, Optional<Event>> updateFunction = getStateUpdateFunction(id, state, wfDefId, mpId, orgId);
2354 
2355     try {
2356       index.addOrUpdateEvent(mpId, updateFunction, orgId, user);
2357       logger.debug("Workflow instance {} of event {} updated in the {} index.", id, mpId,
2358               index.getIndexName());
2359     } catch (SearchIndexException e) {
2360       logger.error("Error updating the workflow instance {} of event {} in the {} index.", id, mpId,
2361               index.getIndexName(), e);
2362     }
2363   }
2364 
2365   /**
2366    * Get the function to update the workflow state for an event in the Elasticsearch index.
2367    *
2368    * @return the function to do the update
2369    */
2370   private Function<Optional<Event>, Optional<Event>> getStateUpdateFunction(long workflowId,
2371           int workflowState, String workflowDefinitionId, String mediaPackageId,
2372           String orgId) {
2373     return (Optional<Event> eventOpt) -> {
2374       Event event = eventOpt.orElse(new Event(mediaPackageId, orgId));
2375       event.setWorkflowId(workflowId);
2376       event.setWorkflowState(WorkflowState.values()[workflowState]);
2377       event.setWorkflowDefinitionId(workflowDefinitionId);
2378       return Optional.of(event);
2379     };
2380   }
2381 }