1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 package org.opencastproject.workflow.impl;
23
24 import static org.opencastproject.security.api.SecurityConstants.GLOBAL_ADMIN_ROLE;
25 import static org.opencastproject.workflow.api.WorkflowInstance.WorkflowState.FAILED;
26 import static org.opencastproject.workflow.api.WorkflowInstance.WorkflowState.FAILING;
27 import static org.opencastproject.workflow.api.WorkflowInstance.WorkflowState.INSTANTIATED;
28 import static org.opencastproject.workflow.api.WorkflowInstance.WorkflowState.PAUSED;
29 import static org.opencastproject.workflow.api.WorkflowInstance.WorkflowState.RUNNING;
30 import static org.opencastproject.workflow.api.WorkflowInstance.WorkflowState.STOPPED;
31 import static org.opencastproject.workflow.api.WorkflowInstance.WorkflowState.SUCCEEDED;
32
33 import org.opencastproject.assetmanager.api.AssetManager;
34 import org.opencastproject.assetmanager.api.query.RichAResult;
35 import org.opencastproject.assetmanager.util.WorkflowPropertiesUtil;
36 import org.opencastproject.elasticsearch.api.SearchIndexException;
37 import org.opencastproject.elasticsearch.api.SearchResult;
38 import org.opencastproject.elasticsearch.api.SearchResultItem;
39 import org.opencastproject.elasticsearch.index.ElasticsearchIndex;
40 import org.opencastproject.elasticsearch.index.objects.event.Event;
41 import org.opencastproject.elasticsearch.index.objects.event.EventSearchQuery;
42 import org.opencastproject.elasticsearch.index.rebuild.AbstractIndexProducer;
43 import org.opencastproject.elasticsearch.index.rebuild.IndexProducer;
44 import org.opencastproject.elasticsearch.index.rebuild.IndexRebuildException;
45 import org.opencastproject.elasticsearch.index.rebuild.IndexRebuildService;
46 import org.opencastproject.job.api.Job;
47 import org.opencastproject.job.api.Job.Status;
48 import org.opencastproject.job.api.JobProducer;
49 import org.opencastproject.mediapackage.MediaPackage;
50 import org.opencastproject.mediapackage.MediaPackageElement;
51 import org.opencastproject.mediapackage.MediaPackageParser;
52 import org.opencastproject.mediapackage.MediaPackageSupport;
53 import org.opencastproject.mediapackage.Publication;
54 import org.opencastproject.metadata.api.MediaPackageMetadata;
55 import org.opencastproject.metadata.api.MediaPackageMetadataService;
56 import org.opencastproject.metadata.api.MetadataService;
57 import org.opencastproject.metadata.api.util.MediaPackageMetadataSupport;
58 import org.opencastproject.security.api.AccessControlList;
59 import org.opencastproject.security.api.AccessControlUtil;
60 import org.opencastproject.security.api.AclScope;
61 import org.opencastproject.security.api.AuthorizationService;
62 import org.opencastproject.security.api.Organization;
63 import org.opencastproject.security.api.OrganizationDirectoryService;
64 import org.opencastproject.security.api.Permissions;
65 import org.opencastproject.security.api.SecurityService;
66 import org.opencastproject.security.api.UnauthorizedException;
67 import org.opencastproject.security.api.User;
68 import org.opencastproject.security.api.UserDirectoryService;
69 import org.opencastproject.series.api.SeriesException;
70 import org.opencastproject.series.api.SeriesService;
71 import org.opencastproject.serviceregistry.api.ServiceRegistry;
72 import org.opencastproject.serviceregistry.api.ServiceRegistryException;
73 import org.opencastproject.serviceregistry.api.UndispatchableJobException;
74 import org.opencastproject.util.NotFoundException;
75 import org.opencastproject.util.ReadinessIndicator;
76 import org.opencastproject.util.data.Tuple;
77 import org.opencastproject.workflow.api.ResumableWorkflowOperationHandler;
78 import org.opencastproject.workflow.api.RetryStrategy;
79 import org.opencastproject.workflow.api.WorkflowDatabaseException;
80 import org.opencastproject.workflow.api.WorkflowDefinition;
81 import org.opencastproject.workflow.api.WorkflowException;
82 import org.opencastproject.workflow.api.WorkflowIdentifier;
83 import org.opencastproject.workflow.api.WorkflowIndexData;
84 import org.opencastproject.workflow.api.WorkflowInstance;
85 import org.opencastproject.workflow.api.WorkflowInstance.WorkflowState;
86 import org.opencastproject.workflow.api.WorkflowListener;
87 import org.opencastproject.workflow.api.WorkflowOperationDefinition;
88 import org.opencastproject.workflow.api.WorkflowOperationDefinitionImpl;
89 import org.opencastproject.workflow.api.WorkflowOperationHandler;
90 import org.opencastproject.workflow.api.WorkflowOperationInstance;
91 import org.opencastproject.workflow.api.WorkflowOperationInstance.OperationState;
92 import org.opencastproject.workflow.api.WorkflowOperationResult;
93 import org.opencastproject.workflow.api.WorkflowOperationResult.Action;
94 import org.opencastproject.workflow.api.WorkflowOperationResultImpl;
95 import org.opencastproject.workflow.api.WorkflowParsingException;
96 import org.opencastproject.workflow.api.WorkflowService;
97 import org.opencastproject.workflow.api.WorkflowServiceDatabase;
98 import org.opencastproject.workflow.api.WorkflowStateException;
99 import org.opencastproject.workflow.api.WorkflowStateMapping;
100 import org.opencastproject.workflow.api.WorkflowUtil;
101 import org.opencastproject.workflow.api.XmlWorkflowParser;
102 import org.opencastproject.workspace.api.Workspace;
103
104 import com.google.common.util.concurrent.Striped;
105
106 import org.apache.commons.io.IOUtils;
107 import org.apache.commons.lang3.StringUtils;
108 import org.apache.commons.lang3.time.DateUtils;
109 import org.osgi.framework.InvalidSyntaxException;
110 import org.osgi.framework.ServiceReference;
111 import org.osgi.service.component.ComponentContext;
112 import org.osgi.service.component.annotations.Activate;
113 import org.osgi.service.component.annotations.Component;
114 import org.osgi.service.component.annotations.Reference;
115 import org.osgi.service.component.annotations.ReferenceCardinality;
116 import org.osgi.service.component.annotations.ReferencePolicy;
117 import org.slf4j.Logger;
118 import org.slf4j.LoggerFactory;
119
120 import java.io.ByteArrayOutputStream;
121 import java.io.IOException;
122 import java.nio.charset.StandardCharsets;
123 import java.util.ArrayList;
124 import java.util.Collections;
125 import java.util.Comparator;
126 import java.util.Date;
127 import java.util.HashMap;
128 import java.util.HashSet;
129 import java.util.List;
130 import java.util.Map;
131 import java.util.Map.Entry;
132 import java.util.Optional;
133 import java.util.Properties;
134 import java.util.Set;
135 import java.util.SortedSet;
136 import java.util.TreeSet;
137 import java.util.concurrent.Callable;
138 import java.util.concurrent.CopyOnWriteArrayList;
139 import java.util.concurrent.Executors;
140 import java.util.concurrent.ThreadPoolExecutor;
141 import java.util.concurrent.locks.Lock;
142 import java.util.function.Function;
143 import java.util.stream.Collectors;
144
145
146
147
148
149
150
151
152 @Component(
153 property = {
154 "service.description=Workflow Service",
155 "service.pid=org.opencastproject.workflow.impl.WorkflowServiceImpl"
156 },
157 immediate = true,
158 service = { WorkflowService.class, WorkflowServiceImpl.class, IndexProducer.class }
159 )
160 public class WorkflowServiceImpl extends AbstractIndexProducer implements WorkflowService, JobProducer {
161
162
163 private static final String RETRY_STRATEGY = "retryStrategy";
164
165
166 private static final Logger logger = LoggerFactory.getLogger(WorkflowServiceImpl.class);
167
168
/**
 * The job operations of the workflow service. The constant's name (via {@code toString()}) is handed to the
 * service registry as the operation of the jobs this service creates, so names and order must stay stable.
 */
enum Operation {
  START_WORKFLOW,
  RESUME,
  START_OPERATION
}
172
173
174 private static final String NULL_PARENT_ID = "-";
175
176
177
178
179
180 private static final float WORKFLOW_JOB_LOAD = 0.0f;
181
182
183 public static final String ERROR_RESOLUTION_HANDLER_ID = "error-resolution";
184
185
186 protected ComponentContext componentContext = null;
187
188
189 private SortedSet<MediaPackageMetadataService> metadataServices;
190
191
192 protected WorkflowServiceDatabase persistence;
193
194
195 private final List<WorkflowListener> listeners = new CopyOnWriteArrayList<WorkflowListener>();
196
197
198 protected ThreadPoolExecutor executorService;
199
200
201 protected Workspace workspace = null;
202
203
204 protected ServiceRegistry serviceRegistry = null;
205
206
207 protected SecurityService securityService = null;
208
209
210 protected AuthorizationService authorizationService = null;
211
212
213 protected UserDirectoryService userDirectoryService = null;
214
215
216 protected OrganizationDirectoryService organizationDirectoryService = null;
217
218
219 protected SeriesService seriesService;
220
221
222 protected AssetManager assetManager = null;
223
224
225 private WorkflowDefinitionScanner workflowDefinitionScanner;
226
227
228 private final List<Long> delayedWorkflows = new ArrayList<Long>();
229
230
231 private final Striped<Lock> lock = Striped.lazyWeakLock(1024);
232 private final Striped<Lock> updateLock = Striped.lazyWeakLock(1024);
233 private final Striped<Lock> mediaPackageLocks = Striped.lazyWeakLock(1024);
234
235
236 private ElasticsearchIndex index;
237
238
239
240
/**
 * Creates the workflow service with an empty set of metadata services that is ordered by priority.
 * <p>
 * A {@link TreeSet} treats two elements as duplicates whenever its comparator returns {@code 0}. Ordering by
 * priority alone would therefore silently drop every service whose priority equals that of a service that is
 * already in the set (and removing one of them would remove the other). To keep distinct services distinct, ties
 * are broken by implementation class name and, as a last resort, by identity hash code.
 */
public WorkflowServiceImpl() {
  metadataServices = new TreeSet<>(Comparator
      .comparingInt(MetadataService::getPriority)
      .thenComparing((MetadataService service) -> service.getClass().getName())
      .thenComparingInt(System::identityHashCode));
}
244
245
246
247
248
249
250
251 @Activate
252 public void activate(ComponentContext componentContext) {
253 this.componentContext = componentContext;
254 executorService = (ThreadPoolExecutor) Executors.newCachedThreadPool();
255 logger.info("Activate Workflow service");
256 }
257
258
259
260
261
262
263 @Override
264 public void addWorkflowListener(WorkflowListener listener) {
265 listeners.add(listener);
266 }
267
268
269
270
271
272
273 @Override
274 public void removeWorkflowListener(WorkflowListener listener) {
275 listeners.remove(listener);
276 }
277
278
279
280
281 protected void fireListeners(final WorkflowInstance oldWorkflowInstance, final WorkflowInstance newWorkflowInstance) {
282 final User currentUser = securityService.getUser();
283 final Organization currentOrganization = securityService.getOrganization();
284 for (final WorkflowListener listener : listeners) {
285 if (oldWorkflowInstance == null || !oldWorkflowInstance.getState().equals(newWorkflowInstance.getState())) {
286 Runnable runnable = () -> {
287 try {
288 securityService.setUser(currentUser);
289 securityService.setOrganization(currentOrganization);
290 listener.stateChanged(newWorkflowInstance);
291 } finally {
292 securityService.setUser(null);
293 securityService.setOrganization(null);
294 }
295 };
296 executorService.execute(runnable);
297 } else {
298 logger.debug("Not notifying {} because the workflow state has not changed", listener);
299 }
300
301 if (newWorkflowInstance.getCurrentOperation() != null) {
302 if (oldWorkflowInstance == null || oldWorkflowInstance.getCurrentOperation() == null
303 || !oldWorkflowInstance.getCurrentOperation().equals(newWorkflowInstance.getCurrentOperation())) {
304 Runnable runnable = () -> {
305 try {
306 securityService.setUser(currentUser);
307 securityService.setOrganization(currentOrganization);
308 listener.operationChanged(newWorkflowInstance);
309 } finally {
310 securityService.setUser(null);
311 securityService.setOrganization(null);
312 }
313 };
314 executorService.execute(runnable);
315 }
316 } else {
317 logger.debug("Not notifying {} because the workflow operation has not changed", listener);
318 }
319 }
320 }
321
322
323
324
325
326
327 @Override
328 public List<WorkflowDefinition> listAvailableWorkflowDefinitions() {
329 return workflowDefinitionScanner
330 .getAvailableWorkflowDefinitions(securityService.getOrganization(), securityService.getUser())
331 .sorted()
332 .collect(Collectors.toList());
333 }
334
335
336
337
338 public boolean isRunnable(WorkflowDefinition workflowDefinition) {
339 List<String> availableOperations = listAvailableOperationNames();
340 List<WorkflowDefinition> checkedWorkflows = new ArrayList<>();
341 boolean runnable = isRunnable(workflowDefinition, availableOperations, checkedWorkflows);
342 int wfCount = checkedWorkflows.size() - 1;
343 if (runnable)
344 logger.info("Workflow {}, containing {} derived workflows, is runnable", workflowDefinition, wfCount);
345 else
346 logger.warn("Workflow {}, containing {} derived workflows, is not runnable", workflowDefinition, wfCount);
347 return runnable;
348 }
349
350
351
352
353
354
355
356
357
358
359
360
361
362 private boolean isRunnable(WorkflowDefinition workflowDefinition, List<String> availableOperations,
363 List<WorkflowDefinition> checkedWorkflows) {
364 if (checkedWorkflows.contains(workflowDefinition))
365 return true;
366
367
368 for (WorkflowOperationDefinition op : workflowDefinition.getOperations()) {
369 if (!availableOperations.contains(op.getId())) {
370 logger.info("{} is not runnable due to missing operation {}", workflowDefinition, op);
371 return false;
372 }
373 String catchWorkflow = op.getExceptionHandlingWorkflow();
374 if (catchWorkflow != null) {
375 WorkflowDefinition catchWorkflowDefinition;
376 try {
377 catchWorkflowDefinition = getWorkflowDefinitionById(catchWorkflow);
378 } catch (NotFoundException e) {
379 logger.info("{} is not runnable due to missing catch workflow {} on operation {}", workflowDefinition,
380 catchWorkflow, op);
381 return false;
382 }
383 if (!isRunnable(catchWorkflowDefinition, availableOperations, checkedWorkflows))
384 return false;
385 }
386 }
387
388
389 if (!checkedWorkflows.contains(workflowDefinition))
390 checkedWorkflows.add(workflowDefinition);
391 return true;
392 }
393
394
395
396
397
398
399 public Set<HandlerRegistration> getRegisteredHandlers() {
400 Set<HandlerRegistration> set = new HashSet<>();
401 ServiceReference[] refs;
402 try {
403 refs = componentContext.getBundleContext().getServiceReferences(WorkflowOperationHandler.class.getName(), null);
404 } catch (InvalidSyntaxException e) {
405 throw new IllegalStateException(e);
406 }
407 if (refs != null) {
408 for (ServiceReference ref : refs) {
409 WorkflowOperationHandler handler = (WorkflowOperationHandler) componentContext.getBundleContext().getService(
410 ref);
411 set.add(new HandlerRegistration((String) ref.getProperty(WORKFLOW_OPERATION_PROPERTY), handler));
412 }
413 } else {
414 logger.warn("No registered workflow operation handlers found");
415 }
416 return set;
417 }
418
419 protected WorkflowOperationHandler getWorkflowOperationHandler(String operationId) {
420 for (HandlerRegistration reg : getRegisteredHandlers()) {
421 if (reg.operationName.equals(operationId))
422 return reg.handler;
423 }
424 return null;
425 }
426
427
428
429
430
431
432
433 protected List<String> listAvailableOperationNames() {
434 return getRegisteredHandlers().parallelStream().map(op -> op.operationName).collect(Collectors.toList());
435 }
436
437
438
439
440
441
442 @Override
443 public WorkflowInstance getWorkflowById(long id) throws NotFoundException,
444 UnauthorizedException {
445 try {
446 WorkflowInstance workflow = persistence.getWorkflow(id);
447 assertPermission(workflow, Permissions.Action.READ.toString(), workflow.getOrganizationId());
448 return workflow;
449 } catch (WorkflowDatabaseException e) {
450 throw new IllegalStateException("Got not get workflow from database with id ");
451 }
452 }
453
454
455
456
457
458
459
460 @Override
461 public WorkflowInstance start(WorkflowDefinition workflowDefinition, MediaPackage mediaPackage)
462 throws WorkflowDatabaseException, UnauthorizedException, WorkflowParsingException {
463 return start(workflowDefinition, mediaPackage, new HashMap<>());
464 }
465
466
467
468
469
470
471
472 @Override
473 public WorkflowInstance start(WorkflowDefinition workflowDefinition, MediaPackage mediaPackage,
474 Map<String, String> properties)
475 throws WorkflowDatabaseException, UnauthorizedException, WorkflowParsingException {
476 try {
477 return start(workflowDefinition, mediaPackage, null, properties);
478 } catch (NotFoundException e) {
479
480 throw new IllegalStateException("a null workflow ID caused a NotFoundException. This is a programming error.");
481 }
482 }
483
484
485
486
487
488
489
490 @Override
491 public WorkflowInstance start(WorkflowDefinition workflowDefinition, MediaPackage sourceMediaPackage,
492 Long parentWorkflowId, Map<String, String> originalProperties) throws WorkflowDatabaseException,
493 NotFoundException, UnauthorizedException, WorkflowParsingException, IllegalStateException {
494 final String mediaPackageId = sourceMediaPackage.getIdentifier().toString();
495 Map<String, String> properties = null;
496
497
498
499 populateMediaPackageMetadata(sourceMediaPackage);
500
501 if (originalProperties != null) {
502 WorkflowPropertiesUtil.storeProperties(assetManager, sourceMediaPackage, originalProperties);
503 properties = WorkflowPropertiesUtil.getLatestWorkflowProperties(assetManager, mediaPackageId);
504 }
505
506
507 final Lock lock = mediaPackageLocks.get(mediaPackageId);
508 lock.lock();
509
510 try {
511 if (workflowDefinition == null)
512 throw new IllegalArgumentException("workflow definition must not be null");
513 for (List<String> errors : MediaPackageSupport.sanityCheck(sourceMediaPackage)) {
514 throw new IllegalArgumentException("Insane media package cannot be processed: " + String.join("; ", errors));
515 }
516 if (parentWorkflowId != null) {
517 try {
518 getWorkflowById(parentWorkflowId);
519 } catch (UnauthorizedException e) {
520 throw new IllegalArgumentException("Parent workflow " + parentWorkflowId + " not visible to this user");
521 }
522 } else {
523 if (persistence.mediaPackageHasActiveWorkflows(mediaPackageId)) {
524 throw new IllegalStateException(String.format(
525 "Can't start workflow '%s' for media package '%s' because another workflow is currently active.",
526 workflowDefinition.getTitle(),
527 sourceMediaPackage.getIdentifier().toString()));
528 }
529 }
530
531
532 User currentUser = securityService.getUser();
533 validUserOrThrow(currentUser);
534
535
536 Organization organization = securityService.getOrganization();
537 if (organization == null)
538 throw new SecurityException("Current organization is unknown");
539
540 WorkflowInstance workflowInstance = new WorkflowInstance(workflowDefinition, sourceMediaPackage,
541 currentUser, organization, properties);
542 workflowInstance = updateConfiguration(workflowInstance, properties);
543
544
545 try {
546
547 String workflowDefinitionXml = XmlWorkflowParser.toXml(workflowDefinition);
548 String mediaPackageXml = MediaPackageParser.getAsXml(sourceMediaPackage);
549
550 List<String> arguments = new ArrayList<>();
551 arguments.add(workflowDefinitionXml);
552 arguments.add(mediaPackageXml);
553 if (parentWorkflowId != null || properties != null) {
554 String parentWorkflowIdString = (parentWorkflowId != null) ? parentWorkflowId.toString() : NULL_PARENT_ID;
555 arguments.add(parentWorkflowIdString);
556 }
557 if (properties != null) {
558 arguments.add(mapToString(properties));
559 }
560
561 Job job = serviceRegistry.createJob(JOB_TYPE, Operation.START_WORKFLOW.toString(), arguments,
562 null, false, null, WORKFLOW_JOB_LOAD);
563
564
565 workflowInstance.setId(job.getId());
566
567
568
569 update(workflowInstance);
570
571 return workflowInstance;
572 } catch (Throwable t) {
573 try {
574 workflowInstance.setState(FAILED);
575 update(workflowInstance);
576 } catch (Exception failureToFail) {
577 logger.warn("Unable to update workflow to failed state", failureToFail);
578 }
579 try {
580 throw t;
581 } catch (ServiceRegistryException e) {
582 throw new WorkflowDatabaseException(e);
583 }
584 }
585 } finally {
586 lock.unlock();
587 }
588 }
589
590 protected WorkflowInstance updateConfiguration(WorkflowInstance instance, Map<String, String> properties) {
591 if (properties != null) {
592 for (Entry<String, String> entry : properties.entrySet()) {
593 instance.setConfiguration(entry.getKey(), entry.getValue());
594 }
595 }
596 return instance;
597 }
598
599
600
601
602
603
604
605
606 protected WorkflowOperationHandler selectOperationHandler(WorkflowOperationInstance operation) {
607 List<WorkflowOperationHandler> handlerList = new ArrayList<>();
608 for (HandlerRegistration handlerReg : getRegisteredHandlers()) {
609 if (handlerReg.operationName != null && handlerReg.operationName.equals(operation.getTemplate())) {
610 handlerList.add(handlerReg.handler);
611 }
612 }
613 if (handlerList.size() > 1) {
614 throw new IllegalStateException("Multiple operation handlers found for operation '" + operation.getTemplate()
615 + "'");
616 } else if (handlerList.size() == 1) {
617 return handlerList.get(0);
618 }
619 logger.warn("No workflow operation handlers found for operation '{}'", operation.getTemplate());
620 return null;
621 }
622
623
624
625
626
627
628
629
630
631
632 protected Job runWorkflow(WorkflowInstance workflow) throws WorkflowException, UnauthorizedException {
633 if (INSTANTIATED != workflow.getState()) {
634
635
636
637
638 if (RUNNING == workflow.getState()) {
639 WorkflowOperationInstance currentOperation = workflow.getCurrentOperation();
640 if (currentOperation != null) {
641 if (currentOperation.getId() != null) {
642 try {
643 Job operationJob = serviceRegistry.getJob(currentOperation.getId());
644 if (Job.Status.RUNNING.equals(operationJob.getStatus())) {
645 logger.debug("Not starting workflow {}, it is already in running state", workflow);
646 return null;
647 } else {
648 logger.info("Scheduling next operation of workflow {}", workflow);
649 operationJob.setStatus(Status.QUEUED);
650 operationJob.setDispatchable(true);
651 return serviceRegistry.updateJob(operationJob);
652 }
653 } catch (Exception e) {
654 logger.warn("Error determining status of current workflow operation in {}", workflow, e);
655 return null;
656 }
657 }
658 } else {
659 throw new IllegalStateException("Cannot start a workflow '" + workflow + "' with no current operation");
660 }
661 } else {
662 throw new IllegalStateException("Cannot start a workflow in state '" + workflow.getState() + "'");
663 }
664 }
665
666
667 workflow.setState(RUNNING);
668 update(workflow);
669
670 WorkflowOperationInstance operation = workflow.getCurrentOperation();
671
672 if (operation == null) {
673 throw new IllegalStateException("Cannot start a workflow without a current operation");
674 }
675
676 if (!operation.equals(workflow.getOperations().get(0))) {
677 throw new IllegalStateException("Current operation expected to be first");
678 }
679
680 try {
681 logger.info("Scheduling workflow {} for execution", workflow.getId());
682 Job job = serviceRegistry.createJob(JOB_TYPE, Operation.START_OPERATION.toString(),
683 Collections.singletonList(Long.toString(workflow.getId())), null, false, null, WORKFLOW_JOB_LOAD);
684 operation.setId(job.getId());
685 update(workflow);
686 job.setStatus(Status.QUEUED);
687 job.setDispatchable(true);
688 return serviceRegistry.updateJob(job);
689 } catch (ServiceRegistryException e) {
690 throw new WorkflowDatabaseException(e);
691 } catch (NotFoundException e) {
692
693 throw new IllegalStateException("Unable to find a job that was just created");
694 }
695
696 }
697
698
699
700
701
702
703
704
705
706
707
708
709
710 protected WorkflowOperationInstance runWorkflowOperation(WorkflowInstance workflow, Map<String, String> properties)
711 throws WorkflowException, UnauthorizedException {
712 WorkflowOperationInstance processingOperation = workflow.getCurrentOperation();
713 if (processingOperation == null)
714 throw new IllegalStateException("Workflow '" + workflow + "' has no operation to run");
715
716
717 WorkflowState initialState = workflow.getState();
718
719
720 WorkflowOperationHandler operationHandler = selectOperationHandler(processingOperation);
721 WorkflowOperationWorker worker = new WorkflowOperationWorker(operationHandler, workflow, properties, this);
722 workflow = worker.execute();
723
724 Long currentOperationJobId = processingOperation.getId();
725 try {
726 updateOperationJob(currentOperationJobId, processingOperation.getState());
727 } catch (NotFoundException e) {
728 throw new IllegalStateException("Unable to find a job that has already been running");
729 } catch (ServiceRegistryException e) {
730 throw new WorkflowDatabaseException(e);
731 }
732
733
734 WorkflowOperationInstance currentOperation = workflow.getCurrentOperation();
735
736
737 if (currentOperation == null) {
738
739
740 if (FAILING.equals(workflow.getState())) {
741 workflow.setState(FAILED);
742 }
743
744
745
746 else if (!FAILED.equals(workflow.getState())) {
747 workflow.setState(SUCCEEDED);
748 for (WorkflowOperationInstance op : workflow.getOperations()) {
749 if (op.getState().equals(WorkflowOperationInstance.OperationState.FAILED)) {
750 if (op.isFailOnError()) {
751 workflow.setState(FAILED);
752 break;
753 }
754 }
755 }
756 }
757
758
759 logger.debug("{} has {}", workflow, workflow.getState());
760 update(workflow);
761
762 } else {
763
764
765 WorkflowState dbWorkflowState;
766 try {
767 dbWorkflowState = getWorkflowById(workflow.getId()).getState();
768 } catch (NotFoundException e) {
769 throw new IllegalStateException("The workflow with ID " + workflow.getId()
770 + " can not be found in the database", e);
771 } catch (UnauthorizedException e) {
772 throw new IllegalStateException("The workflow with ID " + workflow.getId() + " can not be read", e);
773 }
774
775
776 if (!dbWorkflowState.equals(initialState)) {
777 logger.info("Workflow state for {} was changed to '{}' from the outside", workflow, dbWorkflowState);
778 workflow.setState(dbWorkflowState);
779 }
780
781
782
783 Job job;
784 switch (workflow.getState()) {
785 case FAILED:
786 update(workflow);
787 break;
788 case FAILING:
789 case RUNNING:
790 try {
791 job = serviceRegistry.createJob(JOB_TYPE, Operation.START_OPERATION.toString(),
792 Collections.singletonList(Long.toString(workflow.getId())), null, false, null, WORKFLOW_JOB_LOAD);
793 currentOperation.setId(job.getId());
794 update(workflow);
795 job.setStatus(Status.QUEUED);
796 job.setDispatchable(true);
797 serviceRegistry.updateJob(job);
798 } catch (ServiceRegistryException e) {
799 throw new WorkflowDatabaseException(e);
800 } catch (NotFoundException e) {
801
802 throw new IllegalStateException("Unable to find a job that was just created");
803 }
804 break;
805 case PAUSED:
806 case STOPPED:
807 case SUCCEEDED:
808 update(workflow);
809 break;
810 case INSTANTIATED:
811 update(workflow);
812 throw new IllegalStateException("Impossible workflow state found during processing");
813 default:
814 throw new IllegalStateException("Unknown workflow state found during processing");
815 }
816
817 }
818
819 return processingOperation;
820 }
821
822
823
824
825
826
827 @Override
828 public WorkflowDefinition getWorkflowDefinitionById(String id) throws NotFoundException {
829 final WorkflowIdentifier workflowIdentifier = new WorkflowIdentifier(id, securityService.getOrganization().getId());
830 final WorkflowDefinition def = workflowDefinitionScanner
831 .getWorkflowDefinition(securityService.getUser(), workflowIdentifier);
832 if (def == null) {
833 throw new NotFoundException("Workflow definition '" + workflowIdentifier + "' not found or inaccessible");
834 }
835 return def;
836 }
837
838
839
840
841
842
843 @Override
844 public WorkflowInstance stop(long workflowInstanceId) throws WorkflowException, NotFoundException,
845 UnauthorizedException {
846 final Lock lock = this.lock.get(workflowInstanceId);
847 lock.lock();
848 try {
849 WorkflowInstance instance = getWorkflowById(workflowInstanceId);
850
851 if (instance.getState() != STOPPED) {
852
853 instance.setState(STOPPED);
854 update(instance);
855 }
856
857 try {
858 removeTempFiles(instance);
859 } catch (Exception e) {
860 logger.warn("Cannot remove temp files for workflow instance {}", workflowInstanceId, e);
861 }
862
863 return instance;
864 } finally {
865 lock.unlock();
866 }
867 }
868
869
870
871
872 private void validUserOrThrow(User user) {
873 if (user == null)
874 throw new SecurityException("Current user is unknown");
875
876 if (userDirectoryService.loadUser(user.getUsername()) == null)
877 throw new SecurityException(String.format("Current user '%s' can not be loaded", user.getUsername()));
878 }
879
880 private void removeTempFiles(WorkflowInstance workflowInstance) {
881 logger.info("Removing temporary files for workflow {}", workflowInstance.getId());
882 MediaPackage mp = workflowInstance.getMediaPackage();
883 if (null == mp) {
884 logger.warn("Workflow instance {} does not have an media package set", workflowInstance.getId());
885 return;
886 }
887 for (MediaPackageElement elem : mp.getElements()) {
888
889 if (elem instanceof Publication) {
890 continue;
891 }
892 if (null == elem.getURI()) {
893 logger.warn("Media package element {} from the media package {} does not have an URI set",
894 elem.getIdentifier(), mp.getIdentifier());
895 continue;
896 }
897 try {
898 logger.debug("Removing temporary file {} for workflow {}", elem.getURI(), workflowInstance);
899 workspace.delete(elem.getURI());
900 } catch (IOException e) {
901 logger.warn("Unable to delete mediapackage element", e);
902 } catch (NotFoundException e) {
903
904 }
905 }
906 }
907
908
909
910
911
912
913
914 @Override
915 public void remove(long workflowInstanceId) throws WorkflowDatabaseException, NotFoundException,
916 UnauthorizedException, WorkflowParsingException, WorkflowStateException {
917 remove(workflowInstanceId, false);
918 }
919
920
921
922
923
924
925 @Override
926 public void remove(long workflowInstanceId, boolean force) throws WorkflowDatabaseException, NotFoundException,
927 UnauthorizedException, WorkflowStateException {
928 final Lock lock = this.lock.get(workflowInstanceId);
929 lock.lock();
930 try {
931 WorkflowInstance instance = getWorkflowById(workflowInstanceId);
932 WorkflowInstance.WorkflowState state = instance.getState();
933 if (!state.isTerminated() && !force) {
934 throw new WorkflowStateException("Workflow instance with state '" + state + "' cannot be removed "
935 + "since it is not yet terminated.");
936 }
937
938 assertPermission(instance, Permissions.Action.WRITE.toString(), instance.getOrganizationId());
939
940
941 removeTempFiles(instance);
942
943
944 List<WorkflowOperationInstance> operations = instance.getOperations();
945 List<Long> jobsToDelete = new ArrayList<>();
946 for (WorkflowOperationInstance op : operations) {
947 if (op.getId() != null) {
948 long workflowOpId = op.getId();
949 if (workflowOpId != workflowInstanceId) {
950 jobsToDelete.add(workflowOpId);
951 }
952 }
953 }
954 try {
955 serviceRegistry.removeJobs(jobsToDelete);
956 } catch (ServiceRegistryException e) {
957 logger.warn("Problems while removing jobs related to workflow operations '{}'", jobsToDelete, e);
958 } catch (NotFoundException e) {
959 logger.debug("No jobs related to one of the workflow operation '{}' found in service registry", jobsToDelete);
960 }
961
962
963 try {
964 serviceRegistry.removeJobs(Collections.singletonList(workflowInstanceId));
965 removeWorkflowInstanceFromIndex(instance.getId());
966 } catch (ServiceRegistryException e) {
967 logger.warn("Problems while removing workflow instance job '{}'", workflowInstanceId, e);
968 } catch (NotFoundException e) {
969 logger.info("No workflow instance job '{}' found in the service registry", workflowInstanceId);
970 }
971
972
973 persistence.removeFromDatabase(instance);
974 } finally {
975 lock.unlock();
976 }
977 }
978
979
980
981
982
983
/**
 * Sets the workflow instance with the given id to {@code PAUSED} and stores it via {@link #update}.
 * The lock obtained from {@code this.lock} for the workflow id is held while the instance is loaded,
 * modified and updated.
 *
 * @param workflowInstanceId
 *          id of the workflow instance to pause
 * @return the workflow instance after its state was set to {@code PAUSED}
 * @throws WorkflowException
 *           declared by the interface; propagated from loading or updating the instance
 * @throws NotFoundException
 *           propagated from {@code getWorkflowById}
 * @throws UnauthorizedException
 *           propagated from {@code getWorkflowById} or {@code update}
 */
@Override
public WorkflowInstance suspend(long workflowInstanceId) throws WorkflowException, NotFoundException,
    UnauthorizedException {
  final Lock lock = this.lock.get(workflowInstanceId);
  lock.lock();
  try {
    WorkflowInstance instance = getWorkflowById(workflowInstanceId);
    instance.setState(PAUSED);
    update(instance);
    return instance;
  } finally {
    // Always release the lock, also when loading or updating fails
    lock.unlock();
  }
}
998
999
1000
1001
1002
1003
/**
 * Resumes the workflow instance with the given id without passing additional properties.
 * Delegates to {@link #resume(long, Map)} with {@code null} as the properties argument.
 *
 * @param id
 *          id of the workflow instance to resume
 * @return the workflow instance returned by {@link #resume(long, Map)}
 */
@Override
public WorkflowInstance resume(long id) throws WorkflowException, NotFoundException, IllegalStateException,
    UnauthorizedException {
  return resume(id, null);
}
1009
1010
1011
1012
1013
1014
1015 @Override
1016 public WorkflowInstance resume(long workflowInstanceId, Map<String, String> properties) throws WorkflowException,
1017 NotFoundException, IllegalStateException, UnauthorizedException {
1018 WorkflowInstance workflowInstance = getWorkflowById(workflowInstanceId);
1019 if (!WorkflowState.PAUSED.equals(workflowInstance.getState()))
1020 throw new IllegalStateException("Can not resume a workflow where the current state is not in paused");
1021
1022 workflowInstance = updateConfiguration(workflowInstance, properties);
1023 update(workflowInstance);
1024
1025 WorkflowOperationInstance currentOperation = workflowInstance.getCurrentOperation();
1026
1027
1028 if (currentOperation == null) {
1029
1030
1031 workflowInstance.setState(SUCCEEDED);
1032 for (WorkflowOperationInstance op : workflowInstance.getOperations()) {
1033 if (op.getState().equals(WorkflowOperationInstance.OperationState.FAILED)) {
1034 if (op.isFailOnError()) {
1035 workflowInstance.setState(FAILED);
1036 break;
1037 }
1038 }
1039 }
1040
1041
1042 logger.debug("{} has {}", workflowInstance, workflowInstance.getState());
1043 update(workflowInstance);
1044 return workflowInstance;
1045 }
1046
1047
1048
1049 if (OperationState.INSTANTIATED.equals(currentOperation.getState())) {
1050 try {
1051
1052 Job operationJob = serviceRegistry.createJob(JOB_TYPE, Operation.START_OPERATION.toString(),
1053 Collections.singletonList(Long.toString(workflowInstanceId)), null, false, null, WORKFLOW_JOB_LOAD);
1054
1055
1056
1057 workflowInstance.setState(RUNNING);
1058 currentOperation.setId(operationJob.getId());
1059
1060
1061 update(workflowInstance);
1062
1063
1064 operationJob.setStatus(Status.QUEUED);
1065 operationJob.setDispatchable(true);
1066 serviceRegistry.updateJob(operationJob);
1067
1068 return workflowInstance;
1069 } catch (ServiceRegistryException e) {
1070 throw new WorkflowDatabaseException(e);
1071 }
1072 }
1073
1074 Long operationJobId = workflowInstance.getCurrentOperation().getId();
1075 if (operationJobId == null)
1076 throw new IllegalStateException("Can not resume a workflow where the current operation has no associated id");
1077
1078
1079 Job workflowJob;
1080 try {
1081 workflowJob = serviceRegistry.getJob(workflowInstanceId);
1082 workflowJob.setStatus(Status.RUNNING);
1083 persistence.updateInDatabase(workflowInstance);
1084 serviceRegistry.updateJob(workflowJob);
1085
1086 Job operationJob = serviceRegistry.getJob(operationJobId);
1087 operationJob.setStatus(Status.QUEUED);
1088 operationJob.setDispatchable(true);
1089 if (properties != null) {
1090 Properties props = new Properties();
1091 props.putAll(properties);
1092 ByteArrayOutputStream out = new ByteArrayOutputStream();
1093 props.store(out, null);
1094 List<String> newArguments = new ArrayList<String>(operationJob.getArguments());
1095 newArguments.add(new String(out.toByteArray(), StandardCharsets.UTF_8));
1096 operationJob.setArguments(newArguments);
1097 }
1098 serviceRegistry.updateJob(operationJob);
1099 } catch (ServiceRegistryException e) {
1100 throw new WorkflowDatabaseException(e);
1101 } catch (IOException e) {
1102 throw new WorkflowParsingException("Unable to parse workflow and/or workflow properties");
1103 }
1104
1105 return workflowInstance;
1106 }
1107
1108
1109
1110
1111
1112
1113
1114
1115
1116
1117
1118 protected void assertPermission(WorkflowInstance workflow, String action, String workflowOrgId) throws UnauthorizedException {
1119 User currentUser = securityService.getUser();
1120 Organization currentOrg = securityService.getOrganization();
1121 String currentOrgAdminRole = currentOrg.getAdminRole();
1122 String currentOrgId = currentOrg.getId();
1123
1124 MediaPackage mediapackage = workflow.getMediaPackage();
1125
1126 WorkflowState state = workflow.getState();
1127 if (state != INSTANTIATED && state != RUNNING && workflow.getState() != FAILING) {
1128 Optional<MediaPackage> assetMediapackage = assetManager.getMediaPackage(mediapackage.getIdentifier().toString());
1129 if (assetMediapackage.isPresent()) {
1130 mediapackage = assetMediapackage.get();
1131 }
1132 }
1133
1134 var creatorName = workflow.getCreatorName();
1135 var workflowCreator = creatorName == null ? null : userDirectoryService.loadUser(creatorName);
1136 boolean authorized = currentUser.hasRole(GLOBAL_ADMIN_ROLE)
1137 || (currentUser.hasRole(currentOrgAdminRole) && currentOrgId.equals(workflowOrgId))
1138 || (currentUser.equals(workflowCreator))
1139 || (authorizationService.hasPermission(mediapackage, action) && currentOrgId.equals(workflowOrgId));
1140
1141 if (!authorized) {
1142 throw new UnauthorizedException(currentUser, action);
1143 }
1144 }
1145
1146 protected boolean assertMediaPackagePermission(String mediaPackageId, String action) {
1147 var currentUser = securityService.getUser();
1148 Optional<MediaPackage> mp = assetManager.getMediaPackage(mediaPackageId);
1149
1150
1151
1152
1153 return currentUser.hasRole(GLOBAL_ADMIN_ROLE)
1154 || mp.isPresent() && currentUser.hasRole(securityService.getOrganization().getAdminRole())
1155 || mp.isPresent() && authorizationService.hasPermission(mp.get(), action);
1156 }
1157
1158
1159
1160
1161
1162
/**
 * Stores the given workflow instance and brings the corresponding service registry job and the index in line
 * with it. The whole operation runs under the lock obtained from {@code updateLock} for the workflow id.
 * <p>
 * Steps, in order:
 * <ol>
 * <li>load the instance currently returned by {@code getWorkflowById} (may be absent) for the listeners,</li>
 * <li>refresh the media package metadata and, if the media package has a series and the workflow still has a
 * current operation, align the series ACL attached to the media package with the series service,</li>
 * <li>map the workflow state onto the status of the workflow job,</li>
 * <li>write the workflow to the database, update the job, and update the index unless the current operation
 * is {@code RUNNING},</li>
 * <li>notify the listeners.</li>
 * </ol>
 *
 * @param workflowInstance
 *          the workflow instance to store
 * @throws WorkflowDatabaseException
 *           if the series service, the service registry, the database or the index update fails
 * @throws UnauthorizedException
 *           propagated from loading the original instance
 * @throws IllegalStateException
 *           if the workflow state is not handled or the listeners fail
 */
@Override
public void update(final WorkflowInstance workflowInstance) throws WorkflowDatabaseException, UnauthorizedException {
  final Lock lock = updateLock.get(workflowInstance.getId());
  lock.lock();

  try {
    WorkflowInstance originalWorkflowInstance = null;
    try {
      // Instance as known before this update; handed to the listeners as the "original"
      originalWorkflowInstance = getWorkflowById(workflowInstance.getId());
    } catch (NotFoundException e) {
      // Deliberately ignored: the listeners then receive null as the original instance
    }

    MediaPackage updatedMediaPackage = null;
    try {

      // Work on the media package carried by the instance that is being stored
      updatedMediaPackage = workflowInstance.getMediaPackage();

      populateMediaPackageMetadata(updatedMediaPackage);

      String seriesId = updatedMediaPackage.getSeries();
      if (seriesId != null && workflowInstance.getCurrentOperation() != null) {
        // Only touch the series ACL while the workflow still has a current operation



        try {
          AccessControlList acl = seriesService.getSeriesAccessControl(seriesId);
          Tuple<AccessControlList, AclScope> activeAcl = authorizationService.getAcl(
              updatedMediaPackage, AclScope.Series);

          // Write the ACL only if it differs, or if the active ACL does not come from the series scope
          if (!AclScope.Series.equals(activeAcl.getB()) || !AccessControlUtil.equals(activeAcl.getA(), acl)) {
            authorizationService.setAcl(updatedMediaPackage, AclScope.Series, acl);
          }
        } catch (NotFoundException e) {
          logger.debug("Not updating series ACL on event {} since series {} has no ACL set",
              updatedMediaPackage, seriesId, e);
        }
      }

      workflowInstance.setMediaPackage(updatedMediaPackage);
    } catch (SeriesException e) {
      throw new WorkflowDatabaseException(e);
    } catch (Exception e) {
      // Any other metadata problem is logged only; the update continues
      logger.error("Metadata for media package {} could not be updated", updatedMediaPackage, e);
    }

    // Translate the workflow state into the status of the workflow job
    WorkflowState workflowState = workflowInstance.getState();

    Job job;
    try {
      job = serviceRegistry.getJob(workflowInstance.getId());
      job.setPayload(Long.toString(workflowInstance.getId()));


      switch (workflowState) {
        case FAILED:
          job.setStatus(Status.FAILED);
          break;
        case FAILING:
          // Job status is left untouched while the workflow is failing
          break;
        case INSTANTIATED:
          job.setDispatchable(true);
          job.setStatus(Status.QUEUED);
          break;
        case PAUSED:
          job.setStatus(Status.PAUSED);
          break;
        case RUNNING:
          job.setStatus(Status.RUNNING);
          break;
        case STOPPED:
          job.setStatus(Status.CANCELLED);
          break;
        case SUCCEEDED:
          job.setStatus(Status.FINISHED);
          break;
        default:
          throw new IllegalStateException("Found a workflow state that is not handled");
      }
    } catch (ServiceRegistryException e) {
      throw new WorkflowDatabaseException(
          "Unable to read workflow job " + workflowInstance.getId() + " from service registry", e);
    } catch (NotFoundException e) {
      throw new WorkflowDatabaseException(
          "Job for workflow " + workflowInstance.getId() + " not found in service registry", e);
    }

    // Persist the workflow first, then the job, then the index
    try {

      persistence.updateInDatabase(workflowInstance);

      job = serviceRegistry.updateJob(job);

      WorkflowOperationInstance op = workflowInstance.getCurrentOperation();

      // The index is skipped while the current operation is RUNNING.
      // NOTE(review): presumably to avoid index writes for intermediate updates - confirm.


      if (op == null || op.getState() != OperationState.RUNNING) {

        long id = workflowInstance.getId();
        // The state is handed to the index update as the enum ordinal
        int state = workflowInstance.getState().ordinal();
        var template = workflowInstance.getTemplate();
        String mpId = workflowInstance.getMediaPackage().getIdentifier().toString();
        String orgId = workflowInstance.getOrganizationId();

        updateWorkflowInstanceInIndex(id, state, template, mpId, orgId);
      }
    } catch (ServiceRegistryException e) {
      throw new WorkflowDatabaseException("Update of workflow job " + workflowInstance.getId()
          + " in the service registry failed, service registry and workflow table may be out of sync", e);
    } catch (NotFoundException e) {
      throw new WorkflowDatabaseException("Job for workflow " + workflowInstance.getId()
          + " not found in service registry", e);
    } catch (Exception e) {
      throw new WorkflowDatabaseException("Update of workflow job " + job.getId() + " in the service registry failed, "
          + "service registry and workflow table may be out of sync", e);
    }

    try {
      fireListeners(originalWorkflowInstance, workflowInstance);
    } catch (Exception e) {
      // Listener failures are surfaced as an unchecked exception after the data has been stored
      throw new IllegalStateException("In-memory workflow instance could not be serialized", e);
    }
  } finally {
    lock.unlock();
  }
}
1296
1297
1298
1299
1300
1301
/**
 * Counts workflow instances without restricting the state.
 * Delegates to {@link #countWorkflowInstances(WorkflowState)} with {@code null}.
 *
 * @return the number reported by {@link #countWorkflowInstances(WorkflowState)}
 * @throws WorkflowDatabaseException
 *           propagated from the delegate
 */
@Override
public long countWorkflowInstances() throws WorkflowDatabaseException {
  return countWorkflowInstances(null);
}
1306
1307
1308
1309
1310
1311
/**
 * Returns the number of workflows reported by the persistence layer for the given state.
 *
 * @param state
 *          the state passed on to {@code persistence.countWorkflows}; {@code null} is passed by
 *          {@link #countWorkflowInstances()}
 * @return the count returned by the persistence layer
 * @throws WorkflowDatabaseException
 *           propagated from the persistence layer
 */
@Override
public long countWorkflowInstances(WorkflowState state) throws WorkflowDatabaseException {
  return persistence.countWorkflows(state);
}
1316
1317
1318
1319
1320
1321
1322 @Override
1323 public List<WorkflowInstance> getWorkflowInstancesByMediaPackage(String mediaPackageId)
1324 throws WorkflowDatabaseException, UnauthorizedException {
1325
1326 if (!assertMediaPackagePermission(mediaPackageId, Permissions.Action.READ.toString())) {
1327 throw new UnauthorizedException("Not allowed to access event");
1328 }
1329 return persistence.getWorkflowInstancesByMediaPackage(mediaPackageId);
1330 }
1331
1332
1333
1334
1335
1336
1337 @Override
1338 public Optional<WorkflowInstance> getRunningWorkflowInstanceByMediaPackage(String mediaPackageId, String action)
1339 throws WorkflowException, UnauthorizedException, WorkflowDatabaseException {
1340 List<WorkflowInstance> workflowInstances = persistence.getRunningWorkflowInstancesByMediaPackage(mediaPackageId);
1341
1342
1343 if (workflowInstances.size() > 1) {
1344 throw new WorkflowException("Multiple workflows are active on mediapackage " + mediaPackageId);
1345 }
1346
1347 Optional<WorkflowInstance> optWorkflowInstance = Optional.empty();
1348 if (workflowInstances.size() == 1) {
1349 WorkflowInstance wfInstance = workflowInstances.get(0);
1350 optWorkflowInstance = Optional.of(wfInstance);
1351 assertPermission(wfInstance, action, wfInstance.getOrganizationId());
1352 }
1353
1354 return optWorkflowInstance;
1355 }
1356
1357
1358
1359
1360
1361
/**
 * Asks the persistence layer whether the given media package has active workflows.
 *
 * @param mediaPackageId
 *          id of the media package
 * @return the answer of {@code persistence.mediaPackageHasActiveWorkflows}
 * @throws WorkflowDatabaseException
 *           propagated from the persistence layer
 */
@Override
public boolean mediaPackageHasActiveWorkflows(String mediaPackageId) throws WorkflowDatabaseException {
  return persistence.mediaPackageHasActiveWorkflows(mediaPackageId);
}
1366
1367
1368
1369
1370
1371
/**
 * Asks the persistence layer whether the given user has active workflows.
 *
 * @param userId
 *          id of the user
 * @return the answer of {@code persistence.userHasActiveWorkflows}
 * @throws WorkflowDatabaseException
 *           propagated from the persistence layer
 */
@Override
public boolean userHasActiveWorkflows(String userId) throws WorkflowDatabaseException {
  return persistence.userHasActiveWorkflows(userId);
}
1376
1377
1378
1379
1380
1381
1382
1383
1384
1385
1386
1387 protected WorkflowInstance handleOperationException(
1388 WorkflowInstance workflow,
1389 WorkflowOperationInstance currentOperation) {
1390 int failedAttempt = currentOperation.getFailedAttempts() + 1;
1391 currentOperation.setFailedAttempts(failedAttempt);
1392
1393
1394 if (ERROR_RESOLUTION_HANDLER_ID.equals(currentOperation.getTemplate())
1395 && OperationState.FAILED.equals(currentOperation.getState())) {
1396 int position = workflow.getOperations().indexOf(currentOperation);
1397
1398 if (workflow.getOperations().size() > position + 1) {
1399 currentOperation = workflow.getOperations().get(position + 1);
1400
1401 currentOperation.setState(OperationState.FAILED);
1402 }
1403 handleFailedOperation(workflow, currentOperation);
1404 } else if (currentOperation.getMaxAttempts() != -1 && failedAttempt == currentOperation.getMaxAttempts()) {
1405 handleFailedOperation(workflow, currentOperation);
1406 } else {
1407 switch (currentOperation.getRetryStrategy()) {
1408 case NONE:
1409 handleFailedOperation(workflow, currentOperation);
1410 break;
1411 case RETRY:
1412 currentOperation.setState(OperationState.RETRY);
1413 break;
1414 case HOLD:
1415 currentOperation.setState(OperationState.RETRY);
1416 List<WorkflowOperationInstance> operations = workflow.getOperations();
1417 WorkflowOperationDefinitionImpl errorResolutionDefinition = new WorkflowOperationDefinitionImpl(
1418 ERROR_RESOLUTION_HANDLER_ID, "Error Resolution Operation", "error", false);
1419 var errorResolutionInstance = new WorkflowOperationInstance(errorResolutionDefinition);
1420 errorResolutionInstance.setExceptionHandlingWorkflow(currentOperation.getExceptionHandlingWorkflow());
1421 var index = workflow.getOperations().indexOf(currentOperation);
1422 operations.add(index, errorResolutionInstance);
1423 workflow.setOperations(operations);
1424 break;
1425 default:
1426 break;
1427 }
1428 }
1429 return workflow;
1430 }
1431
1432
1433
1434
1435
1436
1437
1438
1439
1440 private void handleFailedOperation(WorkflowInstance workflow, WorkflowOperationInstance currentOperation) {
1441 String errorDefId = currentOperation.getExceptionHandlingWorkflow();
1442
1443
1444 if (currentOperation.isFailOnError()) {
1445 if (StringUtils.isBlank(errorDefId)) {
1446 workflow.setState(FAILED);
1447 } else {
1448 workflow.setState(FAILING);
1449
1450
1451 int currentOperationPosition = workflow.getOperations().indexOf(currentOperation);
1452 List<WorkflowOperationInstance> operations = new ArrayList<>(
1453 workflow.getOperations().subList(0, currentOperationPosition + 1));
1454 workflow.setOperations(operations);
1455
1456
1457 Map<String, String> configuration = new HashMap<>();
1458 for (String configKey : workflow.getConfigurationKeys()) {
1459 configuration.put(configKey, workflow.getConfiguration(configKey));
1460 }
1461
1462
1463 WorkflowDefinition errorDef = null;
1464 try {
1465 errorDef = getWorkflowDefinitionById(errorDefId);
1466 workflow.extend(errorDef);
1467 workflow.setOperations(updateConfiguration(workflow, configuration).getOperations());
1468 } catch (NotFoundException notFoundException) {
1469 throw new IllegalStateException("Unable to find the error workflow definition '" + errorDefId + "'");
1470 }
1471 }
1472 }
1473
1474
1475 currentOperation.setState(OperationState.FAILED);
1476 }
1477
1478
1479
1480
1481
1482
1483
1484
1485
1486
1487
1488
1489
/**
 * Applies the result of the workflow's current operation to the workflow instance.
 * <p>
 * A {@code null} result is replaced by a {@code CONTINUE} result carrying the workflow's media package.
 * Otherwise a non-null media package in the result replaces the one in the workflow. The result's properties
 * are applied through {@code updateConfiguration}, the time in queue is copied to the operation, and the
 * operation state is set according to the result's action. For a continued error resolution operation the
 * result must carry a retry strategy, which is then evaluated.
 *
 * @param workflow
 *          the workflow instance
 * @param result
 *          the result of the current operation, may be {@code null}
 * @return the workflow instance as returned by {@code updateConfiguration}, with the operation state applied
 * @throws WorkflowDatabaseException
 *           if a continued error resolution operation carries no retry strategy, or one other than
 *           {@code NONE} or {@code RETRY}
 * @throws IllegalStateException
 *           if a non-resumable operation asks to pause, or the action is unknown
 */
protected WorkflowInstance handleOperationResult(WorkflowInstance workflow, WorkflowOperationResult result)
        throws WorkflowDatabaseException {

  // Captured before updateConfiguration() is called below.
  // NOTE(review): all state changes further down go to this reference - confirm that updateConfiguration
  // keeps the operation instances.
  WorkflowOperationInstance currentOperation = workflow.getCurrentOperation();
  WorkflowOperationHandler handler = getWorkflowOperationHandler(currentOperation.getTemplate());

  // A missing result is treated like "continue" with an unchanged media package
  if (result == null) {
    logger.warn("Handling a null operation result for workflow {} in operation {}", workflow.getId(),
            currentOperation.getTemplate());
    result = new WorkflowOperationResultImpl(workflow.getMediaPackage(), null, Action.CONTINUE, 0);
  } else {
    MediaPackage mp = result.getMediaPackage();
    if (mp != null) {
      workflow.setMediaPackage(mp);
    }
  }

  // The action decides how the operation state is set below
  Action action = result.getAction();

  // Apply the properties returned by the operation to the workflow
  workflow = updateConfiguration(workflow, result.getProperties());

  // Record how long the operation waited in the queue
  currentOperation.setTimeInQueue(result.getTimeInQueue());

  // Map the action onto the operation (and, for PAUSE, the workflow) state
  switch (action) {
    case CONTINUE:
      currentOperation.setState(OperationState.SUCCEEDED);
      break;
    case PAUSE:
      if (!(handler instanceof ResumableWorkflowOperationHandler)) {
        throw new IllegalStateException("Operation " + currentOperation.getTemplate() + " is not resumable");
      }

      // Remember whether the paused operation may be continued or aborted
      currentOperation.setContinuable(result.allowsContinue());
      currentOperation.setAbortable(result.allowsAbort());

      workflow.setState(PAUSED);
      currentOperation.setState(OperationState.PAUSED);
      break;
    case SKIP:
      currentOperation.setState(OperationState.SKIPPED);
      break;
    default:
      throw new IllegalStateException("Unknown action '" + action + "' returned");
  }

  if (ERROR_RESOLUTION_HANDLER_ID.equals(currentOperation.getTemplate()) && result.getAction() == Action.CONTINUE) {
    // The error resolution operation reports the chosen retry strategy through its result properties
    Map<String, String> resultProperties = result.getProperties();
    if (resultProperties == null || StringUtils.isBlank(resultProperties.get(RETRY_STRATEGY)))
      throw new WorkflowDatabaseException("Retry strategy not present in properties!");

    // valueOf throws IllegalArgumentException for a value that is not a RetryStrategy constant
    RetryStrategy retryStrategy = RetryStrategy.valueOf(resultProperties.get(RETRY_STRATEGY));
    switch (retryStrategy) {
      case NONE:
        // Uses the workflow's current operation at this point, not the reference captured above
        handleFailedOperation(workflow, workflow.getCurrentOperation());
        break;
      case RETRY:
        break;
      default:
        throw new WorkflowDatabaseException("Retry strategy not implemented yet!");
    }
  }

  return workflow;
}
1562
1563
1564
1565
1566
1567
1568
/**
 * Applies the metadata provided by every registered metadata service to the given media package.
 * If no metadata service is registered, a warning is logged and the media package is left untouched.
 *
 * @param mp
 *          the media package to populate
 */
protected void populateMediaPackageMetadata(MediaPackage mp) {
  if (metadataServices.isEmpty()) {
    logger.warn("No metadata services are registered, so no media package metadata can be extracted from catalogs");
    return;
  }
  // Services are applied in the iteration order of metadataServices
  for (MediaPackageMetadataService metadataService : metadataServices) {
    MediaPackageMetadata metadata = metadataService.getMetadata(mp);
    MediaPackageMetadataSupport.populateMediaPackageMetadata(mp, metadata);
  }
}
1579
1580
1581
1582
1583
1584
/**
 * Reports that this service accepts jobs for any operation.
 *
 * @param operation
 *          the operation name (not inspected)
 * @return always {@code true}
 */
@Override
public boolean isReadyToAcceptJobs(String operation) {
  return true;
}
1589
1590
1591
1592
1593
1594
1595
1596
1597 @Override
1598 public boolean isReadyToAccept(Job job) throws UndispatchableJobException {
1599 String operation = job.getOperation();
1600
1601
1602 if (!Operation.START_WORKFLOW.toString().equals(operation))
1603 return true;
1604
1605
1606 if (job.getArguments().size() > 1 && job.getArguments().get(0) != null) {
1607 try {
1608 WorkflowDefinition workflowDef = XmlWorkflowParser.parseWorkflowDefinition(job.getArguments().get(0));
1609 if (workflowDef.getOperations().size() > 0) {
1610 String firstOperationId = workflowDef.getOperations().get(0).getId();
1611 WorkflowOperationHandler handler = getWorkflowOperationHandler(firstOperationId);
1612 if (handler instanceof ResumableWorkflowOperationHandler) {
1613 if (((ResumableWorkflowOperationHandler) handler).isAlwaysPause()) {
1614 return true;
1615 }
1616 }
1617 }
1618 } catch (WorkflowParsingException e) {
1619 throw new UndispatchableJobException(job + " is not a proper job to start a workflow", e);
1620 }
1621 }
1622
1623 WorkflowInstance workflow;
1624 Optional<WorkflowInstance> workflowInstance;
1625 String mediaPackageId;
1626
1627
1628 try {
1629 workflow = getWorkflowById(job.getId());
1630 mediaPackageId = workflow.getMediaPackage().getIdentifier().toString();
1631 } catch (NotFoundException e) {
1632 throw new UndispatchableJobException("Trying to start workflow with job id " + job.getId()
1633 + " but no corresponding instance is available from the workflow service", e);
1634 } catch (UnauthorizedException e) {
1635 throw new UndispatchableJobException(
1636 "Authorization denied while requesting to loading workflow instance. Job: " + job.getId(), e);
1637 }
1638
1639 try {
1640 workflowInstance = getRunningWorkflowInstanceByMediaPackage(
1641 workflow.getMediaPackage().getIdentifier().toString(), Permissions.Action.READ.toString());
1642 } catch (UnauthorizedException e) {
1643 throw new UndispatchableJobException("Authorization denied while requesting to loading workflow instance " + workflow.getId(), e);
1644 } catch (WorkflowDatabaseException e) {
1645 throw new UndispatchableJobException("An database error occurred while checking if a workflow is already active "
1646 + "(job: " + job.getId() + ")", e);
1647 } catch (WorkflowException e) {
1648
1649 delayWorkflow(workflow, mediaPackageId);
1650 return false;
1651 }
1652
1653
1654 if (workflowInstance.isPresent() && workflow.getId() != workflowInstance.get().getId()) {
1655 delayWorkflow(workflow, mediaPackageId);
1656 return false;
1657 }
1658
1659 return true;
1660 }
1661
1662 private void delayWorkflow(WorkflowInstance workflow, String mediaPackageId) {
1663 if (!delayedWorkflows.contains(workflow.getId())) {
1664 logger.info("Delaying start of workflow {}, another workflow on media package {} is still running",
1665 workflow.getId(), mediaPackageId);
1666 delayedWorkflows.add(workflow.getId());
1667 }
1668 }
1669
1670
1671
1672
1673
1674
1675 @Override
1676 public synchronized void acceptJob(Job job) throws ServiceRegistryException {
1677 User originalUser = securityService.getUser();
1678 Organization originalOrg = securityService.getOrganization();
1679 try {
1680 Organization organization = organizationDirectoryService.getOrganization(job.getOrganization());
1681 securityService.setOrganization(organization);
1682 User user = userDirectoryService.loadUser(job.getCreator());
1683 securityService.setUser(user);
1684 job.setStatus(Job.Status.RUNNING);
1685 job = serviceRegistry.updateJob(job);
1686
1687
1688 if (delayedWorkflows.contains(job.getId())) {
1689 delayedWorkflows.remove(job.getId());
1690 logger.info("Starting initially delayed workflow {}, {} more waiting", job.getId(), delayedWorkflows.size());
1691 }
1692
1693 executorService.submit(new JobRunner(job, serviceRegistry.getCurrentJob()));
1694 } catch (Exception e) {
1695 if (e instanceof ServiceRegistryException)
1696 throw (ServiceRegistryException) e;
1697 throw new ServiceRegistryException(e);
1698 } finally {
1699 securityService.setUser(originalUser);
1700 securityService.setOrganization(originalOrg);
1701 }
1702 }
1703
1704
1705
1706
1707
1708
1709
1710
1711
1712
/**
 * Processes a workflow job according to its operation.
 * <ul>
 * <li>{@code START_WORKFLOW}: loads the workflow whose id is stored in the job payload and runs it.</li>
 * <li>{@code RESUME}: loads the workflow named by the first argument, reads optional properties from the last
 * argument, sets the workflow to {@code RUNNING} and runs its current operation.</li>
 * <li>{@code START_OPERATION}: loads the workflow named by the first argument, resets a {@code RUNNING} or
 * {@code PAUSED} current operation to {@code INSTANTIATED}, runs it and updates the job with the resulting
 * operation state.</li>
 * </ul>
 * If processing fails after the workflow instance has been loaded, the job and the workflow are marked as
 * failed before the exception is rethrown.
 *
 * @param job
 *          the job to process
 * @return always {@code null}
 * @throws Exception
 *           a {@code ServiceRegistryException}, either the original one or one wrapping the failure
 */
protected String process(Job job) throws Exception {
  List<String> arguments = job.getArguments();
  Operation op = null;
  WorkflowInstance workflowInstance = null;
  WorkflowOperationInstance wfo;
  String operation = job.getOperation();
  try {
    try {
      // valueOf throws IllegalArgumentException for unknown operation names (handled below)
      op = Operation.valueOf(operation);
      switch (op) {
        case START_WORKFLOW:
          // The workflow id is carried in the job payload and loaded directly from the persistence layer
          workflowInstance = persistence.getWorkflow(Long.parseLong(job.getPayload()));
          logger.debug("Starting new workflow {}", workflowInstance);
          runWorkflow(workflowInstance);
          break;
        case RESUME:
          workflowInstance = getWorkflowById(Long.parseLong(arguments.get(0)));
          Map<String, String> properties = null;
          if (arguments.size() > 1) {
            // The last argument holds the resume properties in java.util.Properties text format
            Properties props = new Properties();
            props.load(IOUtils.toInputStream(arguments.get(arguments.size() - 1), StandardCharsets.UTF_8));
            properties = new HashMap<>();
            for (Entry<Object, Object> entry : props.entrySet()) {
              properties.put(entry.getKey().toString(), entry.getValue().toString());
            }
          }
          logger.debug("Resuming {} at {}", workflowInstance, workflowInstance.getCurrentOperation());
          workflowInstance.setState(RUNNING);
          update(workflowInstance);
          runWorkflowOperation(workflowInstance, properties);
          break;
        case START_OPERATION:
          workflowInstance = getWorkflowById(Long.parseLong(arguments.get(0)));
          wfo = workflowInstance.getCurrentOperation();

          // An operation found RUNNING or PAUSED here is started over from scratch
          if (OperationState.RUNNING.equals(wfo.getState()) || OperationState.PAUSED.equals(wfo.getState())) {
            logger.info("Reset operation state {} {} to INSTANTIATED due to job restart", workflowInstance, wfo);
            wfo.setState(OperationState.INSTANTIATED);
          }

          wfo.setExecutionHost(job.getProcessingHost());
          logger.debug("Running {} {}", workflowInstance, wfo);
          wfo = runWorkflowOperation(workflowInstance, null);
          updateOperationJob(job.getId(), wfo.getState());
          break;
        default:
          throw new IllegalStateException("Don't know how to handle operation '" + operation + "'");
      }
    } catch (IllegalArgumentException e) {
      throw new ServiceRegistryException("This service can't handle operations of type '" + op + "'", e);
    } catch (IndexOutOfBoundsException e) {
      throw new ServiceRegistryException(
              "The argument list for operation '" + op + "' (job: " + job.getId() + ") does not meet expectations", e);
    } catch (NotFoundException e) {
      // A missing workflow or job only fails the job; nothing is rethrown
      logger.warn("Not found processing job {}", job, e);
      updateOperationJob(job.getId(), OperationState.FAILED);
    }
    return null;
  } catch (Exception e) {
    logger.warn("Exception while accepting job {}", job, e);
    try {
      if (workflowInstance != null) {
        logger.warn("Marking job {} and workflow instance {} as failed", job, workflowInstance);
        updateOperationJob(job.getId(), OperationState.FAILED);
        workflowInstance.setState(FAILED);
        update(workflowInstance);
      } else {
        // The failure happened before a workflow instance was available
        logger.warn("Unable to parse workflow instance", e);
      }
    } catch (WorkflowDatabaseException e1) {
      throw new ServiceRegistryException(e1);
    }
    // ServiceRegistryExceptions are passed on as they are, everything else is wrapped
    if (e instanceof ServiceRegistryException)
      throw e;
    throw new ServiceRegistryException("Error handling operation '" + op + "'", e);
  }
}
1790
1791
1792
1793
1794
1795
1796
1797
1798
1799
1800
1801
1802
1803
1804
1805 private Job updateOperationJob(Long jobId, OperationState state) throws NotFoundException, ServiceRegistryException {
1806 if (jobId == null)
1807 return null;
1808 Job job = serviceRegistry.getJob(jobId);
1809 switch (state) {
1810 case FAILED:
1811 case RETRY:
1812 job.setStatus(Status.FAILED);
1813 break;
1814 case PAUSED:
1815 job.setStatus(Status.PAUSED);
1816 job.setOperation(Operation.RESUME.toString());
1817 break;
1818 case SKIPPED:
1819 case SUCCEEDED:
1820 job.setStatus(Status.FINISHED);
1821 break;
1822 default:
1823 throw new IllegalStateException("Unexpected state '" + state + "' found");
1824 }
1825 return serviceRegistry.updateJob(job);
1826 }
1827
1828
1829
1830
1831
1832
/**
 * Returns the number of jobs of this service's job type that have the given status, as counted by the service
 * registry.
 *
 * @param status
 *          the job status to count
 * @return the count returned by {@code serviceRegistry.count(JOB_TYPE, status)}
 * @throws ServiceRegistryException
 *           propagated from the service registry
 */
@Override
public long countJobs(Status status) throws ServiceRegistryException {
  return serviceRegistry.count(JOB_TYPE, status);
}
1837
1838
1839
1840
1841
1842
1843
1844
1845
1846 private String mapToString(Map<String, String> props) {
1847 if (props == null)
1848 return null;
1849 StringBuilder sb = new StringBuilder();
1850 for (Entry<String, String> entry : props.entrySet()) {
1851 sb.append(entry.getKey());
1852 sb.append("=");
1853 sb.append(entry.getValue());
1854 sb.append("\n");
1855 }
1856 return sb.toString();
1857 }
1858
1859
1860
1861
1862
1863
1864
/**
 * Reference callback with an intentionally empty body. The argument is not used; the reference is restricted
 * by the filter {@code (artifact=workflowdefinition)}.
 * NOTE(review): presumably this only delays activation until the workflow definitions are ready - confirm.
 *
 * @param unused
 *          the readiness indicator (ignored)
 */
@Reference(target = "(artifact=workflowdefinition)")
protected void setProfilesReadyIndicator(ReadinessIndicator unused) { }
1867
1868
1869
1870
1871
1872
1873
/**
 * Reference callback that stores the workspace used by this service.
 *
 * @param workspace
 *          the workspace
 */
@Reference
protected void setWorkspace(Workspace workspace) {
  this.workspace = workspace;
}
1878
1879
1880
1881
1882
1883
1884
/**
 * Reference callback that stores the service registry used by this service.
 *
 * @param registry
 *          the service registry
 */
@Reference
protected void setServiceRegistry(ServiceRegistry registry) {
  this.serviceRegistry = registry;
}
1889
/**
 * Returns the service registry stored by {@code setServiceRegistry}.
 *
 * @return the service registry
 */
public ServiceRegistry getServiceRegistry() {
  return serviceRegistry;
}
1893
1894
1895
1896
1897
1898
1899
/**
 * Reference callback that stores the security service used to read and switch the current user and
 * organization.
 *
 * @param securityService
 *          the security service
 */
@Reference
public void setSecurityService(SecurityService securityService) {
  this.securityService = securityService;
}
1904
1905
1906
1907
1908
1909
1910
/**
 * Reference callback that stores the authorization service used for permission checks and ACL handling.
 *
 * @param authorizationService
 *          the authorization service
 */
@Reference
public void setAuthorizationService(AuthorizationService authorizationService) {
  this.authorizationService = authorizationService;
}
1915
1916
1917
1918
1919
1920
1921
/**
 * Reference callback that stores the user directory service used to load users by name.
 *
 * @param userDirectoryService
 *          the user directory service
 */
@Reference
public void setUserDirectoryService(UserDirectoryService userDirectoryService) {
  this.userDirectoryService = userDirectoryService;
}
1926
1927
1928
1929
1930
1931
1932
/**
 * Reference callback that stores the organization directory service used to look up organizations by id.
 *
 * @param organizationDirectory
 *          the organization directory service
 */
@Reference
public void setOrganizationDirectoryService(OrganizationDirectoryService organizationDirectory) {
  this.organizationDirectoryService = organizationDirectory;
}
1937
1938
1939
1940
1941
1942
1943
/**
 * Reference callback that stores the series service used to read series access control lists.
 *
 * @param seriesService
 *          the series service
 */
@Reference
public void setSeriesService(SeriesService seriesService) {
  this.seriesService = seriesService;
}
1948
1949
1950
1951
1952
1953
1954
/**
 * Reference callback that stores the asset manager used to look up media packages.
 *
 * @param assetManager
 *          the asset manager
 */
@Reference
public void setAssetManager(AssetManager assetManager) {
  this.assetManager = assetManager;
}
1959
1960
1961
1962
1963
1964
1965
/**
 * Dynamic reference callback that registers a media package metadata service. At least one such service is
 * required by the reference declaration; {@code removeMetadataService} is the matching unbind method.
 *
 * @param service
 *          the metadata service to add
 */
@Reference(cardinality = ReferenceCardinality.AT_LEAST_ONE, policy = ReferencePolicy.DYNAMIC, unbind = "removeMetadataService")
protected void addMetadataService(MediaPackageMetadataService service) {
  metadataServices.add(service);
}
1970
1971
1972
1973
1974
1975
1976
/**
 * Unregisters a media package metadata service. Named as the unbind method of the {@code addMetadataService}
 * reference.
 *
 * @param service
 *          the metadata service to remove
 */
protected void removeMetadataService(MediaPackageMetadataService service) {
  metadataServices.remove(service);
}
1980
1981
1982
1983
1984
1985
1986
/**
 * Reference callback that stores the workflow definition scanner. Despite the "add" prefix, a single scanner
 * is kept; a later call replaces the previous one.
 *
 * @param scanner
 *          the workflow definition scanner
 */
@Reference
protected void addWorkflowDefinitionScanner(WorkflowDefinitionScanner scanner) {
  workflowDefinitionScanner = scanner;
}
1991
1992
1993
1994
1995
1996
1997
/**
 * Reference callback that stores the Elasticsearch index used by this service.
 *
 * @param index
 *          the Elasticsearch index
 */
@Reference
public void setIndex(ElasticsearchIndex index) {
  this.index = index;
}
2002
2003
2004
2005
2006
2007
2008
/**
 * Reference callback (reference name {@code workflow-persistence}) that stores the workflow database used to
 * persist workflow instances.
 *
 * @param persistence
 *          the workflow service database
 */
@Reference(name = "workflow-persistence")
public void setPersistence(WorkflowServiceDatabase persistence) {
  this.persistence = persistence;
}
2013
2014
2015
2016
2017
2018
/**
 * Returns the job type of this service.
 *
 * @return the value of {@code JOB_TYPE}
 */
@Override
public String getJobType() {
  return JOB_TYPE;
}
2023
2024
2025
2026
2027 public static class HandlerRegistration {
2028
2029 protected WorkflowOperationHandler handler;
2030 protected String operationName;
2031
2032 public HandlerRegistration(String operationName, WorkflowOperationHandler handler) {
2033 if (operationName == null)
2034 throw new IllegalArgumentException("Operation name cannot be null");
2035 if (handler == null)
2036 throw new IllegalArgumentException("Handler cannot be null");
2037 this.operationName = operationName;
2038 this.handler = handler;
2039 }
2040
2041 public WorkflowOperationHandler getHandler() {
2042 return handler;
2043 }
2044
2045
2046
2047
2048
2049
2050 @Override
2051 public int hashCode() {
2052 final int prime = 31;
2053 int result = 1;
2054 result = prime * result + handler.hashCode();
2055 result = prime * result + operationName.hashCode();
2056 return result;
2057 }
2058
2059
2060
2061
2062
2063
2064 @Override
2065 public boolean equals(Object obj) {
2066 if (this == obj)
2067 return true;
2068 if (obj == null)
2069 return false;
2070 if (getClass() != obj.getClass())
2071 return false;
2072 HandlerRegistration other = (HandlerRegistration) obj;
2073 if (!handler.equals(other.handler))
2074 return false;
2075 return operationName.equals(other.operationName);
2076 }
2077 }
2078
2079
2080
2081
/**
 * Callable that processes a single job via the enclosing service's {@code process(Job)} method, running under
 * the organization and user of that job.
 */
class JobRunner implements Callable<Void> {

  /** The job to process */
  private Job job = null;

  /** The job that is set as the service registry's current job while processing */
  private final Job currentJob;

  /**
   * Creates a runner for the given job.
   *
   * @param job
   *          the job to process
   * @param currentJob
   *          the job to set as the service registry's current job during processing
   */






  JobRunner(Job job, Job currentJob) {
    this.job = job;
    this.currentJob = currentJob;
  }

  /**
   * Sets the current job, organization and user, processes the job, and finally resets all three to
   * {@code null}, also when processing throws.
   *
   * @return always {@code null}
   */
  @Override
  public Void call() throws Exception {
    // Looked up outside the try block: a failure here leaves the security context untouched
    Organization jobOrganization = organizationDirectoryService.getOrganization(job.getOrganization());
    try {
      serviceRegistry.setCurrentJob(currentJob);
      securityService.setOrganization(jobOrganization);
      User jobUser = userDirectoryService.loadUser(job.getCreator());
      securityService.setUser(jobUser);
      process(job);
    } finally {
      // Reset the context so nothing from this job remains set after the call
      serviceRegistry.setCurrentJob(null);
      securityService.setUser(null);
      securityService.setOrganization(null);
    }
    return null;
  }
}
2125
2126 @Override
2127 public synchronized void cleanupWorkflowInstances(int buffer, WorkflowState state) throws UnauthorizedException,
2128 WorkflowDatabaseException {
2129 logger.info("Start cleaning up workflow instances older than {} days with status '{}'", buffer, state);
2130
2131 int instancesCleaned = 0;
2132 int cleaningFailed = 0;
2133
2134 Date priorTo = DateUtils.addDays(new Date(), -buffer);
2135
2136 try {
2137 for (WorkflowInstance workflowInstance : persistence.getWorkflowInstancesForCleanup(state, priorTo)) {
2138 try {
2139 logger.debug("Deleting workflow instance {}", workflowInstance.getId());
2140 remove(workflowInstance.getId());
2141 instancesCleaned++;
2142 } catch (WorkflowDatabaseException | UnauthorizedException e) {
2143 throw e;
2144 } catch (NotFoundException e) {
2145
2146 logger.debug("Workflow instance '{}' could not be removed", workflowInstance.getId(), e);
2147 } catch (WorkflowParsingException | WorkflowStateException e) {
2148 logger.warn("Workflow instance '{}' could not be removed", workflowInstance.getId(), e);
2149 cleaningFailed++;
2150 }
2151 }
2152 } catch (WorkflowDatabaseException e) {
2153 throw new WorkflowDatabaseException(e);
2154 }
2155
2156 if (instancesCleaned == 0 && cleaningFailed == 0) {
2157 logger.info("No workflow instances found to clean up");
2158 return;
2159 }
2160
2161 if (instancesCleaned > 0)
2162 logger.info("Cleaned up '{}' workflow instances", instancesCleaned);
2163 if (cleaningFailed > 0) {
2164 logger.warn("Cleaning failed for '{}' workflow instances", cleaningFailed);
2165 throw new WorkflowDatabaseException("Unable to clean all workflow instances, see logs!");
2166 }
2167 }
2168
2169 @Override
2170 public Map<String, Map<String, String>> getWorkflowStateMappings() {
2171 return workflowDefinitionScanner.workflowStateMappings.entrySet().stream().collect(Collectors.toMap(
2172 Entry::getKey, e -> e.getValue().stream()
2173 .collect(Collectors.toMap(m -> m.getState().name(), WorkflowStateMapping::getValue))
2174 ));
2175 }
2176
2177 @Override
2178 public void repopulate(IndexRebuildService.DataType type) throws IndexRebuildException {
2179 try {
2180 final int total;
2181 try {
2182 total = persistence.countMediaPackages();
2183 } catch (WorkflowDatabaseException e) {
2184 logIndexRebuildError(logger, e);
2185 throw new IndexRebuildException(getService(), e);
2186 }
2187
2188 if (total > 0) {
2189 logIndexRebuildBegin(logger, total, "workflows");
2190 int current = 0;
2191 int n = 20;
2192 List<WorkflowIndexData> workflowIndexData;
2193
2194 int limit = 1000;
2195 int offset = 0;
2196 String currentMediapackageId;
2197 String lastMediapackageId = "";
2198 var updatedWorkflowRange = new ArrayList<Event>();
2199 do {
2200 try {
2201 workflowIndexData = persistence.getWorkflowIndexData(limit, offset);
2202 } catch (WorkflowDatabaseException e) {
2203 logIndexRebuildError(logger, e);
2204 throw new IndexRebuildException(getService(), e);
2205 }
2206 if (!workflowIndexData.isEmpty()) {
2207 offset += limit;
2208 logger.debug("Got {} workflows for re-indexing", workflowIndexData.size());
2209
2210 for (WorkflowIndexData indexData : workflowIndexData) {
2211 currentMediapackageId = indexData.getMediaPackageId();
2212 if (currentMediapackageId.equals(lastMediapackageId)) {
2213 continue;
2214 }
2215 current++;
2216
2217
2218 if (!WorkflowUtil.isActive(WorkflowInstance.WorkflowState.values()[indexData.getState()].toString())
2219 || WorkflowState.PAUSED == WorkflowInstance.WorkflowState.values()[indexData.getState()]) {
2220 String orgid = indexData.getOrganizationId();
2221 if (null == orgid) {
2222 String mpId = indexData.getMediaPackageId();
2223
2224 RichAResult results = assetManager.getSnapshotsById(mpId);
2225 if (results.getSize() == 0) {
2226 logger.debug("Dropping {} from the index since it is missing from the database", mpId);
2227 continue;
2228 }
2229 orgid = results.getSnapshots().stream().findFirst().get().getOrganizationId();
2230
2231
2232 try {
2233
2234
2235 WorkflowInstance instance = persistence.getWorkflow(indexData.getId(), null);
2236 instance.setOrganizationId(orgid);
2237 persistence.updateInDatabase(instance);
2238 } catch (NotFoundException e) {
2239
2240 }
2241 }
2242 var updatedWorkflowData = index.getEvent(indexData.getMediaPackageId(), orgid, securityService.getUser());
2243 updatedWorkflowData = getStateUpdateFunction(indexData).apply(updatedWorkflowData);
2244 updatedWorkflowRange.add(updatedWorkflowData.get());
2245
2246 if (updatedWorkflowRange.size() >= n || current >= total) {
2247 index.bulkEventUpdate(updatedWorkflowRange);
2248 logIndexRebuildProgress(logger, total, current, n);
2249 updatedWorkflowRange.clear();
2250 }
2251 }
2252 else {
2253 logger.info("Skipping. Workflow {} is currently active.", indexData.getId());
2254 }
2255
2256 lastMediapackageId = currentMediapackageId;
2257 }
2258 }
2259 } while (!workflowIndexData.isEmpty());
2260 }
2261 } catch (Exception e) {
2262 logIndexRebuildError(logger, e);
2263 throw new IndexRebuildException(getService(), e);
2264 }
2265 }
2266
2267 @Override
2268 public IndexRebuildService.Service getService() {
2269 return IndexRebuildService.Service.Workflow;
2270 }
2271
2272
2273
2274
2275
2276
2277
2278
2279
2280 private void removeWorkflowInstanceFromIndex(long workflowInstanceId) {
2281 final String orgId = securityService.getOrganization().getId();
2282 final User user = securityService.getUser();
2283
2284
2285 SearchResult<Event> results;
2286 try {
2287 results = index.getByQuery(new EventSearchQuery(orgId, user).withWorkflowId(workflowInstanceId));
2288 } catch (SearchIndexException e) {
2289 logger.error("Error retrieving the events for workflow instance {} from the {} index.", workflowInstanceId,
2290 index.getIndexName(), e);
2291 return;
2292 }
2293
2294 if (results.getItems().length == 0) {
2295 logger.warn("No events for workflow instance {} found in the {} index.", workflowInstanceId,
2296 index.getIndexName());
2297 return;
2298 }
2299
2300
2301 for (SearchResultItem<Event> item: results.getItems()) {
2302 String eventId = item.getSource().getIdentifier();
2303 logger.debug("Removing workflow instance {} of event {} from the {} index.", workflowInstanceId, eventId,
2304 index.getIndexName());
2305
2306 Function<Optional<Event>, Optional<Event>> updateFunction = (Optional<Event> eventOpt) -> {
2307 if (!eventOpt.isPresent()) {
2308 logger.warn("Event {} of workflow instance {} not found in the {} index.", workflowInstanceId,
2309 eventId, index.getIndexName());
2310 return Optional.empty();
2311 }
2312 Event event = eventOpt.get();
2313 if (event.getWorkflowId() != null && event.getWorkflowId().equals(workflowInstanceId)) {
2314 logger.debug("Workflow {} is the current workflow of event {}. Removing it from event.", eventId,
2315 workflowInstanceId);
2316 event.setWorkflowId(null);
2317 event.setWorkflowDefinitionId(null);
2318 event.setWorkflowState(null);
2319 return Optional.of(event);
2320 }
2321 return Optional.empty();
2322 };
2323
2324 try {
2325 index.addOrUpdateEvent(eventId, updateFunction, orgId, user);
2326 logger.debug("Workflow instance {} of event {} removed from the {} index.", workflowInstanceId, eventId,
2327 index.getIndexName());
2328 } catch (SearchIndexException e) {
2329 logger.error("Error removing the workflow instance {} of event {} from the {} index.", workflowInstanceId,
2330 eventId, index.getIndexName(), e);
2331 }
2332 }
2333 }
2334
2335
2336
2337
2338
2339
2340
2341
2342
2343
2344
2345
2346
2347 private void updateWorkflowInstanceInIndex(long id, int state, String wfDefId, String mpId, String orgId) {
2348 final WorkflowState workflowState = WorkflowState.values()[state];
2349 final User user = securityService.getUser();
2350
2351 logger.debug("Updating workflow instance {} of event {} in the {} index.", id, mpId,
2352 index.getIndexName());
2353 Function<Optional<Event>, Optional<Event>> updateFunction = (Optional<Event> eventOpt) -> {
2354 Event event = eventOpt.orElse(new Event(mpId, orgId));
2355 event.setWorkflowId(id);
2356 event.setWorkflowState(workflowState);
2357 event.setWorkflowDefinitionId(wfDefId);
2358 return Optional.of(event);
2359 };
2360
2361 try {
2362 index.addOrUpdateEvent(mpId, updateFunction, orgId, user);
2363 logger.debug("Workflow instance {} of event {} updated in the {} index.", id, mpId,
2364 index.getIndexName());
2365 } catch (SearchIndexException e) {
2366 logger.error("Error updating the workflow instance {} of event {} in the {} index.", id, mpId,
2367 index.getIndexName(), e);
2368 }
2369 }
2370
2371
2372
2373
2374
2375
2376
2377
2378 private Function<Optional<Event>, Optional<Event>> getStateUpdateFunction(WorkflowIndexData wfData) {
2379 return (Optional<Event> eventOpt) -> {
2380 Event event = eventOpt.orElse(new Event(wfData.getMediaPackageId(), wfData.getOrganizationId()));
2381 event.setWorkflowId(wfData.getId());
2382 event.setWorkflowState(WorkflowState.values()[wfData.getState()]);
2383 return Optional.of(event);
2384 };
2385 }
2386 }