1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 package org.opencastproject.workflow.impl;
23
24 import static org.opencastproject.security.api.SecurityConstants.GLOBAL_ADMIN_ROLE;
25 import static org.opencastproject.workflow.api.WorkflowInstance.WorkflowState.FAILED;
26 import static org.opencastproject.workflow.api.WorkflowInstance.WorkflowState.FAILING;
27 import static org.opencastproject.workflow.api.WorkflowInstance.WorkflowState.INSTANTIATED;
28 import static org.opencastproject.workflow.api.WorkflowInstance.WorkflowState.PAUSED;
29 import static org.opencastproject.workflow.api.WorkflowInstance.WorkflowState.RUNNING;
30 import static org.opencastproject.workflow.api.WorkflowInstance.WorkflowState.STOPPED;
31 import static org.opencastproject.workflow.api.WorkflowInstance.WorkflowState.SUCCEEDED;
32
33 import org.opencastproject.assetmanager.api.AssetManager;
34 import org.opencastproject.assetmanager.api.Snapshot;
35 import org.opencastproject.assetmanager.util.WorkflowPropertiesUtil;
36 import org.opencastproject.elasticsearch.api.SearchIndexException;
37 import org.opencastproject.elasticsearch.api.SearchResult;
38 import org.opencastproject.elasticsearch.api.SearchResultItem;
39 import org.opencastproject.elasticsearch.index.ElasticsearchIndex;
40 import org.opencastproject.elasticsearch.index.objects.event.Event;
41 import org.opencastproject.elasticsearch.index.objects.event.EventSearchQuery;
42 import org.opencastproject.elasticsearch.index.rebuild.AbstractIndexProducer;
43 import org.opencastproject.elasticsearch.index.rebuild.IndexProducer;
44 import org.opencastproject.elasticsearch.index.rebuild.IndexRebuildException;
45 import org.opencastproject.elasticsearch.index.rebuild.IndexRebuildService;
46 import org.opencastproject.job.api.Job;
47 import org.opencastproject.job.api.Job.Status;
48 import org.opencastproject.job.api.JobProducer;
49 import org.opencastproject.mediapackage.MediaPackage;
50 import org.opencastproject.mediapackage.MediaPackageElement;
51 import org.opencastproject.mediapackage.MediaPackageParser;
52 import org.opencastproject.mediapackage.MediaPackageSupport;
53 import org.opencastproject.mediapackage.Publication;
54 import org.opencastproject.metadata.api.MediaPackageMetadata;
55 import org.opencastproject.metadata.api.MediaPackageMetadataService;
56 import org.opencastproject.metadata.api.MetadataService;
57 import org.opencastproject.metadata.api.util.MediaPackageMetadataSupport;
58 import org.opencastproject.security.api.AccessControlList;
59 import org.opencastproject.security.api.AccessControlUtil;
60 import org.opencastproject.security.api.AclScope;
61 import org.opencastproject.security.api.AuthorizationService;
62 import org.opencastproject.security.api.Organization;
63 import org.opencastproject.security.api.OrganizationDirectoryService;
64 import org.opencastproject.security.api.Permissions;
65 import org.opencastproject.security.api.SecurityService;
66 import org.opencastproject.security.api.UnauthorizedException;
67 import org.opencastproject.security.api.User;
68 import org.opencastproject.security.api.UserDirectoryService;
69 import org.opencastproject.series.api.SeriesException;
70 import org.opencastproject.series.api.SeriesService;
71 import org.opencastproject.serviceregistry.api.ServiceRegistry;
72 import org.opencastproject.serviceregistry.api.ServiceRegistryException;
73 import org.opencastproject.serviceregistry.api.UndispatchableJobException;
74 import org.opencastproject.util.NotFoundException;
75 import org.opencastproject.util.ReadinessIndicator;
76 import org.opencastproject.util.data.Tuple;
77 import org.opencastproject.workflow.api.ResumableWorkflowOperationHandler;
78 import org.opencastproject.workflow.api.RetryStrategy;
79 import org.opencastproject.workflow.api.WorkflowDatabaseException;
80 import org.opencastproject.workflow.api.WorkflowDefinition;
81 import org.opencastproject.workflow.api.WorkflowException;
82 import org.opencastproject.workflow.api.WorkflowIdentifier;
83 import org.opencastproject.workflow.api.WorkflowIndexData;
84 import org.opencastproject.workflow.api.WorkflowInstance;
85 import org.opencastproject.workflow.api.WorkflowInstance.WorkflowState;
86 import org.opencastproject.workflow.api.WorkflowListener;
87 import org.opencastproject.workflow.api.WorkflowOperationDefinition;
88 import org.opencastproject.workflow.api.WorkflowOperationDefinitionImpl;
89 import org.opencastproject.workflow.api.WorkflowOperationHandler;
90 import org.opencastproject.workflow.api.WorkflowOperationInstance;
91 import org.opencastproject.workflow.api.WorkflowOperationInstance.OperationState;
92 import org.opencastproject.workflow.api.WorkflowOperationResult;
93 import org.opencastproject.workflow.api.WorkflowOperationResult.Action;
94 import org.opencastproject.workflow.api.WorkflowOperationResultImpl;
95 import org.opencastproject.workflow.api.WorkflowParsingException;
96 import org.opencastproject.workflow.api.WorkflowService;
97 import org.opencastproject.workflow.api.WorkflowServiceDatabase;
98 import org.opencastproject.workflow.api.WorkflowStateException;
99 import org.opencastproject.workflow.api.WorkflowStateMapping;
100 import org.opencastproject.workflow.api.WorkflowUtil;
101 import org.opencastproject.workflow.api.XmlWorkflowParser;
102 import org.opencastproject.workspace.api.Workspace;
103
104 import com.google.common.util.concurrent.Striped;
105
106 import org.apache.commons.io.IOUtils;
107 import org.apache.commons.lang3.StringUtils;
108 import org.apache.commons.lang3.time.DateUtils;
109 import org.osgi.framework.InvalidSyntaxException;
110 import org.osgi.framework.ServiceReference;
111 import org.osgi.service.component.ComponentContext;
112 import org.osgi.service.component.annotations.Activate;
113 import org.osgi.service.component.annotations.Component;
114 import org.osgi.service.component.annotations.Reference;
115 import org.osgi.service.component.annotations.ReferenceCardinality;
116 import org.osgi.service.component.annotations.ReferencePolicy;
117 import org.slf4j.Logger;
118 import org.slf4j.LoggerFactory;
119
120 import java.io.ByteArrayOutputStream;
121 import java.io.IOException;
122 import java.nio.charset.StandardCharsets;
123 import java.util.ArrayList;
124 import java.util.Collections;
125 import java.util.Comparator;
126 import java.util.Date;
127 import java.util.HashMap;
128 import java.util.HashSet;
129 import java.util.List;
130 import java.util.Map;
131 import java.util.Map.Entry;
132 import java.util.Optional;
133 import java.util.Properties;
134 import java.util.Set;
135 import java.util.SortedSet;
136 import java.util.TreeSet;
137 import java.util.concurrent.Callable;
138 import java.util.concurrent.CopyOnWriteArrayList;
139 import java.util.concurrent.Executors;
140 import java.util.concurrent.ThreadPoolExecutor;
141 import java.util.concurrent.locks.Lock;
142 import java.util.function.Function;
143 import java.util.stream.Collectors;
144
145
146
147
148
149
150
151
152 @Component(
153 property = {
154 "service.description=Workflow Service",
155 "service.pid=org.opencastproject.workflow.impl.WorkflowServiceImpl"
156 },
157 immediate = true,
158 service = { WorkflowService.class, WorkflowServiceImpl.class, IndexProducer.class }
159 )
160 public class WorkflowServiceImpl extends AbstractIndexProducer implements WorkflowService, JobProducer {
161
162
163 private static final String RETRY_STRATEGY = "retryStrategy";
164
165
166 private static final Logger logger = LoggerFactory.getLogger(WorkflowServiceImpl.class);
167
168
169 enum Operation {
170 START_WORKFLOW, RESUME, START_OPERATION
171 }
172
173
174 private static final String NULL_PARENT_ID = "-";
175
176
177
178
179
180 private static final float WORKFLOW_JOB_LOAD = 0.0f;
181
182
183 public static final String ERROR_RESOLUTION_HANDLER_ID = "error-resolution";
184
185
186 protected ComponentContext componentContext = null;
187
188
189 private SortedSet<MediaPackageMetadataService> metadataServices;
190
191
192 protected WorkflowServiceDatabase persistence;
193
194
195 private final List<WorkflowListener> listeners = new CopyOnWriteArrayList<WorkflowListener>();
196
197
198 protected ThreadPoolExecutor executorService;
199
200
201 protected Workspace workspace = null;
202
203
204 protected ServiceRegistry serviceRegistry = null;
205
206
207 protected SecurityService securityService = null;
208
209
210 protected AuthorizationService authorizationService = null;
211
212
213 protected UserDirectoryService userDirectoryService = null;
214
215
216 protected OrganizationDirectoryService organizationDirectoryService = null;
217
218
219 protected SeriesService seriesService;
220
221
222 protected AssetManager assetManager = null;
223
224
225 private WorkflowDefinitionScanner workflowDefinitionScanner;
226
227
228 private final List<Long> delayedWorkflows = new ArrayList<Long>();
229
230
231 private final Striped<Lock> lock = Striped.lazyWeakLock(1024);
232 private final Striped<Lock> updateLock = Striped.lazyWeakLock(1024);
233 private final Striped<Lock> mediaPackageLocks = Striped.lazyWeakLock(1024);
234
235
236 private ElasticsearchIndex index;
237
238
239
240
241 public WorkflowServiceImpl() {
242 metadataServices = new TreeSet<>(Comparator.comparingInt(MetadataService::getPriority));
243 }
244
245
246
247
248
249
250
251 @Activate
252 public void activate(ComponentContext componentContext) {
253 this.componentContext = componentContext;
254 executorService = (ThreadPoolExecutor) Executors.newCachedThreadPool();
255 logger.info("Activate Workflow service");
256 }
257
258
259
260
261
262
263 @Override
264 public void addWorkflowListener(WorkflowListener listener) {
265 listeners.add(listener);
266 }
267
268
269
270
271
272
273 @Override
274 public void removeWorkflowListener(WorkflowListener listener) {
275 listeners.remove(listener);
276 }
277
278
279
280
281 protected void fireListeners(final WorkflowInstance oldWorkflowInstance, final WorkflowInstance newWorkflowInstance) {
282 final User currentUser = securityService.getUser();
283 final Organization currentOrganization = securityService.getOrganization();
284 for (final WorkflowListener listener : listeners) {
285 if (oldWorkflowInstance == null || !oldWorkflowInstance.getState().equals(newWorkflowInstance.getState())) {
286 Runnable runnable = () -> {
287 try {
288 securityService.setUser(currentUser);
289 securityService.setOrganization(currentOrganization);
290 listener.stateChanged(newWorkflowInstance);
291 } finally {
292 securityService.setUser(null);
293 securityService.setOrganization(null);
294 }
295 };
296 executorService.execute(runnable);
297 } else {
298 logger.debug("Not notifying {} because the workflow state has not changed", listener);
299 }
300
301 if (newWorkflowInstance.getCurrentOperation() != null) {
302 if (oldWorkflowInstance == null || oldWorkflowInstance.getCurrentOperation() == null
303 || !oldWorkflowInstance.getCurrentOperation().equals(newWorkflowInstance.getCurrentOperation())) {
304 Runnable runnable = () -> {
305 try {
306 securityService.setUser(currentUser);
307 securityService.setOrganization(currentOrganization);
308 listener.operationChanged(newWorkflowInstance);
309 } finally {
310 securityService.setUser(null);
311 securityService.setOrganization(null);
312 }
313 };
314 executorService.execute(runnable);
315 }
316 } else {
317 logger.debug("Not notifying {} because the workflow operation has not changed", listener);
318 }
319 }
320 }
321
322
323
324
325
326
327 @Override
328 public List<WorkflowDefinition> listAvailableWorkflowDefinitions() {
329 return workflowDefinitionScanner
330 .getAvailableWorkflowDefinitions(securityService.getOrganization(), securityService.getUser())
331 .sorted()
332 .collect(Collectors.toList());
333 }
334
335
336
337
338 public boolean isRunnable(WorkflowDefinition workflowDefinition) {
339 List<String> availableOperations = listAvailableOperationNames();
340 List<WorkflowDefinition> checkedWorkflows = new ArrayList<>();
341 boolean runnable = isRunnable(workflowDefinition, availableOperations, checkedWorkflows);
342 int wfCount = checkedWorkflows.size() - 1;
343 if (runnable)
344 logger.info("Workflow {}, containing {} derived workflows, is runnable", workflowDefinition, wfCount);
345 else
346 logger.warn("Workflow {}, containing {} derived workflows, is not runnable", workflowDefinition, wfCount);
347 return runnable;
348 }
349
350
351
352
353
354
355
356
357
358
359
360
361
362 private boolean isRunnable(WorkflowDefinition workflowDefinition, List<String> availableOperations,
363 List<WorkflowDefinition> checkedWorkflows) {
364 if (checkedWorkflows.contains(workflowDefinition))
365 return true;
366
367
368 for (WorkflowOperationDefinition op : workflowDefinition.getOperations()) {
369 if (!availableOperations.contains(op.getId())) {
370 logger.info("{} is not runnable due to missing operation {}", workflowDefinition, op);
371 return false;
372 }
373 String catchWorkflow = op.getExceptionHandlingWorkflow();
374 if (catchWorkflow != null) {
375 WorkflowDefinition catchWorkflowDefinition;
376 try {
377 catchWorkflowDefinition = getWorkflowDefinitionById(catchWorkflow);
378 } catch (NotFoundException e) {
379 logger.info("{} is not runnable due to missing catch workflow {} on operation {}", workflowDefinition,
380 catchWorkflow, op);
381 return false;
382 }
383 if (!isRunnable(catchWorkflowDefinition, availableOperations, checkedWorkflows))
384 return false;
385 }
386 }
387
388
389 if (!checkedWorkflows.contains(workflowDefinition))
390 checkedWorkflows.add(workflowDefinition);
391 return true;
392 }
393
394
395
396
397
398
399 public Set<HandlerRegistration> getRegisteredHandlers() {
400 Set<HandlerRegistration> set = new HashSet<>();
401 ServiceReference[] refs;
402 try {
403 refs = componentContext.getBundleContext().getServiceReferences(WorkflowOperationHandler.class.getName(), null);
404 } catch (InvalidSyntaxException e) {
405 throw new IllegalStateException(e);
406 }
407 if (refs != null) {
408 for (ServiceReference ref : refs) {
409 WorkflowOperationHandler handler = (WorkflowOperationHandler) componentContext.getBundleContext().getService(
410 ref);
411 set.add(new HandlerRegistration((String) ref.getProperty(WORKFLOW_OPERATION_PROPERTY), handler));
412 }
413 } else {
414 logger.warn("No registered workflow operation handlers found");
415 }
416 return set;
417 }
418
419 protected WorkflowOperationHandler getWorkflowOperationHandler(String operationId) {
420 for (HandlerRegistration reg : getRegisteredHandlers()) {
421 if (reg.operationName.equals(operationId))
422 return reg.handler;
423 }
424 return null;
425 }
426
427
428
429
430
431
432
433 protected List<String> listAvailableOperationNames() {
434 return getRegisteredHandlers().parallelStream().map(op -> op.operationName).collect(Collectors.toList());
435 }
436
437
438
439
440
441
442 @Override
443 public WorkflowInstance getWorkflowById(long id) throws NotFoundException,
444 UnauthorizedException {
445 try {
446 WorkflowInstance workflow = persistence.getWorkflow(id);
447 assertPermission(workflow, Permissions.Action.READ.toString(), workflow.getOrganizationId());
448 return workflow;
449 } catch (WorkflowDatabaseException e) {
450 throw new IllegalStateException("Got not get workflow from database with id ");
451 }
452 }
453
454
455
456
457
458
459
460 @Override
461 public WorkflowInstance start(WorkflowDefinition workflowDefinition, MediaPackage mediaPackage)
462 throws WorkflowDatabaseException, UnauthorizedException, WorkflowParsingException {
463 return start(workflowDefinition, mediaPackage, new HashMap<>());
464 }
465
466
467
468
469
470
471
472 @Override
473 public WorkflowInstance start(WorkflowDefinition workflowDefinition, MediaPackage mediaPackage,
474 Map<String, String> properties)
475 throws WorkflowDatabaseException, UnauthorizedException, WorkflowParsingException {
476 try {
477 return start(workflowDefinition, mediaPackage, null, properties);
478 } catch (NotFoundException e) {
479
480 throw new IllegalStateException("a null workflow ID caused a NotFoundException. This is a programming error.");
481 }
482 }
483
484
485
486
487
488
489
490 @Override
491 public WorkflowInstance start(WorkflowDefinition workflowDefinition, MediaPackage sourceMediaPackage,
492 Long parentWorkflowId, Map<String, String> originalProperties) throws WorkflowDatabaseException,
493 NotFoundException, UnauthorizedException, WorkflowParsingException, IllegalStateException {
494 final String mediaPackageId = sourceMediaPackage.getIdentifier().toString();
495 Map<String, String> properties = null;
496
497
498
499 populateMediaPackageMetadata(sourceMediaPackage);
500
501 if (originalProperties != null) {
502 WorkflowPropertiesUtil.storeProperties(assetManager, sourceMediaPackage, originalProperties);
503 properties = WorkflowPropertiesUtil.getLatestWorkflowProperties(assetManager, mediaPackageId);
504 }
505
506
507 final Lock lock = mediaPackageLocks.get(mediaPackageId);
508 lock.lock();
509
510 try {
511 if (workflowDefinition == null)
512 throw new IllegalArgumentException("workflow definition must not be null");
513 Optional<List<String>> errors = MediaPackageSupport.sanityCheck(sourceMediaPackage);
514 if (errors.isPresent()) {
515 throw new IllegalArgumentException("Insane media package cannot be processed: " + String.join("; ", errors.get()));
516 }
517 if (parentWorkflowId != null) {
518 try {
519 getWorkflowById(parentWorkflowId);
520 } catch (UnauthorizedException e) {
521 throw new IllegalArgumentException("Parent workflow " + parentWorkflowId + " not visible to this user");
522 }
523 } else {
524 if (persistence.mediaPackageHasActiveWorkflows(mediaPackageId)) {
525 throw new IllegalStateException(String.format(
526 "Can't start workflow '%s' for media package '%s' because another workflow is currently active.",
527 workflowDefinition.getTitle(),
528 sourceMediaPackage.getIdentifier().toString()));
529 }
530 }
531
532
533 User currentUser = securityService.getUser();
534 validUserOrThrow(currentUser);
535
536
537 Organization organization = securityService.getOrganization();
538 if (organization == null)
539 throw new SecurityException("Current organization is unknown");
540
541 WorkflowInstance workflowInstance = new WorkflowInstance(workflowDefinition, sourceMediaPackage,
542 currentUser, organization, properties);
543 workflowInstance = updateConfiguration(workflowInstance, properties);
544
545
546 try {
547
548 String workflowDefinitionXml = XmlWorkflowParser.toXml(workflowDefinition);
549 String mediaPackageXml = MediaPackageParser.getAsXml(sourceMediaPackage);
550
551 List<String> arguments = new ArrayList<>();
552 arguments.add(workflowDefinitionXml);
553 arguments.add(mediaPackageXml);
554 if (parentWorkflowId != null || properties != null) {
555 String parentWorkflowIdString = (parentWorkflowId != null) ? parentWorkflowId.toString() : NULL_PARENT_ID;
556 arguments.add(parentWorkflowIdString);
557 }
558 if (properties != null) {
559 arguments.add(mapToString(properties));
560 }
561
562 Job job = serviceRegistry.createJob(JOB_TYPE, Operation.START_WORKFLOW.toString(), arguments,
563 null, false, null, WORKFLOW_JOB_LOAD);
564
565
566 workflowInstance.setId(job.getId());
567
568
569
570 update(workflowInstance);
571
572 return workflowInstance;
573 } catch (Throwable t) {
574 try {
575 workflowInstance.setState(FAILED);
576 update(workflowInstance);
577 } catch (Exception failureToFail) {
578 logger.warn("Unable to update workflow to failed state", failureToFail);
579 }
580 try {
581 throw t;
582 } catch (ServiceRegistryException e) {
583 throw new WorkflowDatabaseException(e);
584 }
585 }
586 } finally {
587 lock.unlock();
588 }
589 }
590
591 protected WorkflowInstance updateConfiguration(WorkflowInstance instance, Map<String, String> properties) {
592 if (properties != null) {
593 for (Entry<String, String> entry : properties.entrySet()) {
594 instance.setConfiguration(entry.getKey(), entry.getValue());
595 }
596 }
597 return instance;
598 }
599
600
601
602
603
604
605
606
607 protected WorkflowOperationHandler selectOperationHandler(WorkflowOperationInstance operation) {
608 List<WorkflowOperationHandler> handlerList = new ArrayList<>();
609 for (HandlerRegistration handlerReg : getRegisteredHandlers()) {
610 if (handlerReg.operationName != null && handlerReg.operationName.equals(operation.getTemplate())) {
611 handlerList.add(handlerReg.handler);
612 }
613 }
614 if (handlerList.size() > 1) {
615 throw new IllegalStateException("Multiple operation handlers found for operation '" + operation.getTemplate()
616 + "'");
617 } else if (handlerList.size() == 1) {
618 return handlerList.get(0);
619 }
620 logger.warn("No workflow operation handlers found for operation '{}'", operation.getTemplate());
621 return null;
622 }
623
624
625
626
627
628
629
630
631
632
633 protected Job runWorkflow(WorkflowInstance workflow) throws WorkflowException, UnauthorizedException {
634 if (INSTANTIATED != workflow.getState()) {
635
636
637
638
639 if (RUNNING == workflow.getState()) {
640 WorkflowOperationInstance currentOperation = workflow.getCurrentOperation();
641 if (currentOperation != null) {
642 if (currentOperation.getId() != null) {
643 try {
644 Job operationJob = serviceRegistry.getJob(currentOperation.getId());
645 if (Job.Status.RUNNING.equals(operationJob.getStatus())) {
646 logger.debug("Not starting workflow {}, it is already in running state", workflow);
647 return null;
648 } else {
649 logger.info("Scheduling next operation of workflow {}", workflow);
650 operationJob.setStatus(Status.QUEUED);
651 operationJob.setDispatchable(true);
652 return serviceRegistry.updateJob(operationJob);
653 }
654 } catch (Exception e) {
655 logger.warn("Error determining status of current workflow operation in {}", workflow, e);
656 return null;
657 }
658 }
659 } else {
660 throw new IllegalStateException("Cannot start a workflow '" + workflow + "' with no current operation");
661 }
662 } else {
663 throw new IllegalStateException("Cannot start a workflow in state '" + workflow.getState() + "'");
664 }
665 }
666
667
668 workflow.setState(RUNNING);
669 update(workflow);
670
671 WorkflowOperationInstance operation = workflow.getCurrentOperation();
672
673 if (operation == null) {
674 throw new IllegalStateException("Cannot start a workflow without a current operation");
675 }
676
677 if (!operation.equals(workflow.getOperations().get(0))) {
678 throw new IllegalStateException("Current operation expected to be first");
679 }
680
681 try {
682 logger.info("Scheduling workflow {} for execution", workflow.getId());
683 Job job = serviceRegistry.createJob(JOB_TYPE, Operation.START_OPERATION.toString(),
684 Collections.singletonList(Long.toString(workflow.getId())), null, false, null, WORKFLOW_JOB_LOAD);
685 operation.setId(job.getId());
686 update(workflow);
687 job.setStatus(Status.QUEUED);
688 job.setDispatchable(true);
689 return serviceRegistry.updateJob(job);
690 } catch (ServiceRegistryException e) {
691 throw new WorkflowDatabaseException(e);
692 } catch (NotFoundException e) {
693
694 throw new IllegalStateException("Unable to find a job that was just created");
695 }
696
697 }
698
699
700
701
702
703
704
705
706
707
708
709
710
711 protected WorkflowOperationInstance runWorkflowOperation(WorkflowInstance workflow, Map<String, String> properties)
712 throws WorkflowException, UnauthorizedException {
713 WorkflowOperationInstance processingOperation = workflow.getCurrentOperation();
714 if (processingOperation == null)
715 throw new IllegalStateException("Workflow '" + workflow + "' has no operation to run");
716
717
718 WorkflowState initialState = workflow.getState();
719
720
721 WorkflowOperationHandler operationHandler = selectOperationHandler(processingOperation);
722 WorkflowOperationWorker worker = new WorkflowOperationWorker(operationHandler, workflow, properties, this);
723 workflow = worker.execute();
724
725 Long currentOperationJobId = processingOperation.getId();
726 try {
727 updateOperationJob(currentOperationJobId, processingOperation.getState());
728 } catch (NotFoundException e) {
729 throw new IllegalStateException("Unable to find a job that has already been running");
730 } catch (ServiceRegistryException e) {
731 throw new WorkflowDatabaseException(e);
732 }
733
734
735 WorkflowOperationInstance currentOperation = workflow.getCurrentOperation();
736
737
738 if (currentOperation == null) {
739
740
741 if (FAILING.equals(workflow.getState())) {
742 workflow.setState(FAILED);
743 }
744
745
746
747 else if (!FAILED.equals(workflow.getState())) {
748 workflow.setState(SUCCEEDED);
749 for (WorkflowOperationInstance op : workflow.getOperations()) {
750 if (op.getState().equals(WorkflowOperationInstance.OperationState.FAILED)) {
751 if (op.isFailOnError()) {
752 workflow.setState(FAILED);
753 break;
754 }
755 }
756 }
757 }
758
759
760 logger.debug("{} has {}", workflow, workflow.getState());
761 update(workflow);
762
763 } else {
764
765
766 WorkflowState dbWorkflowState;
767 try {
768 dbWorkflowState = getWorkflowById(workflow.getId()).getState();
769 } catch (NotFoundException e) {
770 throw new IllegalStateException("The workflow with ID " + workflow.getId()
771 + " can not be found in the database", e);
772 } catch (UnauthorizedException e) {
773 throw new IllegalStateException("The workflow with ID " + workflow.getId() + " can not be read", e);
774 }
775
776
777 if (!dbWorkflowState.equals(initialState)) {
778 logger.info("Workflow state for {} was changed to '{}' from the outside", workflow, dbWorkflowState);
779 workflow.setState(dbWorkflowState);
780 }
781
782
783
784 Job job;
785 switch (workflow.getState()) {
786 case FAILED:
787 update(workflow);
788 break;
789 case FAILING:
790 case RUNNING:
791 try {
792 job = serviceRegistry.createJob(JOB_TYPE, Operation.START_OPERATION.toString(),
793 Collections.singletonList(Long.toString(workflow.getId())), null, false, null, WORKFLOW_JOB_LOAD);
794 currentOperation.setId(job.getId());
795 update(workflow);
796 job.setStatus(Status.QUEUED);
797 job.setDispatchable(true);
798 serviceRegistry.updateJob(job);
799 } catch (ServiceRegistryException e) {
800 throw new WorkflowDatabaseException(e);
801 } catch (NotFoundException e) {
802
803 throw new IllegalStateException("Unable to find a job that was just created");
804 }
805 break;
806 case PAUSED:
807 case STOPPED:
808 case SUCCEEDED:
809 update(workflow);
810 break;
811 case INSTANTIATED:
812 update(workflow);
813 throw new IllegalStateException("Impossible workflow state found during processing");
814 default:
815 throw new IllegalStateException("Unknown workflow state found during processing");
816 }
817
818 }
819
820 return processingOperation;
821 }
822
823
824
825
826
827
828 @Override
829 public WorkflowDefinition getWorkflowDefinitionById(String id) throws NotFoundException {
830 final WorkflowIdentifier workflowIdentifier = new WorkflowIdentifier(id, securityService.getOrganization().getId());
831 final WorkflowDefinition def = workflowDefinitionScanner
832 .getWorkflowDefinition(securityService.getUser(), workflowIdentifier);
833 if (def == null) {
834 throw new NotFoundException("Workflow definition '" + workflowIdentifier + "' not found or inaccessible");
835 }
836 return def;
837 }
838
839
840
841
842
843
844 @Override
845 public WorkflowInstance stop(long workflowInstanceId) throws WorkflowException, NotFoundException,
846 UnauthorizedException {
847 final Lock lock = this.lock.get(workflowInstanceId);
848 lock.lock();
849 try {
850 WorkflowInstance instance = getWorkflowById(workflowInstanceId);
851
852 if (instance.getState() != STOPPED) {
853
854 instance.setState(STOPPED);
855 update(instance);
856 }
857
858 try {
859 removeTempFiles(instance);
860 } catch (Exception e) {
861 logger.warn("Cannot remove temp files for workflow instance {}", workflowInstanceId, e);
862 }
863
864 return instance;
865 } finally {
866 lock.unlock();
867 }
868 }
869
870
871
872
873 private void validUserOrThrow(User user) {
874 if (user == null)
875 throw new SecurityException("Current user is unknown");
876
877 if (userDirectoryService.loadUser(user.getUsername()) == null)
878 throw new SecurityException(String.format("Current user '%s' can not be loaded", user.getUsername()));
879 }
880
881 private void removeTempFiles(WorkflowInstance workflowInstance) {
882 logger.info("Removing temporary files for workflow {}", workflowInstance.getId());
883 MediaPackage mp = workflowInstance.getMediaPackage();
884 if (null == mp) {
885 logger.warn("Workflow instance {} does not have an media package set", workflowInstance.getId());
886 return;
887 }
888 for (MediaPackageElement elem : mp.getElements()) {
889
890 if (elem instanceof Publication) {
891 continue;
892 }
893 if (null == elem.getURI()) {
894 logger.warn("Media package element {} from the media package {} does not have an URI set",
895 elem.getIdentifier(), mp.getIdentifier());
896 continue;
897 }
898 try {
899 logger.debug("Removing temporary file {} for workflow {}", elem.getURI(), workflowInstance);
900 workspace.delete(elem.getURI());
901 } catch (IOException e) {
902 logger.warn("Unable to delete mediapackage element", e);
903 } catch (NotFoundException e) {
904
905 }
906 }
907 }
908
909
910
911
912
913
914
915 @Override
916 public void remove(long workflowInstanceId) throws WorkflowDatabaseException, NotFoundException,
917 UnauthorizedException, WorkflowParsingException, WorkflowStateException {
918 remove(workflowInstanceId, false);
919 }
920
921
922
923
924
925
926 @Override
927 public void remove(long workflowInstanceId, boolean force) throws WorkflowDatabaseException, NotFoundException,
928 UnauthorizedException, WorkflowStateException {
929 final Lock lock = this.lock.get(workflowInstanceId);
930 lock.lock();
931 try {
932 WorkflowInstance instance = getWorkflowById(workflowInstanceId);
933 WorkflowInstance.WorkflowState state = instance.getState();
934 if (!state.isTerminated() && !force) {
935 throw new WorkflowStateException("Workflow instance with state '" + state + "' cannot be removed "
936 + "since it is not yet terminated.");
937 }
938
939 assertPermission(instance, Permissions.Action.WRITE.toString(), instance.getOrganizationId());
940
941
942 removeTempFiles(instance);
943
944
945 List<WorkflowOperationInstance> operations = instance.getOperations();
946 List<Long> jobsToDelete = new ArrayList<>();
947 for (WorkflowOperationInstance op : operations) {
948 if (op.getId() != null) {
949 long workflowOpId = op.getId();
950 if (workflowOpId != workflowInstanceId) {
951 jobsToDelete.add(workflowOpId);
952 }
953 }
954 }
955 try {
956 serviceRegistry.removeJobs(jobsToDelete);
957 } catch (ServiceRegistryException e) {
958 logger.warn("Problems while removing jobs related to workflow operations '{}'", jobsToDelete, e);
959 } catch (NotFoundException e) {
960 logger.debug("No jobs related to one of the workflow operation '{}' found in service registry", jobsToDelete);
961 }
962
963
964 try {
965 serviceRegistry.removeJobs(Collections.singletonList(workflowInstanceId));
966 removeWorkflowInstanceFromIndex(instance.getId());
967 } catch (ServiceRegistryException e) {
968 logger.warn("Problems while removing workflow instance job '{}'", workflowInstanceId, e);
969 } catch (NotFoundException e) {
970 logger.info("No workflow instance job '{}' found in the service registry", workflowInstanceId);
971 }
972
973
974 persistence.removeFromDatabase(instance);
975 } finally {
976 lock.unlock();
977 }
978 }
979
980
981
982
983
984
985 @Override
986 public WorkflowInstance suspend(long workflowInstanceId) throws WorkflowException, NotFoundException,
987 UnauthorizedException {
988 final Lock lock = this.lock.get(workflowInstanceId);
989 lock.lock();
990 try {
991 WorkflowInstance instance = getWorkflowById(workflowInstanceId);
992 instance.setState(PAUSED);
993 update(instance);
994 return instance;
995 } finally {
996 lock.unlock();
997 }
998 }
999
1000
1001
1002
1003
1004
/**
 * Resumes the workflow instance with the given identifier without passing any additional properties.
 *
 * <p>Delegates to {@link #resume(long, Map)} with {@code null} as the property map.
 *
 * @param id
 *          identifier of the workflow instance to resume
 * @return the workflow instance returned by {@link #resume(long, Map)}
 */
@Override
public WorkflowInstance resume(long id) throws WorkflowException, NotFoundException, IllegalStateException,
        UnauthorizedException {
  return resume(id, null);
}
1010
1011
1012
1013
1014
1015
1016 @Override
1017 public WorkflowInstance resume(long workflowInstanceId, Map<String, String> properties) throws WorkflowException,
1018 NotFoundException, IllegalStateException, UnauthorizedException {
1019 WorkflowInstance workflowInstance = getWorkflowById(workflowInstanceId);
1020 if (!WorkflowState.PAUSED.equals(workflowInstance.getState()))
1021 throw new IllegalStateException("Can not resume a workflow where the current state is not in paused");
1022
1023 workflowInstance = updateConfiguration(workflowInstance, properties);
1024 update(workflowInstance);
1025
1026 WorkflowOperationInstance currentOperation = workflowInstance.getCurrentOperation();
1027
1028
1029 if (currentOperation == null) {
1030
1031
1032 workflowInstance.setState(SUCCEEDED);
1033 for (WorkflowOperationInstance op : workflowInstance.getOperations()) {
1034 if (op.getState().equals(WorkflowOperationInstance.OperationState.FAILED)) {
1035 if (op.isFailOnError()) {
1036 workflowInstance.setState(FAILED);
1037 break;
1038 }
1039 }
1040 }
1041
1042
1043 logger.debug("{} has {}", workflowInstance, workflowInstance.getState());
1044 update(workflowInstance);
1045 return workflowInstance;
1046 }
1047
1048
1049
1050 if (OperationState.INSTANTIATED.equals(currentOperation.getState())) {
1051 try {
1052
1053 Job operationJob = serviceRegistry.createJob(JOB_TYPE, Operation.START_OPERATION.toString(),
1054 Collections.singletonList(Long.toString(workflowInstanceId)), null, false, null, WORKFLOW_JOB_LOAD);
1055
1056
1057
1058 workflowInstance.setState(RUNNING);
1059 currentOperation.setId(operationJob.getId());
1060
1061
1062 update(workflowInstance);
1063
1064
1065 operationJob.setStatus(Status.QUEUED);
1066 operationJob.setDispatchable(true);
1067 serviceRegistry.updateJob(operationJob);
1068
1069 return workflowInstance;
1070 } catch (ServiceRegistryException e) {
1071 throw new WorkflowDatabaseException(e);
1072 }
1073 }
1074
1075 Long operationJobId = workflowInstance.getCurrentOperation().getId();
1076 if (operationJobId == null)
1077 throw new IllegalStateException("Can not resume a workflow where the current operation has no associated id");
1078
1079
1080 Job workflowJob;
1081 try {
1082 workflowJob = serviceRegistry.getJob(workflowInstanceId);
1083 workflowJob.setStatus(Status.RUNNING);
1084 persistence.updateInDatabase(workflowInstance);
1085 serviceRegistry.updateJob(workflowJob);
1086
1087 Job operationJob = serviceRegistry.getJob(operationJobId);
1088 operationJob.setStatus(Status.QUEUED);
1089 operationJob.setDispatchable(true);
1090 if (properties != null) {
1091 Properties props = new Properties();
1092 props.putAll(properties);
1093 ByteArrayOutputStream out = new ByteArrayOutputStream();
1094 props.store(out, null);
1095 List<String> newArguments = new ArrayList<String>(operationJob.getArguments());
1096 newArguments.add(new String(out.toByteArray(), StandardCharsets.UTF_8));
1097 operationJob.setArguments(newArguments);
1098 }
1099 serviceRegistry.updateJob(operationJob);
1100 } catch (ServiceRegistryException e) {
1101 throw new WorkflowDatabaseException(e);
1102 } catch (IOException e) {
1103 throw new WorkflowParsingException("Unable to parse workflow and/or workflow properties");
1104 }
1105
1106 return workflowInstance;
1107 }
1108
1109
1110
1111
1112
1113
1114
1115
1116
1117
1118
1119 protected void assertPermission(WorkflowInstance workflow, String action, String workflowOrgId) throws UnauthorizedException {
1120 User currentUser = securityService.getUser();
1121 Organization currentOrg = securityService.getOrganization();
1122 String currentOrgAdminRole = currentOrg.getAdminRole();
1123 String currentOrgId = currentOrg.getId();
1124
1125 MediaPackage mediapackage = workflow.getMediaPackage();
1126
1127 WorkflowState state = workflow.getState();
1128 if (state != INSTANTIATED && state != RUNNING && workflow.getState() != FAILING) {
1129 Optional<MediaPackage> assetMediapackage = assetManager.getMediaPackage(mediapackage.getIdentifier().toString());
1130 if (assetMediapackage.isPresent()) {
1131 mediapackage = assetMediapackage.get();
1132 }
1133 }
1134
1135 var creatorName = workflow.getCreatorName();
1136 var workflowCreator = creatorName == null ? null : userDirectoryService.loadUser(creatorName);
1137 boolean authorized = currentUser.hasRole(GLOBAL_ADMIN_ROLE)
1138 || (currentUser.hasRole(currentOrgAdminRole) && currentOrgId.equals(workflowOrgId))
1139 || (currentUser.equals(workflowCreator))
1140 || (authorizationService.hasPermission(mediapackage, action) && currentOrgId.equals(workflowOrgId));
1141
1142 if (!authorized) {
1143 throw new UnauthorizedException(currentUser, action);
1144 }
1145 }
1146
1147 protected boolean assertMediaPackagePermission(String mediaPackageId, String action) {
1148 var currentUser = securityService.getUser();
1149 Optional<MediaPackage> mp = assetManager.getMediaPackage(mediaPackageId);
1150
1151
1152
1153
1154 return currentUser.hasRole(GLOBAL_ADMIN_ROLE)
1155 || mp.isPresent() && currentUser.hasRole(securityService.getOrganization().getAdminRole())
1156 || mp.isPresent() && authorizationService.hasPermission(mp.get(), action);
1157 }
1158
1159
1160
1161
1162
1163
/**
 * Stores the given workflow instance and brings the workflow job and the index in line with it.
 *
 * <p>While holding the update lock for the instance identifier, this method
 * <ol>
 * <li>tries to load the previously stored instance (used later when notifying listeners),</li>
 * <li>populates the media package metadata and, if the media package belongs to a series and the workflow has a
 * current operation, synchronizes the series ACL on the media package,</li>
 * <li>maps the workflow state to the status of the workflow job,</li>
 * <li>writes the instance to the database and the job to the service registry,</li>
 * <li>updates the index unless the current operation is in state {@code RUNNING}, and</li>
 * <li>fires the workflow listeners.</li>
 * </ol>
 *
 * @param workflowInstance
 *          the workflow instance to store
 * @throws WorkflowDatabaseException
 *           if the series service fails, or the workflow job cannot be read from or written to the service
 *           registry, or storing the instance fails
 * @throws UnauthorizedException
 *           declared by this method; presumably raised when loading the stored instance is not permitted
 *           (TODO confirm against getWorkflowById)
 * @throws IllegalStateException
 *           if the workflow state is not handled, or notifying the listeners fails
 */
@Override
public void update(final WorkflowInstance workflowInstance) throws WorkflowDatabaseException, UnauthorizedException {
  final Lock lock = updateLock.get(workflowInstance.getId());
  lock.lock();

  try {
    WorkflowInstance originalWorkflowInstance = null;
    try {
      // Load the currently stored version; it is handed to the listeners as the "before" state.
      originalWorkflowInstance = getWorkflowById(workflowInstance.getId());
    } catch (NotFoundException e) {
      // Deliberately ignored: no stored instance exists, so the original stays null.
    }

    MediaPackage updatedMediaPackage = null;
    try {
      // Work on the media package carried by the instance that is being stored.
      updatedMediaPackage = workflowInstance.getMediaPackage();

      populateMediaPackageMetadata(updatedMediaPackage);

      String seriesId = updatedMediaPackage.getSeries();
      if (seriesId != null && workflowInstance.getCurrentOperation() != null) {
        // Only touch the ACL when the active series ACL on the media package differs from the one the
        // series service reports.
        try {
          AccessControlList acl = seriesService.getSeriesAccessControl(seriesId);
          Tuple<AccessControlList, AclScope> activeAcl = authorizationService.getAcl(
                  updatedMediaPackage, AclScope.Series);

          if (!AclScope.Series.equals(activeAcl.getB()) || !AccessControlUtil.equals(activeAcl.getA(), acl)) {
            authorizationService.setAcl(updatedMediaPackage, AclScope.Series, acl);
          }
        } catch (NotFoundException e) {
          logger.debug("Not updating series ACL on event {} since series {} has no ACL set",
                  updatedMediaPackage, seriesId, e);
        }
      }

      workflowInstance.setMediaPackage(updatedMediaPackage);
    } catch (SeriesException e) {
      throw new WorkflowDatabaseException(e);
    } catch (Exception e) {
      // Any other metadata problem is logged only; the update itself continues.
      logger.error("Metadata for media package {} could not be updated", updatedMediaPackage, e);
    }

    // Synchronize the workflow job with the workflow state.
    WorkflowState workflowState = workflowInstance.getState();

    Job job;
    try {
      job = serviceRegistry.getJob(workflowInstance.getId());
      // The job payload carries the workflow instance identifier.
      job.setPayload(Long.toString(workflowInstance.getId()));

      // Translate the workflow state into the corresponding job status.
      switch (workflowState) {
        case FAILED:
          job.setStatus(Status.FAILED);
          break;
        case FAILING:
          // The job status is left untouched while the workflow is failing.
          break;
        case INSTANTIATED:
          job.setDispatchable(true);
          job.setStatus(Status.QUEUED);
          break;
        case PAUSED:
          job.setStatus(Status.PAUSED);
          break;
        case RUNNING:
          job.setStatus(Status.RUNNING);
          break;
        case STOPPED:
          job.setStatus(Status.CANCELLED);
          break;
        case SUCCEEDED:
          job.setStatus(Status.FINISHED);
          break;
        default:
          throw new IllegalStateException("Found a workflow state that is not handled");
      }
    } catch (ServiceRegistryException e) {
      throw new WorkflowDatabaseException(
              "Unable to read workflow job " + workflowInstance.getId() + " from service registry", e);
    } catch (NotFoundException e) {
      throw new WorkflowDatabaseException(
              "Job for workflow " + workflowInstance.getId() + " not found in service registry", e);
    }

    // Persist the instance first, then the job, then the index.
    try {

      persistence.updateInDatabase(workflowInstance);

      job = serviceRegistry.updateJob(job);

      WorkflowOperationInstance op = workflowInstance.getCurrentOperation();

      // The index is only refreshed when there is no current operation or the current operation is not
      // in state RUNNING.
      if (op == null || op.getState() != OperationState.RUNNING) {

        long id = workflowInstance.getId();
        // NOTE(review): the state is passed on as its enum ordinal, so the enum order matters here.
        int state = workflowInstance.getState().ordinal();
        var template = workflowInstance.getTemplate();
        String mpId = workflowInstance.getMediaPackage().getIdentifier().toString();
        String orgId = workflowInstance.getOrganizationId();

        updateWorkflowInstanceInIndex(id, state, template, mpId, orgId);
      }
    } catch (ServiceRegistryException e) {
      throw new WorkflowDatabaseException("Update of workflow job " + workflowInstance.getId()
              + " in the service registry failed, service registry and workflow table may be out of sync", e);
    } catch (NotFoundException e) {
      throw new WorkflowDatabaseException("Job for workflow " + workflowInstance.getId()
              + " not found in service registry", e);
    } catch (Exception e) {
      throw new WorkflowDatabaseException("Update of workflow job " + job.getId() + " in the service registry failed, "
              + "service registry and workflow table may be out of sync", e);
    }

    try {
      fireListeners(originalWorkflowInstance, workflowInstance);
    } catch (Exception e) {
      // Any failure while notifying listeners is reported as an illegal state.
      throw new IllegalStateException("In-memory workflow instance could not be serialized", e);
    }
  } finally {
    lock.unlock();
  }
}
1297
1298
1299
1300
1301
1302
/**
 * Counts workflow instances without restricting the workflow state.
 *
 * <p>Delegates to {@link #countWorkflowInstances(WorkflowState)} with a {@code null} state.
 *
 * @return the number reported by {@link #countWorkflowInstances(WorkflowState)}
 * @throws WorkflowDatabaseException
 *           if the count cannot be determined
 */
@Override
public long countWorkflowInstances() throws WorkflowDatabaseException {
  return countWorkflowInstances(null);
}
1307
1308
1309
1310
1311
1312
/**
 * Counts workflow instances by asking the persistence layer.
 *
 * @param state
 *          the workflow state passed on to the persistence layer; {@code null} is passed by
 *          {@link #countWorkflowInstances()} (presumably meaning "any state" — confirm in the persistence layer)
 * @return the number of workflows reported by the persistence layer
 * @throws WorkflowDatabaseException
 *           if the persistence layer fails
 */
@Override
public long countWorkflowInstances(WorkflowState state) throws WorkflowDatabaseException {
  return persistence.countWorkflows(state);
}
1317
1318
1319
1320
1321
1322
1323 @Override
1324 public List<WorkflowInstance> getWorkflowInstancesByMediaPackage(String mediaPackageId)
1325 throws WorkflowDatabaseException, UnauthorizedException {
1326
1327 if (!assertMediaPackagePermission(mediaPackageId, Permissions.Action.READ.toString())) {
1328 throw new UnauthorizedException("Not allowed to access event");
1329 }
1330 return persistence.getWorkflowInstancesByMediaPackage(mediaPackageId);
1331 }
1332
1333
1334
1335
1336
1337
1338 @Override
1339 public Optional<WorkflowInstance> getRunningWorkflowInstanceByMediaPackage(String mediaPackageId, String action)
1340 throws WorkflowException, UnauthorizedException, WorkflowDatabaseException {
1341 List<WorkflowInstance> workflowInstances = persistence.getRunningWorkflowInstancesByMediaPackage(mediaPackageId);
1342
1343
1344 if (workflowInstances.size() > 1) {
1345 throw new WorkflowException("Multiple workflows are active on mediapackage " + mediaPackageId);
1346 }
1347
1348 Optional<WorkflowInstance> optWorkflowInstance = Optional.empty();
1349 if (workflowInstances.size() == 1) {
1350 WorkflowInstance wfInstance = workflowInstances.get(0);
1351 optWorkflowInstance = Optional.of(wfInstance);
1352 assertPermission(wfInstance, action, wfInstance.getOrganizationId());
1353 }
1354
1355 return optWorkflowInstance;
1356 }
1357
1358
1359
1360
1361
1362
/**
 * Asks the persistence layer whether the given media package has active workflows.
 *
 * @param mediaPackageId
 *          identifier of the media package
 * @return the answer of the persistence layer
 * @throws WorkflowDatabaseException
 *           if the persistence layer fails
 */
@Override
public boolean mediaPackageHasActiveWorkflows(String mediaPackageId) throws WorkflowDatabaseException {
  return persistence.mediaPackageHasActiveWorkflows(mediaPackageId);
}
1367
1368
1369
1370
1371
1372
/**
 * Asks the persistence layer whether the given user has active workflows.
 *
 * @param userId
 *          identifier of the user
 * @return the answer of the persistence layer
 * @throws WorkflowDatabaseException
 *           if the persistence layer fails
 */
@Override
public boolean userHasActiveWorkflows(String userId) throws WorkflowDatabaseException {
  return persistence.userHasActiveWorkflows(userId);
}
1377
1378
1379
1380
1381
1382
1383
1384
1385
1386
1387
1388 protected WorkflowInstance handleOperationException(
1389 WorkflowInstance workflow,
1390 WorkflowOperationInstance currentOperation) {
1391 int failedAttempt = currentOperation.getFailedAttempts() + 1;
1392 currentOperation.setFailedAttempts(failedAttempt);
1393
1394
1395 if (ERROR_RESOLUTION_HANDLER_ID.equals(currentOperation.getTemplate())
1396 && OperationState.FAILED.equals(currentOperation.getState())) {
1397 int position = workflow.getOperations().indexOf(currentOperation);
1398
1399 if (workflow.getOperations().size() > position + 1) {
1400 currentOperation = workflow.getOperations().get(position + 1);
1401
1402 currentOperation.setState(OperationState.FAILED);
1403 }
1404 handleFailedOperation(workflow, currentOperation);
1405 } else if (currentOperation.getMaxAttempts() != -1 && failedAttempt == currentOperation.getMaxAttempts()) {
1406 handleFailedOperation(workflow, currentOperation);
1407 } else {
1408 switch (currentOperation.getRetryStrategy()) {
1409 case NONE:
1410 handleFailedOperation(workflow, currentOperation);
1411 break;
1412 case RETRY:
1413 currentOperation.setState(OperationState.RETRY);
1414 break;
1415 case HOLD:
1416 currentOperation.setState(OperationState.RETRY);
1417 List<WorkflowOperationInstance> operations = workflow.getOperations();
1418 WorkflowOperationDefinitionImpl errorResolutionDefinition = new WorkflowOperationDefinitionImpl(
1419 ERROR_RESOLUTION_HANDLER_ID, "Error Resolution Operation", "error", false);
1420 var errorResolutionInstance = new WorkflowOperationInstance(errorResolutionDefinition);
1421 errorResolutionInstance.setExceptionHandlingWorkflow(currentOperation.getExceptionHandlingWorkflow());
1422 var index = workflow.getOperations().indexOf(currentOperation);
1423 operations.add(index, errorResolutionInstance);
1424 workflow.setOperations(operations);
1425 break;
1426 default:
1427 break;
1428 }
1429 }
1430 return workflow;
1431 }
1432
1433
1434
1435
1436
1437
1438
1439
1440
1441 private void handleFailedOperation(WorkflowInstance workflow, WorkflowOperationInstance currentOperation) {
1442 String errorDefId = currentOperation.getExceptionHandlingWorkflow();
1443
1444
1445 if (currentOperation.isFailOnError()) {
1446 if (StringUtils.isBlank(errorDefId)) {
1447 workflow.setState(FAILED);
1448 } else {
1449 workflow.setState(FAILING);
1450
1451
1452 int currentOperationPosition = workflow.getOperations().indexOf(currentOperation);
1453 List<WorkflowOperationInstance> operations = new ArrayList<>(
1454 workflow.getOperations().subList(0, currentOperationPosition + 1));
1455 workflow.setOperations(operations);
1456
1457
1458 Map<String, String> configuration = new HashMap<>();
1459 for (String configKey : workflow.getConfigurationKeys()) {
1460 configuration.put(configKey, workflow.getConfiguration(configKey));
1461 }
1462
1463
1464 WorkflowDefinition errorDef = null;
1465 try {
1466 errorDef = getWorkflowDefinitionById(errorDefId);
1467 workflow.extend(errorDef);
1468 workflow.setOperations(updateConfiguration(workflow, configuration).getOperations());
1469 } catch (NotFoundException notFoundException) {
1470 throw new IllegalStateException("Unable to find the error workflow definition '" + errorDefId + "'");
1471 }
1472 }
1473 }
1474
1475
1476 currentOperation.setState(OperationState.FAILED);
1477 }
1478
1479
1480
1481
1482
1483
1484
1485
1486
1487
1488
1489
1490
1491 protected WorkflowInstance handleOperationResult(WorkflowInstance workflow, WorkflowOperationResult result)
1492 throws WorkflowDatabaseException {
1493
1494
1495 WorkflowOperationInstance currentOperation = workflow.getCurrentOperation();
1496 WorkflowOperationHandler handler = getWorkflowOperationHandler(currentOperation.getTemplate());
1497
1498
1499 if (result == null) {
1500 logger.warn("Handling a null operation result for workflow {} in operation {}", workflow.getId(),
1501 currentOperation.getTemplate());
1502 result = new WorkflowOperationResultImpl(workflow.getMediaPackage(), null, Action.CONTINUE, 0);
1503 } else {
1504 MediaPackage mp = result.getMediaPackage();
1505 if (mp != null) {
1506 workflow.setMediaPackage(mp);
1507 }
1508 }
1509
1510
1511 Action action = result.getAction();
1512
1513
1514 workflow = updateConfiguration(workflow, result.getProperties());
1515
1516
1517 currentOperation.setTimeInQueue(result.getTimeInQueue());
1518
1519
1520 switch (action) {
1521 case CONTINUE:
1522 currentOperation.setState(OperationState.SUCCEEDED);
1523 break;
1524 case PAUSE:
1525 if (!(handler instanceof ResumableWorkflowOperationHandler)) {
1526 throw new IllegalStateException("Operation " + currentOperation.getTemplate() + " is not resumable");
1527 }
1528
1529
1530 currentOperation.setContinuable(result.allowsContinue());
1531 currentOperation.setAbortable(result.allowsAbort());
1532
1533 workflow.setState(PAUSED);
1534 currentOperation.setState(OperationState.PAUSED);
1535 break;
1536 case SKIP:
1537 currentOperation.setState(OperationState.SKIPPED);
1538 break;
1539 default:
1540 throw new IllegalStateException("Unknown action '" + action + "' returned");
1541 }
1542
1543 if (ERROR_RESOLUTION_HANDLER_ID.equals(currentOperation.getTemplate()) && result.getAction() == Action.CONTINUE) {
1544
1545 Map<String, String> resultProperties = result.getProperties();
1546 if (resultProperties == null || StringUtils.isBlank(resultProperties.get(RETRY_STRATEGY)))
1547 throw new WorkflowDatabaseException("Retry strategy not present in properties!");
1548
1549 RetryStrategy retryStrategy = RetryStrategy.valueOf(resultProperties.get(RETRY_STRATEGY));
1550 switch (retryStrategy) {
1551 case NONE:
1552 handleFailedOperation(workflow, workflow.getCurrentOperation());
1553 break;
1554 case RETRY:
1555 break;
1556 default:
1557 throw new WorkflowDatabaseException("Retry strategy not implemented yet!");
1558 }
1559 }
1560
1561 return workflow;
1562 }
1563
1564
1565
1566
1567
1568
1569
1570 protected void populateMediaPackageMetadata(MediaPackage mp) {
1571 if (metadataServices.isEmpty()) {
1572 logger.warn("No metadata services are registered, so no media package metadata can be extracted from catalogs");
1573 return;
1574 }
1575 for (MediaPackageMetadataService metadataService : metadataServices) {
1576 MediaPackageMetadata metadata = metadataService.getMetadata(mp);
1577 MediaPackageMetadataSupport.populateMediaPackageMetadata(mp, metadata);
1578 }
1579 }
1580
1581
1582
1583
1584
1585
/**
 * Tells whether this service accepts jobs for the given operation.
 *
 * <p>This implementation ignores the operation and always answers {@code true}.
 *
 * @param operation
 *          the operation name (unused)
 * @return always {@code true}
 */
@Override
public boolean isReadyToAcceptJobs(String operation) {
  return true;
}
1590
1591
1592
1593
1594
1595
1596
1597
1598 @Override
1599 public boolean isReadyToAccept(Job job) throws UndispatchableJobException {
1600 String operation = job.getOperation();
1601
1602
1603 if (!Operation.START_WORKFLOW.toString().equals(operation))
1604 return true;
1605
1606
1607 if (job.getArguments().size() > 1 && job.getArguments().get(0) != null) {
1608 try {
1609 WorkflowDefinition workflowDef = XmlWorkflowParser.parseWorkflowDefinition(job.getArguments().get(0));
1610 if (workflowDef.getOperations().size() > 0) {
1611 String firstOperationId = workflowDef.getOperations().get(0).getId();
1612 WorkflowOperationHandler handler = getWorkflowOperationHandler(firstOperationId);
1613 if (handler instanceof ResumableWorkflowOperationHandler) {
1614 if (((ResumableWorkflowOperationHandler) handler).isAlwaysPause()) {
1615 return true;
1616 }
1617 }
1618 }
1619 } catch (WorkflowParsingException e) {
1620 throw new UndispatchableJobException(job + " is not a proper job to start a workflow", e);
1621 }
1622 }
1623
1624 WorkflowInstance workflow;
1625 Optional<WorkflowInstance> workflowInstance;
1626 String mediaPackageId;
1627
1628
1629 try {
1630 workflow = getWorkflowById(job.getId());
1631 mediaPackageId = workflow.getMediaPackage().getIdentifier().toString();
1632 } catch (NotFoundException e) {
1633 throw new UndispatchableJobException("Trying to start workflow with job id " + job.getId()
1634 + " but no corresponding instance is available from the workflow service", e);
1635 } catch (UnauthorizedException e) {
1636 throw new UndispatchableJobException(
1637 "Authorization denied while requesting to loading workflow instance. Job: " + job.getId(), e);
1638 }
1639
1640 try {
1641 workflowInstance = getRunningWorkflowInstanceByMediaPackage(
1642 workflow.getMediaPackage().getIdentifier().toString(), Permissions.Action.READ.toString());
1643 } catch (UnauthorizedException e) {
1644 throw new UndispatchableJobException("Authorization denied while requesting to loading workflow instance " + workflow.getId(), e);
1645 } catch (WorkflowDatabaseException e) {
1646 throw new UndispatchableJobException("An database error occurred while checking if a workflow is already active "
1647 + "(job: " + job.getId() + ")", e);
1648 } catch (WorkflowException e) {
1649
1650 delayWorkflow(workflow, mediaPackageId);
1651 return false;
1652 }
1653
1654
1655 if (workflowInstance.isPresent() && workflow.getId() != workflowInstance.get().getId()) {
1656 delayWorkflow(workflow, mediaPackageId);
1657 return false;
1658 }
1659
1660 return true;
1661 }
1662
1663 private void delayWorkflow(WorkflowInstance workflow, String mediaPackageId) {
1664 if (!delayedWorkflows.contains(workflow.getId())) {
1665 logger.info("Delaying start of workflow {}, another workflow on media package {} is still running",
1666 workflow.getId(), mediaPackageId);
1667 delayedWorkflows.add(workflow.getId());
1668 }
1669 }
1670
1671
1672
1673
1674
1675
1676 @Override
1677 public synchronized void acceptJob(Job job) throws ServiceRegistryException {
1678 User originalUser = securityService.getUser();
1679 Organization originalOrg = securityService.getOrganization();
1680 try {
1681 Organization organization = organizationDirectoryService.getOrganization(job.getOrganization());
1682 securityService.setOrganization(organization);
1683 User user = userDirectoryService.loadUser(job.getCreator());
1684 securityService.setUser(user);
1685 job.setStatus(Job.Status.RUNNING);
1686 job = serviceRegistry.updateJob(job);
1687
1688
1689 if (delayedWorkflows.contains(job.getId())) {
1690 delayedWorkflows.remove(job.getId());
1691 logger.info("Starting initially delayed workflow {}, {} more waiting", job.getId(), delayedWorkflows.size());
1692 }
1693
1694 executorService.submit(new JobRunner(job, serviceRegistry.getCurrentJob()));
1695 } catch (Exception e) {
1696 if (e instanceof ServiceRegistryException)
1697 throw (ServiceRegistryException) e;
1698 throw new ServiceRegistryException(e);
1699 } finally {
1700 securityService.setUser(originalUser);
1701 securityService.setOrganization(originalOrg);
1702 }
1703 }
1704
1705
1706
1707
1708
1709
1710
1711
1712
1713
/**
 * Executes the workflow-related operation carried by the given job.
 *
 * <p>Supported operations are {@code START_WORKFLOW} (loads the workflow named by the job payload and runs it),
 * {@code RESUME} (loads the workflow named by the first argument, sets it to {@code RUNNING} and runs the current
 * operation with the properties parsed from the last argument, if more than one argument is present) and
 * {@code START_OPERATION} (runs the current operation of the workflow named by the first argument and transfers
 * the resulting operation state to the job).
 *
 * <p>If processing fails after a workflow instance has been loaded, the job is marked as failed and the workflow
 * instance is set to {@code FAILED} before the exception is rethrown.
 *
 * @param job
 *          the job to process
 * @return always {@code null}
 * @throws Exception
 *           a {@link ServiceRegistryException} if the operation is unknown, the arguments are insufficient or
 *           processing fails; other exceptions may escape from the failure handling itself
 */
protected String process(Job job) throws Exception {
  List<String> arguments = job.getArguments();
  Operation op = null;
  WorkflowInstance workflowInstance = null;
  WorkflowOperationInstance wfo;
  String operation = job.getOperation();
  try {
    try {
      // Operation.valueOf throws IllegalArgumentException for unknown names; handled below.
      op = Operation.valueOf(operation);
      switch (op) {
        case START_WORKFLOW:
          // The job payload holds the workflow instance identifier.
          workflowInstance = persistence.getWorkflow(Long.parseLong(job.getPayload()));
          logger.debug("Starting new workflow {}", workflowInstance);
          runWorkflow(workflowInstance);
          break;
        case RESUME:
          workflowInstance = getWorkflowById(Long.parseLong(arguments.get(0)));
          Map<String, String> properties = null;
          if (arguments.size() > 1) {
            // The last argument is read as properties in java.util.Properties format.
            Properties props = new Properties();
            props.load(IOUtils.toInputStream(arguments.get(arguments.size() - 1), StandardCharsets.UTF_8));
            properties = new HashMap<>();
            for (Entry<Object, Object> entry : props.entrySet()) {
              properties.put(entry.getKey().toString(), entry.getValue().toString());
            }
          }
          logger.debug("Resuming {} at {}", workflowInstance, workflowInstance.getCurrentOperation());
          workflowInstance.setState(RUNNING);
          update(workflowInstance);
          runWorkflowOperation(workflowInstance, properties);
          break;
        case START_OPERATION:
          workflowInstance = getWorkflowById(Long.parseLong(arguments.get(0)));
          wfo = workflowInstance.getCurrentOperation();

          // An operation found in state RUNNING or PAUSED here is started over from INSTANTIATED.
          if (OperationState.RUNNING.equals(wfo.getState()) || OperationState.PAUSED.equals(wfo.getState())) {
            logger.info("Reset operation state {} {} to INSTANTIATED due to job restart", workflowInstance, wfo);
            wfo.setState(OperationState.INSTANTIATED);
          }

          wfo.setExecutionHost(job.getProcessingHost());
          logger.debug("Running {} {}", workflowInstance, wfo);
          wfo = runWorkflowOperation(workflowInstance, null);
          // Reflect the outcome of the operation in the status of its job.
          updateOperationJob(job.getId(), wfo.getState());
          break;
        default:
          throw new IllegalStateException("Don't know how to handle operation '" + operation + "'");
      }
    } catch (IllegalArgumentException e) {
      throw new ServiceRegistryException("This service can't handle operations of type '" + op + "'", e);
    } catch (IndexOutOfBoundsException e) {
      throw new ServiceRegistryException(
              "The argument list for operation '" + op + "' (job: " + job.getId() + ") does not meet expectations", e);
    } catch (NotFoundException e) {
      // Something required was not found: fail the job but do not propagate the exception.
      logger.warn("Not found processing job {}", job, e);
      updateOperationJob(job.getId(), OperationState.FAILED);
    }
    return null;
  } catch (Exception e) {
    logger.warn("Exception while accepting job {}", job, e);
    try {
      if (workflowInstance != null) {
        // A workflow instance was loaded, so both the job and the instance are marked as failed.
        logger.warn("Marking job {} and workflow instance {} as failed", job, workflowInstance);
        updateOperationJob(job.getId(), OperationState.FAILED);
        workflowInstance.setState(FAILED);
        update(workflowInstance);
      } else {
        logger.warn("Unable to parse workflow instance", e);
      }
    } catch (WorkflowDatabaseException e1) {
      throw new ServiceRegistryException(e1);
    }
    // Service registry exceptions are passed on unchanged, everything else is wrapped.
    if (e instanceof ServiceRegistryException)
      throw e;
    throw new ServiceRegistryException("Error handling operation '" + op + "'", e);
  }
}
1791
1792
1793
1794
1795
1796
1797
1798
1799
1800
1801
1802
1803
1804
1805
1806 private Job updateOperationJob(Long jobId, OperationState state) throws NotFoundException, ServiceRegistryException {
1807 if (jobId == null)
1808 return null;
1809 Job job = serviceRegistry.getJob(jobId);
1810 switch (state) {
1811 case FAILED:
1812 case RETRY:
1813 job.setStatus(Status.FAILED);
1814 break;
1815 case PAUSED:
1816 job.setStatus(Status.PAUSED);
1817 job.setOperation(Operation.RESUME.toString());
1818 break;
1819 case SKIPPED:
1820 case SUCCEEDED:
1821 job.setStatus(Status.FINISHED);
1822 break;
1823 default:
1824 throw new IllegalStateException("Unexpected state '" + state + "' found");
1825 }
1826 return serviceRegistry.updateJob(job);
1827 }
1828
1829
1830
1831
1832
1833
/**
 * Counts the jobs of this service's job type that have the given status.
 *
 * @param status
 *          the job status to count
 * @return the number reported by the service registry for {@code JOB_TYPE} and the given status
 * @throws ServiceRegistryException
 *           if the service registry fails
 */
@Override
public long countJobs(Status status) throws ServiceRegistryException {
  return serviceRegistry.count(JOB_TYPE, status);
}
1838
1839
1840
1841
1842
1843
1844
1845
1846
1847 private String mapToString(Map<String, String> props) {
1848 if (props == null)
1849 return null;
1850 StringBuilder sb = new StringBuilder();
1851 for (Entry<String, String> entry : props.entrySet()) {
1852 sb.append(entry.getKey());
1853 sb.append("=");
1854 sb.append(entry.getValue());
1855 sb.append("\n");
1856 }
1857 return sb.toString();
1858 }
1859
1860
1861
1862
1863
1864
1865
/**
 * Reference callback for a {@link ReadinessIndicator} matching {@code (artifact=workflowdefinition)}.
 *
 * <p>The indicator itself is not used; the body is intentionally empty. NOTE(review): presumably the reference
 * only exists so that this component is activated after the workflow definitions are available — confirm.
 *
 * @param unused
 *          the readiness indicator (ignored)
 */
@Reference(target = "(artifact=workflowdefinition)")
protected void setProfilesReadyIndicator(ReadinessIndicator unused) { }
1868
1869
1870
1871
1872
1873
1874
/**
 * Reference callback that stores the workspace used by this service.
 *
 * @param workspace
 *          the workspace
 */
@Reference
protected void setWorkspace(Workspace workspace) {
  this.workspace = workspace;
}
1879
1880
1881
1882
1883
1884
1885
/**
 * Reference callback that stores the service registry used by this service.
 *
 * @param registry
 *          the service registry
 */
@Reference
protected void setServiceRegistry(ServiceRegistry registry) {
  this.serviceRegistry = registry;
}
1890
/**
 * Returns the service registry currently set on this service.
 *
 * @return the service registry
 */
public ServiceRegistry getServiceRegistry() {
  return serviceRegistry;
}
1894
1895
1896
1897
1898
1899
1900
/**
 * Reference callback that stores the security service used to read and set the current user and organization.
 *
 * @param securityService
 *          the security service
 */
@Reference
public void setSecurityService(SecurityService securityService) {
  this.securityService = securityService;
}
1905
1906
1907
1908
1909
1910
1911
/**
 * Reference callback that stores the authorization service used for permission and ACL handling.
 *
 * @param authorizationService
 *          the authorization service
 */
@Reference
public void setAuthorizationService(AuthorizationService authorizationService) {
  this.authorizationService = authorizationService;
}
1916
1917
1918
1919
1920
1921
1922
/**
 * Reference callback that stores the user directory service used to load users by name.
 *
 * @param userDirectoryService
 *          the user directory service
 */
@Reference
public void setUserDirectoryService(UserDirectoryService userDirectoryService) {
  this.userDirectoryService = userDirectoryService;
}
1927
1928
1929
1930
1931
1932
1933
/**
 * Reference callback that stores the organization directory service used to look up organizations.
 *
 * @param organizationDirectory
 *          the organization directory service
 */
@Reference
public void setOrganizationDirectoryService(OrganizationDirectoryService organizationDirectory) {
  this.organizationDirectoryService = organizationDirectory;
}
1938
1939
1940
1941
1942
1943
1944
/**
 * Reference callback that stores the series service used to read series access control lists.
 *
 * @param seriesService
 *          the series service
 */
@Reference
public void setSeriesService(SeriesService seriesService) {
  this.seriesService = seriesService;
}
1949
1950
1951
1952
1953
1954
1955
/**
 * Reference callback that stores the asset manager used to look up media packages.
 *
 * @param assetManager
 *          the asset manager
 */
@Reference
public void setAssetManager(AssetManager assetManager) {
  this.assetManager = assetManager;
}
1960
1961
1962
1963
1964
1965
1966
/**
 * Dynamic reference callback that registers a media package metadata service.
 *
 * <p>At least one such service is required by the reference declaration; services are unregistered again
 * through {@link #removeMetadataService(MediaPackageMetadataService)}.
 *
 * @param service
 *          the metadata service to add
 */
@Reference(cardinality = ReferenceCardinality.AT_LEAST_ONE, policy = ReferencePolicy.DYNAMIC, unbind = "removeMetadataService")
protected void addMetadataService(MediaPackageMetadataService service) {
  metadataServices.add(service);
}
1971
1972
1973
1974
1975
1976
1977
/**
 * Unregisters a media package metadata service; named as the unbind method of the
 * {@code addMetadataService} reference.
 *
 * @param service
 *          the metadata service to remove
 */
protected void removeMetadataService(MediaPackageMetadataService service) {
  metadataServices.remove(service);
}
1981
1982
1983
1984
1985
1986
1987
/**
 * Reference callback that stores the workflow definition scanner used by this service.
 *
 * @param scanner
 *          the workflow definition scanner
 */
@Reference
protected void addWorkflowDefinitionScanner(WorkflowDefinitionScanner scanner) {
  workflowDefinitionScanner = scanner;
}
1992
1993
1994
1995
1996
1997
1998
/**
 * Reference callback that stores the Elasticsearch index used by this service.
 *
 * @param index
 *          the Elasticsearch index
 */
@Reference
public void setIndex(ElasticsearchIndex index) {
  this.index = index;
}
2003
2004
2005
2006
2007
2008
2009
/**
 * Reference callback (named {@code workflow-persistence}) that stores the workflow database used by this service.
 *
 * @param persistence
 *          the workflow persistence layer
 */
@Reference(name = "workflow-persistence")
public void setPersistence(WorkflowServiceDatabase persistence) {
  this.persistence = persistence;
}
2014
2015
2016
2017
2018
2019
/**
 * Returns the job type handled by this service.
 *
 * @return the value of {@code JOB_TYPE}
 */
@Override
public String getJobType() {
  return JOB_TYPE;
}
2024
2025
2026
2027
2028 public static class HandlerRegistration {
2029
2030 protected WorkflowOperationHandler handler;
2031 protected String operationName;
2032
2033 public HandlerRegistration(String operationName, WorkflowOperationHandler handler) {
2034 if (operationName == null)
2035 throw new IllegalArgumentException("Operation name cannot be null");
2036 if (handler == null)
2037 throw new IllegalArgumentException("Handler cannot be null");
2038 this.operationName = operationName;
2039 this.handler = handler;
2040 }
2041
2042 public WorkflowOperationHandler getHandler() {
2043 return handler;
2044 }
2045
2046
2047
2048
2049
2050
2051 @Override
2052 public int hashCode() {
2053 final int prime = 31;
2054 int result = 1;
2055 result = prime * result + handler.hashCode();
2056 result = prime * result + operationName.hashCode();
2057 return result;
2058 }
2059
2060
2061
2062
2063
2064
2065 @Override
2066 public boolean equals(Object obj) {
2067 if (this == obj)
2068 return true;
2069 if (obj == null)
2070 return false;
2071 if (getClass() != obj.getClass())
2072 return false;
2073 HandlerRegistration other = (HandlerRegistration) obj;
2074 if (!handler.equals(other.handler))
2075 return false;
2076 return operationName.equals(other.operationName);
2077 }
2078 }
2079
2080
2081
2082
2083 class JobRunner implements Callable<Void> {
2084
2085
2086 private Job job = null;
2087
2088
2089 private final Job currentJob;
2090
2091
2092
2093
2094
2095
2096
2097
2098
2099 JobRunner(Job job, Job currentJob) {
2100 this.job = job;
2101 this.currentJob = currentJob;
2102 }
2103
2104
2105
2106
2107
2108
2109 @Override
2110 public Void call() throws Exception {
2111 Organization jobOrganization = organizationDirectoryService.getOrganization(job.getOrganization());
2112 try {
2113 serviceRegistry.setCurrentJob(currentJob);
2114 securityService.setOrganization(jobOrganization);
2115 User jobUser = userDirectoryService.loadUser(job.getCreator());
2116 securityService.setUser(jobUser);
2117 process(job);
2118 } finally {
2119 serviceRegistry.setCurrentJob(null);
2120 securityService.setUser(null);
2121 securityService.setOrganization(null);
2122 }
2123 return null;
2124 }
2125 }
2126
2127 @Override
2128 public synchronized void cleanupWorkflowInstances(int buffer, WorkflowState state) throws UnauthorizedException,
2129 WorkflowDatabaseException {
2130 logger.info("Start cleaning up workflow instances older than {} days with status '{}'", buffer, state);
2131
2132 int instancesCleaned = 0;
2133 int cleaningFailed = 0;
2134
2135 Date priorTo = DateUtils.addDays(new Date(), -buffer);
2136
2137 try {
2138 for (WorkflowInstance workflowInstance : persistence.getWorkflowInstancesForCleanup(state, priorTo)) {
2139 try {
2140 logger.debug("Deleting workflow instance {}", workflowInstance.getId());
2141 remove(workflowInstance.getId());
2142 instancesCleaned++;
2143 } catch (WorkflowDatabaseException | UnauthorizedException e) {
2144 throw e;
2145 } catch (NotFoundException e) {
2146
2147 logger.debug("Workflow instance '{}' could not be removed", workflowInstance.getId(), e);
2148 } catch (WorkflowParsingException | WorkflowStateException e) {
2149 logger.warn("Workflow instance '{}' could not be removed", workflowInstance.getId(), e);
2150 cleaningFailed++;
2151 }
2152 }
2153 } catch (WorkflowDatabaseException e) {
2154 throw new WorkflowDatabaseException(e);
2155 }
2156
2157 if (instancesCleaned == 0 && cleaningFailed == 0) {
2158 logger.info("No workflow instances found to clean up");
2159 return;
2160 }
2161
2162 if (instancesCleaned > 0)
2163 logger.info("Cleaned up '{}' workflow instances", instancesCleaned);
2164 if (cleaningFailed > 0) {
2165 logger.warn("Cleaning failed for '{}' workflow instances", cleaningFailed);
2166 throw new WorkflowDatabaseException("Unable to clean all workflow instances, see logs!");
2167 }
2168 }
2169
2170 @Override
2171 public Map<String, Map<String, String>> getWorkflowStateMappings() {
2172 return workflowDefinitionScanner.workflowStateMappings.entrySet().stream().collect(Collectors.toMap(
2173 Entry::getKey, e -> e.getValue().stream()
2174 .collect(Collectors.toMap(m -> m.getState().name(), WorkflowStateMapping::getValue))
2175 ));
2176 }
2177
2178 @Override
2179 public void repopulate(IndexRebuildService.DataType type) throws IndexRebuildException {
2180 try {
2181 final int total;
2182 try {
2183 total = persistence.countMediaPackages();
2184 } catch (WorkflowDatabaseException e) {
2185 logIndexRebuildError(logger, e);
2186 throw new IndexRebuildException(getService(), e);
2187 }
2188
2189 if (total > 0) {
2190 logIndexRebuildBegin(logger, total, "workflows");
2191 int current = 0;
2192 int n = 20;
2193 List<WorkflowIndexData> workflowIndexData;
2194
2195 int limit = 1000;
2196 int offset = 0;
2197 String currentMediapackageId;
2198 String lastMediapackageId = "";
2199 var updatedWorkflowRange = new ArrayList<Event>();
2200 do {
2201 try {
2202 workflowIndexData = persistence.getWorkflowIndexData(limit, offset);
2203 } catch (WorkflowDatabaseException e) {
2204 logIndexRebuildError(logger, e);
2205 throw new IndexRebuildException(getService(), e);
2206 }
2207 if (!workflowIndexData.isEmpty()) {
2208 offset += limit;
2209 logger.debug("Got {} workflows for re-indexing", workflowIndexData.size());
2210
2211 for (WorkflowIndexData indexData : workflowIndexData) {
2212 currentMediapackageId = indexData.getMediaPackageId();
2213 if (currentMediapackageId.equals(lastMediapackageId)) {
2214 continue;
2215 }
2216 current++;
2217
2218
2219 if (!WorkflowUtil.isActive(WorkflowInstance.WorkflowState.values()[indexData.getState()].toString())
2220 || WorkflowState.PAUSED == WorkflowInstance.WorkflowState.values()[indexData.getState()]) {
2221 String orgid = indexData.getOrganizationId();
2222 if (null == orgid) {
2223 String mpId = indexData.getMediaPackageId();
2224
2225 List<Snapshot> snapshots = assetManager.getSnapshotsById(mpId);
2226 if (snapshots.size() == 0) {
2227 logger.debug("Dropping {} from the index since it is missing from the database", mpId);
2228 continue;
2229 }
2230 orgid = snapshots.stream().findFirst().get().getOrganizationId();
2231
2232
2233 try {
2234
2235
2236 WorkflowInstance instance = persistence.getWorkflow(indexData.getId(), null);
2237 instance.setOrganizationId(orgid);
2238 persistence.updateInDatabase(instance);
2239 } catch (NotFoundException e) {
2240
2241 }
2242 }
2243 var updatedWorkflowData = index.getEvent(indexData.getMediaPackageId(), orgid, securityService.getUser());
2244 updatedWorkflowData = getStateUpdateFunction(indexData.getId(),indexData.getState(),
2245 indexData.getMediaPackageId(), indexData.getTemplate(), indexData.getOrganizationId())
2246 .apply(updatedWorkflowData);
2247 updatedWorkflowRange.add(updatedWorkflowData.get());
2248
2249 if (updatedWorkflowRange.size() >= n || current >= total) {
2250 index.bulkEventUpdate(updatedWorkflowRange);
2251 logIndexRebuildProgress(logger, total, current, n);
2252 updatedWorkflowRange.clear();
2253 }
2254 }
2255 else {
2256 logger.info("Skipping. Workflow {} is currently active.", indexData.getId());
2257 }
2258
2259 lastMediapackageId = currentMediapackageId;
2260 }
2261 }
2262 } while (!workflowIndexData.isEmpty());
2263 }
2264 } catch (Exception e) {
2265 logIndexRebuildError(logger, e);
2266 throw new IndexRebuildException(getService(), e);
2267 }
2268 }
2269
2270 @Override
2271 public IndexRebuildService.Service getService() {
2272 return IndexRebuildService.Service.Workflow;
2273 }
2274
2275
2276
2277
2278
2279
2280
2281 private void removeWorkflowInstanceFromIndex(long workflowInstanceId) {
2282 final String orgId = securityService.getOrganization().getId();
2283 final User user = securityService.getUser();
2284
2285
2286 SearchResult<Event> results;
2287 try {
2288 results = index.getByQuery(new EventSearchQuery(orgId, user).withWorkflowId(workflowInstanceId));
2289 } catch (SearchIndexException e) {
2290 logger.error("Error retrieving the events for workflow instance {} from the {} index.", workflowInstanceId,
2291 index.getIndexName(), e);
2292 return;
2293 }
2294
2295 if (results.getItems().length == 0) {
2296 logger.warn("No events for workflow instance {} found in the {} index.", workflowInstanceId,
2297 index.getIndexName());
2298 return;
2299 }
2300
2301
2302 for (SearchResultItem<Event> item: results.getItems()) {
2303 String eventId = item.getSource().getIdentifier();
2304 logger.debug("Removing workflow instance {} of event {} from the {} index.", workflowInstanceId, eventId,
2305 index.getIndexName());
2306
2307 Function<Optional<Event>, Optional<Event>> updateFunction = (Optional<Event> eventOpt) -> {
2308 if (!eventOpt.isPresent()) {
2309 logger.warn("Event {} of workflow instance {} not found in the {} index.", workflowInstanceId,
2310 eventId, index.getIndexName());
2311 return Optional.empty();
2312 }
2313 Event event = eventOpt.get();
2314 if (event.getWorkflowId() != null && event.getWorkflowId().equals(workflowInstanceId)) {
2315 logger.debug("Workflow {} is the current workflow of event {}. Removing it from event.", eventId,
2316 workflowInstanceId);
2317 event.setWorkflowId(null);
2318 event.setWorkflowDefinitionId(null);
2319 event.setWorkflowState(null);
2320 return Optional.of(event);
2321 }
2322 return Optional.empty();
2323 };
2324
2325 try {
2326 index.addOrUpdateEvent(eventId, updateFunction, orgId, user);
2327 logger.debug("Workflow instance {} of event {} removed from the {} index.", workflowInstanceId, eventId,
2328 index.getIndexName());
2329 } catch (SearchIndexException e) {
2330 logger.error("Error removing the workflow instance {} of event {} from the {} index.", workflowInstanceId,
2331 eventId, index.getIndexName(), e);
2332 }
2333 }
2334 }
2335
2336
2337
2338
2339
2340
2341
2342
2343
2344
2345
2346
2347
2348 private void updateWorkflowInstanceInIndex(long id, int state, String wfDefId, String mpId, String orgId) {
2349 final User user = securityService.getUser();
2350
2351 logger.debug("Updating workflow instance {} of event {} in the {} index.", id, mpId,
2352 index.getIndexName());
2353 Function<Optional<Event>, Optional<Event>> updateFunction = getStateUpdateFunction(id, state, wfDefId, mpId, orgId);
2354
2355 try {
2356 index.addOrUpdateEvent(mpId, updateFunction, orgId, user);
2357 logger.debug("Workflow instance {} of event {} updated in the {} index.", id, mpId,
2358 index.getIndexName());
2359 } catch (SearchIndexException e) {
2360 logger.error("Error updating the workflow instance {} of event {} in the {} index.", id, mpId,
2361 index.getIndexName(), e);
2362 }
2363 }
2364
2365
2366
2367
2368
2369
2370 private Function<Optional<Event>, Optional<Event>> getStateUpdateFunction(long workflowId,
2371 int workflowState, String workflowDefinitionId, String mediaPackageId,
2372 String orgId) {
2373 return (Optional<Event> eventOpt) -> {
2374 Event event = eventOpt.orElse(new Event(mediaPackageId, orgId));
2375 event.setWorkflowId(workflowId);
2376 event.setWorkflowState(WorkflowState.values()[workflowState]);
2377 event.setWorkflowDefinitionId(workflowDefinitionId);
2378 return Optional.of(event);
2379 };
2380 }
2381 }