1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 package org.opencastproject.workflow.impl;
23
24 import static org.opencastproject.security.api.SecurityConstants.GLOBAL_ADMIN_ROLE;
25 import static org.opencastproject.workflow.api.WorkflowInstance.WorkflowState.FAILED;
26 import static org.opencastproject.workflow.api.WorkflowInstance.WorkflowState.FAILING;
27 import static org.opencastproject.workflow.api.WorkflowInstance.WorkflowState.INSTANTIATED;
28 import static org.opencastproject.workflow.api.WorkflowInstance.WorkflowState.PAUSED;
29 import static org.opencastproject.workflow.api.WorkflowInstance.WorkflowState.RUNNING;
30 import static org.opencastproject.workflow.api.WorkflowInstance.WorkflowState.STOPPED;
31 import static org.opencastproject.workflow.api.WorkflowInstance.WorkflowState.SUCCEEDED;
32
33 import org.opencastproject.assetmanager.api.AssetManager;
34 import org.opencastproject.assetmanager.api.Snapshot;
35 import org.opencastproject.assetmanager.util.WorkflowPropertiesUtil;
36 import org.opencastproject.elasticsearch.api.SearchIndexException;
37 import org.opencastproject.elasticsearch.api.SearchResult;
38 import org.opencastproject.elasticsearch.api.SearchResultItem;
39 import org.opencastproject.elasticsearch.index.ElasticsearchIndex;
40 import org.opencastproject.elasticsearch.index.objects.event.Event;
41 import org.opencastproject.elasticsearch.index.objects.event.EventSearchQuery;
42 import org.opencastproject.elasticsearch.index.rebuild.AbstractIndexProducer;
43 import org.opencastproject.elasticsearch.index.rebuild.IndexProducer;
44 import org.opencastproject.elasticsearch.index.rebuild.IndexRebuildException;
45 import org.opencastproject.elasticsearch.index.rebuild.IndexRebuildService;
46 import org.opencastproject.job.api.Job;
47 import org.opencastproject.job.api.Job.Status;
48 import org.opencastproject.job.api.JobProducer;
49 import org.opencastproject.mediapackage.MediaPackage;
50 import org.opencastproject.mediapackage.MediaPackageElement;
51 import org.opencastproject.mediapackage.MediaPackageParser;
52 import org.opencastproject.mediapackage.MediaPackageSupport;
53 import org.opencastproject.mediapackage.Publication;
54 import org.opencastproject.metadata.api.MediaPackageMetadata;
55 import org.opencastproject.metadata.api.MediaPackageMetadataService;
56 import org.opencastproject.metadata.api.MetadataService;
57 import org.opencastproject.metadata.api.util.MediaPackageMetadataSupport;
58 import org.opencastproject.security.api.AccessControlList;
59 import org.opencastproject.security.api.AccessControlUtil;
60 import org.opencastproject.security.api.AclScope;
61 import org.opencastproject.security.api.AuthorizationService;
62 import org.opencastproject.security.api.Organization;
63 import org.opencastproject.security.api.OrganizationDirectoryService;
64 import org.opencastproject.security.api.Permissions;
65 import org.opencastproject.security.api.SecurityService;
66 import org.opencastproject.security.api.UnauthorizedException;
67 import org.opencastproject.security.api.User;
68 import org.opencastproject.security.api.UserDirectoryService;
69 import org.opencastproject.series.api.SeriesException;
70 import org.opencastproject.series.api.SeriesService;
71 import org.opencastproject.serviceregistry.api.ServiceRegistry;
72 import org.opencastproject.serviceregistry.api.ServiceRegistryException;
73 import org.opencastproject.serviceregistry.api.UndispatchableJobException;
74 import org.opencastproject.util.NotFoundException;
75 import org.opencastproject.util.ReadinessIndicator;
76 import org.opencastproject.util.data.Tuple;
77 import org.opencastproject.workflow.api.ResumableWorkflowOperationHandler;
78 import org.opencastproject.workflow.api.RetryStrategy;
79 import org.opencastproject.workflow.api.WorkflowDatabaseException;
80 import org.opencastproject.workflow.api.WorkflowDefinition;
81 import org.opencastproject.workflow.api.WorkflowException;
82 import org.opencastproject.workflow.api.WorkflowIdentifier;
83 import org.opencastproject.workflow.api.WorkflowIndexData;
84 import org.opencastproject.workflow.api.WorkflowInstance;
85 import org.opencastproject.workflow.api.WorkflowInstance.WorkflowState;
86 import org.opencastproject.workflow.api.WorkflowListener;
87 import org.opencastproject.workflow.api.WorkflowOperationDefinition;
88 import org.opencastproject.workflow.api.WorkflowOperationDefinitionImpl;
89 import org.opencastproject.workflow.api.WorkflowOperationHandler;
90 import org.opencastproject.workflow.api.WorkflowOperationInstance;
91 import org.opencastproject.workflow.api.WorkflowOperationInstance.OperationState;
92 import org.opencastproject.workflow.api.WorkflowOperationResult;
93 import org.opencastproject.workflow.api.WorkflowOperationResult.Action;
94 import org.opencastproject.workflow.api.WorkflowOperationResultImpl;
95 import org.opencastproject.workflow.api.WorkflowParsingException;
96 import org.opencastproject.workflow.api.WorkflowService;
97 import org.opencastproject.workflow.api.WorkflowServiceDatabase;
98 import org.opencastproject.workflow.api.WorkflowStateException;
99 import org.opencastproject.workflow.api.WorkflowStateMapping;
100 import org.opencastproject.workflow.api.WorkflowUtil;
101 import org.opencastproject.workflow.api.XmlWorkflowParser;
102 import org.opencastproject.workspace.api.Workspace;
103
104 import com.google.common.util.concurrent.Striped;
105
106 import org.apache.commons.io.IOUtils;
107 import org.apache.commons.lang3.StringUtils;
108 import org.apache.commons.lang3.time.DateUtils;
109 import org.osgi.framework.InvalidSyntaxException;
110 import org.osgi.framework.ServiceReference;
111 import org.osgi.service.component.ComponentContext;
112 import org.osgi.service.component.annotations.Activate;
113 import org.osgi.service.component.annotations.Component;
114 import org.osgi.service.component.annotations.Reference;
115 import org.osgi.service.component.annotations.ReferenceCardinality;
116 import org.osgi.service.component.annotations.ReferencePolicy;
117 import org.slf4j.Logger;
118 import org.slf4j.LoggerFactory;
119
120 import java.io.ByteArrayOutputStream;
121 import java.io.IOException;
122 import java.nio.charset.StandardCharsets;
123 import java.util.ArrayList;
124 import java.util.Collections;
125 import java.util.Comparator;
126 import java.util.Date;
127 import java.util.HashMap;
128 import java.util.HashSet;
129 import java.util.List;
130 import java.util.Map;
131 import java.util.Map.Entry;
132 import java.util.Optional;
133 import java.util.Properties;
134 import java.util.Set;
135 import java.util.SortedSet;
136 import java.util.TreeSet;
137 import java.util.concurrent.Callable;
138 import java.util.concurrent.CopyOnWriteArrayList;
139 import java.util.concurrent.Executors;
140 import java.util.concurrent.ThreadPoolExecutor;
141 import java.util.concurrent.locks.Lock;
142 import java.util.function.Function;
143 import java.util.stream.Collectors;
144
145
146
147
148
149
150
151
152 @Component(
153 property = {
154 "service.description=Workflow Service",
155 "service.pid=org.opencastproject.workflow.impl.WorkflowServiceImpl"
156 },
157 immediate = true,
158 service = { WorkflowService.class, WorkflowServiceImpl.class, IndexProducer.class }
159 )
160 public class WorkflowServiceImpl extends AbstractIndexProducer implements WorkflowService, JobProducer {
161
162
163 private static final String RETRY_STRATEGY = "retryStrategy";
164
165
166 private static final Logger logger = LoggerFactory.getLogger(WorkflowServiceImpl.class);
167
168
169 enum Operation {
170 START_WORKFLOW, RESUME, START_OPERATION
171 }
172
173
174 private static final String NULL_PARENT_ID = "-";
175
176
177
178
179
180 private static final float WORKFLOW_JOB_LOAD = 0.0f;
181
182
183 public static final String ERROR_RESOLUTION_HANDLER_ID = "error-resolution";
184
185
186 protected ComponentContext componentContext = null;
187
188
189 private SortedSet<MediaPackageMetadataService> metadataServices;
190
191
192 protected WorkflowServiceDatabase persistence;
193
194
195 private final List<WorkflowListener> listeners = new CopyOnWriteArrayList<WorkflowListener>();
196
197
198 protected ThreadPoolExecutor executorService;
199
200
201 protected Workspace workspace = null;
202
203
204 protected ServiceRegistry serviceRegistry = null;
205
206
207 protected SecurityService securityService = null;
208
209
210 protected AuthorizationService authorizationService = null;
211
212
213 protected UserDirectoryService userDirectoryService = null;
214
215
216 protected OrganizationDirectoryService organizationDirectoryService = null;
217
218
219 protected SeriesService seriesService;
220
221
222 protected AssetManager assetManager = null;
223
224
225 private WorkflowDefinitionScanner workflowDefinitionScanner;
226
227
228 private final List<Long> delayedWorkflows = new ArrayList<Long>();
229
230
231 private final Striped<Lock> lock = Striped.lazyWeakLock(1024);
232 private final Striped<Lock> updateLock = Striped.lazyWeakLock(1024);
233 private final Striped<Lock> mediaPackageLocks = Striped.lazyWeakLock(1024);
234
235
236 private ElasticsearchIndex index;
237
238
239
240
241 public WorkflowServiceImpl() {
242 metadataServices = new TreeSet<>(Comparator.comparingInt(MetadataService::getPriority));
243 }
244
245
246
247
248
249
250
251 @Activate
252 public void activate(ComponentContext componentContext) {
253 this.componentContext = componentContext;
254 executorService = (ThreadPoolExecutor) Executors.newCachedThreadPool();
255 logger.info("Activate Workflow service");
256 }
257
258
259
260
261
262
263
264 @Override
265 public void addWorkflowListener(WorkflowListener listener) {
266 listeners.add(listener);
267 }
268
269
270
271
272
273
274
275 @Override
276 public void removeWorkflowListener(WorkflowListener listener) {
277 listeners.remove(listener);
278 }
279
280
281
282
283 protected void fireListeners(final WorkflowInstance oldWorkflowInstance, final WorkflowInstance newWorkflowInstance) {
284 final User currentUser = securityService.getUser();
285 final Organization currentOrganization = securityService.getOrganization();
286 for (final WorkflowListener listener : listeners) {
287 if (oldWorkflowInstance == null || !oldWorkflowInstance.getState().equals(newWorkflowInstance.getState())) {
288 Runnable runnable = () -> {
289 try {
290 securityService.setUser(currentUser);
291 securityService.setOrganization(currentOrganization);
292 listener.stateChanged(newWorkflowInstance);
293 } finally {
294 securityService.setUser(null);
295 securityService.setOrganization(null);
296 }
297 };
298 executorService.execute(runnable);
299 } else {
300 logger.debug("Not notifying {} because the workflow state has not changed", listener);
301 }
302
303 if (newWorkflowInstance.getCurrentOperation() != null) {
304 if (oldWorkflowInstance == null || oldWorkflowInstance.getCurrentOperation() == null
305 || !oldWorkflowInstance.getCurrentOperation().equals(newWorkflowInstance.getCurrentOperation())) {
306 Runnable runnable = () -> {
307 try {
308 securityService.setUser(currentUser);
309 securityService.setOrganization(currentOrganization);
310 listener.operationChanged(newWorkflowInstance);
311 } finally {
312 securityService.setUser(null);
313 securityService.setOrganization(null);
314 }
315 };
316 executorService.execute(runnable);
317 }
318 } else {
319 logger.debug("Not notifying {} because the workflow operation has not changed", listener);
320 }
321 }
322 }
323
324
325
326
327
328
329 @Override
330 public List<WorkflowDefinition> listAvailableWorkflowDefinitions() {
331 return workflowDefinitionScanner
332 .getAvailableWorkflowDefinitions(securityService.getOrganization(), securityService.getUser())
333 .sorted()
334 .collect(Collectors.toList());
335 }
336
337
338
339
340 public boolean isRunnable(WorkflowDefinition workflowDefinition) {
341 List<String> availableOperations = listAvailableOperationNames();
342 List<WorkflowDefinition> checkedWorkflows = new ArrayList<>();
343 boolean runnable = isRunnable(workflowDefinition, availableOperations, checkedWorkflows);
344 int wfCount = checkedWorkflows.size() - 1;
345 if (runnable) {
346 logger.info("Workflow {}, containing {} derived workflows, is runnable", workflowDefinition, wfCount);
347 } else {
348 logger.warn("Workflow {}, containing {} derived workflows, is not runnable", workflowDefinition, wfCount);
349 }
350 return runnable;
351 }
352
353
354
355
356
357
358
359
360
361
362
363
364
365 private boolean isRunnable(WorkflowDefinition workflowDefinition, List<String> availableOperations,
366 List<WorkflowDefinition> checkedWorkflows) {
367 if (checkedWorkflows.contains(workflowDefinition)) {
368 return true;
369 }
370
371
372 for (WorkflowOperationDefinition op : workflowDefinition.getOperations()) {
373 if (!availableOperations.contains(op.getId())) {
374 logger.info("{} is not runnable due to missing operation {}", workflowDefinition, op);
375 return false;
376 }
377 String catchWorkflow = op.getExceptionHandlingWorkflow();
378 if (catchWorkflow != null) {
379 WorkflowDefinition catchWorkflowDefinition;
380 try {
381 catchWorkflowDefinition = getWorkflowDefinitionById(catchWorkflow);
382 } catch (NotFoundException e) {
383 logger.info("{} is not runnable due to missing catch workflow {} on operation {}", workflowDefinition,
384 catchWorkflow, op);
385 return false;
386 }
387 if (!isRunnable(catchWorkflowDefinition, availableOperations, checkedWorkflows)) {
388 return false;
389 }
390 }
391 }
392
393
394 if (!checkedWorkflows.contains(workflowDefinition)) {
395 checkedWorkflows.add(workflowDefinition);
396 }
397 return true;
398 }
399
400
401
402
403
404
405 public Set<HandlerRegistration> getRegisteredHandlers() {
406 Set<HandlerRegistration> set = new HashSet<>();
407 ServiceReference[] refs;
408 try {
409 refs = componentContext.getBundleContext().getServiceReferences(WorkflowOperationHandler.class.getName(), null);
410 } catch (InvalidSyntaxException e) {
411 throw new IllegalStateException(e);
412 }
413 if (refs != null) {
414 for (ServiceReference ref : refs) {
415 WorkflowOperationHandler handler = (WorkflowOperationHandler) componentContext.getBundleContext().getService(
416 ref);
417 set.add(new HandlerRegistration((String) ref.getProperty(WORKFLOW_OPERATION_PROPERTY), handler));
418 }
419 } else {
420 logger.warn("No registered workflow operation handlers found");
421 }
422 return set;
423 }
424
425 protected WorkflowOperationHandler getWorkflowOperationHandler(String operationId) {
426 for (HandlerRegistration reg : getRegisteredHandlers()) {
427 if (reg.operationName.equals(operationId)) {
428 return reg.handler;
429 }
430 }
431 return null;
432 }
433
434
435
436
437
438
439
440 protected List<String> listAvailableOperationNames() {
441 return getRegisteredHandlers().parallelStream().map(op -> op.operationName).collect(Collectors.toList());
442 }
443
444
445
446
447
448
449 @Override
450 public WorkflowInstance getWorkflowById(long id) throws NotFoundException,
451 UnauthorizedException {
452 try {
453 WorkflowInstance workflow = persistence.getWorkflow(id);
454 assertPermission(workflow, Permissions.Action.READ.toString(), workflow.getOrganizationId());
455 return workflow;
456 } catch (WorkflowDatabaseException e) {
457 throw new IllegalStateException("Got not get workflow from database with id ");
458 }
459 }
460
461
462
463
464
465
466
467 @Override
468 public WorkflowInstance start(WorkflowDefinition workflowDefinition, MediaPackage mediaPackage)
469 throws WorkflowDatabaseException, UnauthorizedException, WorkflowParsingException {
470 return start(workflowDefinition, mediaPackage, new HashMap<>());
471 }
472
473
474
475
476
477
478
479 @Override
480 public WorkflowInstance start(WorkflowDefinition workflowDefinition, MediaPackage mediaPackage,
481 Map<String, String> properties)
482 throws WorkflowDatabaseException, UnauthorizedException, WorkflowParsingException {
483 try {
484 return start(workflowDefinition, mediaPackage, null, properties);
485 } catch (NotFoundException e) {
486
487 throw new IllegalStateException("a null workflow ID caused a NotFoundException. This is a programming error.");
488 }
489 }
490
491
492
493
494
495
496
497 @Override
498 public WorkflowInstance start(WorkflowDefinition workflowDefinition, MediaPackage sourceMediaPackage,
499 Long parentWorkflowId, Map<String, String> originalProperties) throws WorkflowDatabaseException,
500 NotFoundException, UnauthorizedException, WorkflowParsingException, IllegalStateException {
501 final String mediaPackageId = sourceMediaPackage.getIdentifier().toString();
502 Map<String, String> properties = null;
503
504
505
506 populateMediaPackageMetadata(sourceMediaPackage);
507
508 if (originalProperties != null) {
509 WorkflowPropertiesUtil.storeProperties(assetManager, sourceMediaPackage, originalProperties);
510 properties = WorkflowPropertiesUtil.getLatestWorkflowProperties(assetManager, mediaPackageId);
511 }
512
513
514 final Lock lock = mediaPackageLocks.get(mediaPackageId);
515 lock.lock();
516
517 try {
518 if (workflowDefinition == null) {
519 throw new IllegalArgumentException("workflow definition must not be null");
520 }
521 Optional<List<String>> errors = MediaPackageSupport.sanityCheck(sourceMediaPackage);
522 if (errors.isPresent()) {
523 throw new IllegalArgumentException("Insane media package cannot be processed: "
524 + String.join("; ", errors.get()));
525 }
526 if (parentWorkflowId != null) {
527 try {
528 getWorkflowById(parentWorkflowId);
529 } catch (UnauthorizedException e) {
530 throw new IllegalArgumentException("Parent workflow " + parentWorkflowId + " not visible to this user");
531 }
532 } else {
533 if (persistence.mediaPackageHasActiveWorkflows(mediaPackageId)) {
534 throw new IllegalStateException(String.format(
535 "Can't start workflow '%s' for media package '%s' because another workflow is currently active.",
536 workflowDefinition.getTitle(),
537 sourceMediaPackage.getIdentifier().toString()));
538 }
539 }
540
541
542 User currentUser = securityService.getUser();
543 validUserOrThrow(currentUser);
544
545
546 Organization organization = securityService.getOrganization();
547 if (organization == null) {
548 throw new SecurityException("Current organization is unknown");
549 }
550
551 WorkflowInstance workflowInstance = new WorkflowInstance(workflowDefinition, sourceMediaPackage,
552 currentUser, organization, properties);
553 workflowInstance = updateConfiguration(workflowInstance, properties);
554
555
556 try {
557
558 String workflowDefinitionXml = XmlWorkflowParser.toXml(workflowDefinition);
559 String mediaPackageXml = MediaPackageParser.getAsXml(sourceMediaPackage);
560
561 List<String> arguments = new ArrayList<>();
562 arguments.add(workflowDefinitionXml);
563 arguments.add(mediaPackageXml);
564 if (parentWorkflowId != null || properties != null) {
565 String parentWorkflowIdString = (parentWorkflowId != null) ? parentWorkflowId.toString() : NULL_PARENT_ID;
566 arguments.add(parentWorkflowIdString);
567 }
568 if (properties != null) {
569 arguments.add(mapToString(properties));
570 }
571
572 Job job = serviceRegistry.createJob(JOB_TYPE, Operation.START_WORKFLOW.toString(), arguments,
573 null, false, null, WORKFLOW_JOB_LOAD);
574
575
576 workflowInstance.setId(job.getId());
577
578
579
580 update(workflowInstance);
581
582 return workflowInstance;
583 } catch (Throwable t) {
584 try {
585 workflowInstance.setState(FAILED);
586 update(workflowInstance);
587 } catch (Exception failureToFail) {
588 logger.warn("Unable to update workflow to failed state", failureToFail);
589 }
590 try {
591 throw t;
592 } catch (ServiceRegistryException e) {
593 throw new WorkflowDatabaseException(e);
594 }
595 }
596 } finally {
597 lock.unlock();
598 }
599 }
600
601 protected WorkflowInstance updateConfiguration(WorkflowInstance instance, Map<String, String> properties) {
602 if (properties != null) {
603 for (Entry<String, String> entry : properties.entrySet()) {
604 instance.setConfiguration(entry.getKey(), entry.getValue());
605 }
606 }
607 return instance;
608 }
609
610
611
612
613
614
615
616
617 protected WorkflowOperationHandler selectOperationHandler(WorkflowOperationInstance operation) {
618 List<WorkflowOperationHandler> handlerList = new ArrayList<>();
619 for (HandlerRegistration handlerReg : getRegisteredHandlers()) {
620 if (handlerReg.operationName != null && handlerReg.operationName.equals(operation.getTemplate())) {
621 handlerList.add(handlerReg.handler);
622 }
623 }
624 if (handlerList.size() > 1) {
625 throw new IllegalStateException("Multiple operation handlers found for operation '" + operation.getTemplate()
626 + "'");
627 } else if (handlerList.size() == 1) {
628 return handlerList.get(0);
629 }
630 logger.warn("No workflow operation handlers found for operation '{}'", operation.getTemplate());
631 return null;
632 }
633
634
635
636
637
638
639
640
641
642
643 protected Job runWorkflow(WorkflowInstance workflow) throws WorkflowException, UnauthorizedException {
644 if (INSTANTIATED != workflow.getState()) {
645
646
647
648
649 if (RUNNING == workflow.getState()) {
650 WorkflowOperationInstance currentOperation = workflow.getCurrentOperation();
651 if (currentOperation != null) {
652 if (currentOperation.getId() != null) {
653 try {
654 Job operationJob = serviceRegistry.getJob(currentOperation.getId());
655 if (Job.Status.RUNNING.equals(operationJob.getStatus())) {
656 logger.debug("Not starting workflow {}, it is already in running state", workflow);
657 return null;
658 } else {
659 logger.info("Scheduling next operation of workflow {}", workflow);
660 operationJob.setStatus(Status.QUEUED);
661 operationJob.setDispatchable(true);
662 return serviceRegistry.updateJob(operationJob);
663 }
664 } catch (Exception e) {
665 logger.warn("Error determining status of current workflow operation in {}", workflow, e);
666 return null;
667 }
668 }
669 } else {
670 throw new IllegalStateException("Cannot start a workflow '" + workflow + "' with no current operation");
671 }
672 } else {
673 throw new IllegalStateException("Cannot start a workflow in state '" + workflow.getState() + "'");
674 }
675 }
676
677
678 workflow.setState(RUNNING);
679 update(workflow);
680
681 WorkflowOperationInstance operation = workflow.getCurrentOperation();
682
683 if (operation == null) {
684 throw new IllegalStateException("Cannot start a workflow without a current operation");
685 }
686
687 if (!operation.equals(workflow.getOperations().get(0))) {
688 throw new IllegalStateException("Current operation expected to be first");
689 }
690
691 try {
692 logger.info("Scheduling workflow {} for execution", workflow.getId());
693 Job job = serviceRegistry.createJob(JOB_TYPE, Operation.START_OPERATION.toString(),
694 Collections.singletonList(Long.toString(workflow.getId())), null, false, null, WORKFLOW_JOB_LOAD);
695 operation.setId(job.getId());
696 update(workflow);
697 job.setStatus(Status.QUEUED);
698 job.setDispatchable(true);
699 return serviceRegistry.updateJob(job);
700 } catch (ServiceRegistryException e) {
701 throw new WorkflowDatabaseException(e);
702 } catch (NotFoundException e) {
703
704 throw new IllegalStateException("Unable to find a job that was just created");
705 }
706
707 }
708
709
710
711
712
713
714
715
716
717
718
719
720
721 protected WorkflowOperationInstance runWorkflowOperation(WorkflowInstance workflow, Map<String, String> properties)
722 throws WorkflowException, UnauthorizedException {
723 WorkflowOperationInstance processingOperation = workflow.getCurrentOperation();
724 if (processingOperation == null) {
725 throw new IllegalStateException("Workflow '" + workflow + "' has no operation to run");
726 }
727
728
729 WorkflowState initialState = workflow.getState();
730
731
732 WorkflowOperationHandler operationHandler = selectOperationHandler(processingOperation);
733 WorkflowOperationWorker worker = new WorkflowOperationWorker(operationHandler, workflow, properties, this);
734 workflow = worker.execute();
735
736 Long currentOperationJobId = processingOperation.getId();
737 try {
738 updateOperationJob(currentOperationJobId, processingOperation.getState());
739 } catch (NotFoundException e) {
740 throw new IllegalStateException("Unable to find a job that has already been running");
741 } catch (ServiceRegistryException e) {
742 throw new WorkflowDatabaseException(e);
743 }
744
745
746 WorkflowOperationInstance currentOperation = workflow.getCurrentOperation();
747
748
749 if (currentOperation == null) {
750
751
752 if (FAILING.equals(workflow.getState())) {
753 workflow.setState(FAILED);
754 }
755
756
757
758 else if (!FAILED.equals(workflow.getState())) {
759 workflow.setState(SUCCEEDED);
760 for (WorkflowOperationInstance op : workflow.getOperations()) {
761 if (op.getState().equals(WorkflowOperationInstance.OperationState.FAILED)) {
762 if (op.isFailOnError()) {
763 workflow.setState(FAILED);
764 break;
765 }
766 }
767 }
768 }
769
770
771 logger.debug("{} has {}", workflow, workflow.getState());
772 update(workflow);
773
774 } else {
775
776
777 WorkflowState dbWorkflowState;
778 try {
779 dbWorkflowState = getWorkflowById(workflow.getId()).getState();
780 } catch (NotFoundException e) {
781 throw new IllegalStateException("The workflow with ID " + workflow.getId()
782 + " can not be found in the database", e);
783 } catch (UnauthorizedException e) {
784 throw new IllegalStateException("The workflow with ID " + workflow.getId() + " can not be read", e);
785 }
786
787
788 if (!dbWorkflowState.equals(initialState)) {
789 logger.info("Workflow state for {} was changed to '{}' from the outside", workflow, dbWorkflowState);
790 workflow.setState(dbWorkflowState);
791 }
792
793
794
795 Job job;
796 switch (workflow.getState()) {
797 case FAILED:
798 update(workflow);
799 break;
800 case FAILING:
801 case RUNNING:
802 try {
803 job = serviceRegistry.createJob(JOB_TYPE, Operation.START_OPERATION.toString(),
804 Collections.singletonList(Long.toString(workflow.getId())), null, false, null, WORKFLOW_JOB_LOAD);
805 currentOperation.setId(job.getId());
806 update(workflow);
807 job.setStatus(Status.QUEUED);
808 job.setDispatchable(true);
809 serviceRegistry.updateJob(job);
810 } catch (ServiceRegistryException e) {
811 throw new WorkflowDatabaseException(e);
812 } catch (NotFoundException e) {
813
814 throw new IllegalStateException("Unable to find a job that was just created");
815 }
816 break;
817 case PAUSED:
818 case STOPPED:
819 case SUCCEEDED:
820 update(workflow);
821 break;
822 case INSTANTIATED:
823 update(workflow);
824 throw new IllegalStateException("Impossible workflow state found during processing");
825 default:
826 throw new IllegalStateException("Unknown workflow state found during processing");
827 }
828
829 }
830
831 return processingOperation;
832 }
833
834
835
836
837
838
839 @Override
840 public WorkflowDefinition getWorkflowDefinitionById(String id) throws NotFoundException {
841 final WorkflowIdentifier workflowIdentifier = new WorkflowIdentifier(id, securityService.getOrganization().getId());
842 final WorkflowDefinition def = workflowDefinitionScanner
843 .getWorkflowDefinition(securityService.getUser(), workflowIdentifier);
844 if (def == null) {
845 throw new NotFoundException("Workflow definition '" + workflowIdentifier + "' not found or inaccessible");
846 }
847 return def;
848 }
849
850
851
852
853
854
855 @Override
856 public WorkflowInstance stop(long workflowInstanceId) throws WorkflowException, NotFoundException,
857 UnauthorizedException {
858 final Lock lock = this.lock.get(workflowInstanceId);
859 lock.lock();
860 try {
861 WorkflowInstance instance = getWorkflowById(workflowInstanceId);
862
863 if (instance.getState() != STOPPED) {
864
865 instance.setState(STOPPED);
866 update(instance);
867 }
868
869 try {
870 removeTempFiles(instance);
871 } catch (Exception e) {
872 logger.warn("Cannot remove temp files for workflow instance {}", workflowInstanceId, e);
873 }
874
875 return instance;
876 } finally {
877 lock.unlock();
878 }
879 }
880
881
882
883
884 private void validUserOrThrow(User user) {
885 if (user == null) {
886 throw new SecurityException("Current user is unknown");
887 }
888
889 if (userDirectoryService.loadUser(user.getUsername()) == null) {
890 throw new SecurityException(String.format("Current user '%s' can not be loaded", user.getUsername()));
891 }
892 }
893
894 private void removeTempFiles(WorkflowInstance workflowInstance) {
895 logger.info("Removing temporary files for workflow {}", workflowInstance.getId());
896 MediaPackage mp = workflowInstance.getMediaPackage();
897 if (null == mp) {
898 logger.warn("Workflow instance {} does not have an media package set", workflowInstance.getId());
899 return;
900 }
901 for (MediaPackageElement elem : mp.getElements()) {
902
903 if (elem instanceof Publication) {
904 continue;
905 }
906 if (null == elem.getURI()) {
907 logger.warn("Media package element {} from the media package {} does not have an URI set",
908 elem.getIdentifier(), mp.getIdentifier());
909 continue;
910 }
911 try {
912 logger.debug("Removing temporary file {} for workflow {}", elem.getURI(), workflowInstance);
913 workspace.delete(elem.getURI());
914 } catch (IOException e) {
915 logger.warn("Unable to delete mediapackage element", e);
916 } catch (NotFoundException e) {
917
918 }
919 }
920 }
921
922
923
924
925
926
927
928 @Override
929 public void remove(long workflowInstanceId) throws WorkflowDatabaseException, NotFoundException,
930 UnauthorizedException, WorkflowParsingException, WorkflowStateException {
931 remove(workflowInstanceId, false);
932 }
933
934
935
936
937
938
939 @Override
940 public void remove(long workflowInstanceId, boolean force) throws WorkflowDatabaseException, NotFoundException,
941 UnauthorizedException, WorkflowStateException {
942 final Lock lock = this.lock.get(workflowInstanceId);
943 lock.lock();
944 try {
945 WorkflowInstance instance = getWorkflowById(workflowInstanceId);
946 WorkflowInstance.WorkflowState state = instance.getState();
947 if (!state.isTerminated() && !force) {
948 throw new WorkflowStateException("Workflow instance with state '" + state + "' cannot be removed "
949 + "since it is not yet terminated.");
950 }
951
952 assertPermission(instance, Permissions.Action.WRITE.toString(), instance.getOrganizationId());
953
954
955 removeTempFiles(instance);
956
957
958 List<WorkflowOperationInstance> operations = instance.getOperations();
959 List<Long> jobsToDelete = new ArrayList<>();
960 for (WorkflowOperationInstance op : operations) {
961 if (op.getId() != null) {
962 long workflowOpId = op.getId();
963 if (workflowOpId != workflowInstanceId) {
964 jobsToDelete.add(workflowOpId);
965 }
966 }
967 }
968 try {
969 serviceRegistry.removeJobs(jobsToDelete);
970 } catch (ServiceRegistryException e) {
971 logger.warn("Problems while removing jobs related to workflow operations '{}'", jobsToDelete, e);
972 } catch (NotFoundException e) {
973 logger.debug("No jobs related to one of the workflow operation '{}' found in service registry", jobsToDelete);
974 }
975
976
977 try {
978 serviceRegistry.removeJobs(Collections.singletonList(workflowInstanceId));
979 removeWorkflowInstanceFromIndex(instance.getId());
980 } catch (ServiceRegistryException e) {
981 logger.warn("Problems while removing workflow instance job '{}'", workflowInstanceId, e);
982 } catch (NotFoundException e) {
983 logger.info("No workflow instance job '{}' found in the service registry", workflowInstanceId);
984 }
985
986
987 persistence.removeFromDatabase(instance);
988 } finally {
989 lock.unlock();
990 }
991 }
992
993
994
995
996
997
998 @Override
999 public WorkflowInstance suspend(long workflowInstanceId) throws WorkflowException, NotFoundException,
1000 UnauthorizedException {
1001 final Lock lock = this.lock.get(workflowInstanceId);
1002 lock.lock();
1003 try {
1004 WorkflowInstance instance = getWorkflowById(workflowInstanceId);
1005 instance.setState(PAUSED);
1006 update(instance);
1007 return instance;
1008 } finally {
1009 lock.unlock();
1010 }
1011 }
1012
1013
1014
1015
1016
1017
1018 @Override
1019 public WorkflowInstance resume(long id) throws WorkflowException, NotFoundException, IllegalStateException,
1020 UnauthorizedException {
1021 return resume(id, null);
1022 }
1023
1024
1025
1026
1027
1028
1029 @Override
1030 public WorkflowInstance resume(long workflowInstanceId, Map<String, String> properties) throws WorkflowException,
1031 NotFoundException, IllegalStateException, UnauthorizedException {
1032 WorkflowInstance workflowInstance = getWorkflowById(workflowInstanceId);
1033 if (!WorkflowState.PAUSED.equals(workflowInstance.getState())) {
1034 throw new IllegalStateException("Can not resume a workflow where the current state is not in paused");
1035 }
1036
1037 workflowInstance = updateConfiguration(workflowInstance, properties);
1038 update(workflowInstance);
1039
1040 WorkflowOperationInstance currentOperation = workflowInstance.getCurrentOperation();
1041
1042
1043 if (currentOperation == null) {
1044
1045
1046 workflowInstance.setState(SUCCEEDED);
1047 for (WorkflowOperationInstance op : workflowInstance.getOperations()) {
1048 if (op.getState().equals(WorkflowOperationInstance.OperationState.FAILED)) {
1049 if (op.isFailOnError()) {
1050 workflowInstance.setState(FAILED);
1051 break;
1052 }
1053 }
1054 }
1055
1056
1057 logger.debug("{} has {}", workflowInstance, workflowInstance.getState());
1058 update(workflowInstance);
1059 return workflowInstance;
1060 }
1061
1062
1063
1064 if (OperationState.INSTANTIATED.equals(currentOperation.getState())) {
1065 try {
1066
1067 Job operationJob = serviceRegistry.createJob(JOB_TYPE, Operation.START_OPERATION.toString(),
1068 Collections.singletonList(Long.toString(workflowInstanceId)), null, false, null, WORKFLOW_JOB_LOAD);
1069
1070
1071
1072 workflowInstance.setState(RUNNING);
1073 currentOperation.setId(operationJob.getId());
1074
1075
1076 update(workflowInstance);
1077
1078
1079 operationJob.setStatus(Status.QUEUED);
1080 operationJob.setDispatchable(true);
1081 serviceRegistry.updateJob(operationJob);
1082
1083 return workflowInstance;
1084 } catch (ServiceRegistryException e) {
1085 throw new WorkflowDatabaseException(e);
1086 }
1087 }
1088
1089 Long operationJobId = workflowInstance.getCurrentOperation().getId();
1090 if (operationJobId == null) {
1091 throw new IllegalStateException("Can not resume a workflow where the current operation has no associated id");
1092 }
1093
1094
1095 Job workflowJob;
1096 try {
1097 workflowJob = serviceRegistry.getJob(workflowInstanceId);
1098 workflowJob.setStatus(Status.RUNNING);
1099 persistence.updateInDatabase(workflowInstance);
1100 serviceRegistry.updateJob(workflowJob);
1101
1102 Job operationJob = serviceRegistry.getJob(operationJobId);
1103 operationJob.setStatus(Status.QUEUED);
1104 operationJob.setDispatchable(true);
1105 if (properties != null) {
1106 Properties props = new Properties();
1107 props.putAll(properties);
1108 ByteArrayOutputStream out = new ByteArrayOutputStream();
1109 props.store(out, null);
1110 List<String> newArguments = new ArrayList<String>(operationJob.getArguments());
1111 newArguments.add(new String(out.toByteArray(), StandardCharsets.UTF_8));
1112 operationJob.setArguments(newArguments);
1113 }
1114 serviceRegistry.updateJob(operationJob);
1115 } catch (ServiceRegistryException e) {
1116 throw new WorkflowDatabaseException(e);
1117 } catch (IOException e) {
1118 throw new WorkflowParsingException("Unable to parse workflow and/or workflow properties");
1119 }
1120
1121 return workflowInstance;
1122 }
1123
1124
1125
1126
1127
1128
1129
1130
1131
1132
1133
1134 protected void assertPermission(WorkflowInstance workflow, String action, String workflowOrgId)
1135 throws UnauthorizedException {
1136 User currentUser = securityService.getUser();
1137 Organization currentOrg = securityService.getOrganization();
1138 String currentOrgAdminRole = currentOrg.getAdminRole();
1139 String currentOrgId = currentOrg.getId();
1140
1141 MediaPackage mediapackage = workflow.getMediaPackage();
1142
1143 WorkflowState state = workflow.getState();
1144 if (state != INSTANTIATED && state != RUNNING && workflow.getState() != FAILING) {
1145 Optional<MediaPackage> assetMediapackage = assetManager.getMediaPackage(mediapackage.getIdentifier().toString());
1146 if (assetMediapackage.isPresent()) {
1147 mediapackage = assetMediapackage.get();
1148 }
1149 }
1150
1151 var creatorName = workflow.getCreatorName();
1152 var workflowCreator = creatorName == null ? null : userDirectoryService.loadUser(creatorName);
1153 boolean authorized = currentUser.hasRole(GLOBAL_ADMIN_ROLE)
1154 || (currentUser.hasRole(currentOrgAdminRole) && currentOrgId.equals(workflowOrgId))
1155 || (currentUser.equals(workflowCreator))
1156 || (authorizationService.hasPermission(mediapackage, action) && currentOrgId.equals(workflowOrgId));
1157
1158 if (!authorized) {
1159 throw new UnauthorizedException(currentUser, action);
1160 }
1161 }
1162
1163 protected boolean assertMediaPackagePermission(String mediaPackageId, String action) {
1164 var currentUser = securityService.getUser();
1165 Optional<MediaPackage> mp = assetManager.getMediaPackage(mediaPackageId);
1166
1167
1168
1169
1170 return currentUser.hasRole(GLOBAL_ADMIN_ROLE)
1171 || mp.isPresent() && currentUser.hasRole(securityService.getOrganization().getAdminRole())
1172 || mp.isPresent() && authorizationService.hasPermission(mp.get(), action);
1173 }
1174
1175
1176
1177
1178
1179
1180 @Override
1181 public void update(final WorkflowInstance workflowInstance) throws WorkflowDatabaseException, UnauthorizedException {
1182 final Lock lock = updateLock.get(workflowInstance.getId());
1183 lock.lock();
1184
1185 try {
1186 WorkflowInstance originalWorkflowInstance = null;
1187 try {
1188
1189 originalWorkflowInstance = getWorkflowById(workflowInstance.getId());
1190 } catch (NotFoundException e) {
1191
1192 }
1193
1194 MediaPackage updatedMediaPackage = null;
1195 try {
1196
1197
1198 updatedMediaPackage = workflowInstance.getMediaPackage();
1199
1200 populateMediaPackageMetadata(updatedMediaPackage);
1201
1202 String seriesId = updatedMediaPackage.getSeries();
1203 if (seriesId != null && workflowInstance.getCurrentOperation() != null) {
1204
1205
1206
1207 try {
1208 AccessControlList acl = seriesService.getSeriesAccessControl(seriesId);
1209 Tuple<AccessControlList, AclScope> activeAcl = authorizationService.getAcl(
1210 updatedMediaPackage, AclScope.Series);
1211
1212 if (!AclScope.Series.equals(activeAcl.getB()) || !AccessControlUtil.equals(activeAcl.getA(), acl)) {
1213 authorizationService.setAcl(updatedMediaPackage, AclScope.Series, acl);
1214 }
1215 } catch (NotFoundException e) {
1216 logger.debug("Not updating series ACL on event {} since series {} has no ACL set",
1217 updatedMediaPackage, seriesId, e);
1218 }
1219 }
1220
1221 workflowInstance.setMediaPackage(updatedMediaPackage);
1222 } catch (SeriesException e) {
1223 throw new WorkflowDatabaseException(e);
1224 } catch (Exception e) {
1225 logger.error("Metadata for media package {} could not be updated", updatedMediaPackage, e);
1226 }
1227
1228
1229 WorkflowState workflowState = workflowInstance.getState();
1230
1231 Job job;
1232 try {
1233 job = serviceRegistry.getJob(workflowInstance.getId());
1234 job.setPayload(Long.toString(workflowInstance.getId()));
1235
1236
1237 switch (workflowState) {
1238 case FAILED:
1239 job.setStatus(Status.FAILED);
1240 break;
1241 case FAILING:
1242 break;
1243 case INSTANTIATED:
1244 job.setDispatchable(true);
1245 job.setStatus(Status.QUEUED);
1246 break;
1247 case PAUSED:
1248 job.setStatus(Status.PAUSED);
1249 break;
1250 case RUNNING:
1251 job.setStatus(Status.RUNNING);
1252 break;
1253 case STOPPED:
1254 job.setStatus(Status.CANCELLED);
1255 break;
1256 case SUCCEEDED:
1257 job.setStatus(Status.FINISHED);
1258 break;
1259 default:
1260 throw new IllegalStateException("Found a workflow state that is not handled");
1261 }
1262 } catch (ServiceRegistryException e) {
1263 throw new WorkflowDatabaseException(
1264 "Unable to read workflow job " + workflowInstance.getId() + " from service registry", e);
1265 } catch (NotFoundException e) {
1266 throw new WorkflowDatabaseException(
1267 "Job for workflow " + workflowInstance.getId() + " not found in service registry", e);
1268 }
1269
1270
1271 try {
1272
1273 persistence.updateInDatabase(workflowInstance);
1274
1275 job = serviceRegistry.updateJob(job);
1276
1277 WorkflowOperationInstance op = workflowInstance.getCurrentOperation();
1278
1279
1280
1281
1282 if (op == null || op.getState() != OperationState.RUNNING) {
1283
1284 long id = workflowInstance.getId();
1285 int state = workflowInstance.getState().ordinal();
1286 var template = workflowInstance.getTemplate();
1287 String mpId = workflowInstance.getMediaPackage().getIdentifier().toString();
1288 String orgId = workflowInstance.getOrganizationId();
1289
1290 updateWorkflowInstanceInIndex(id, state, template, mpId, orgId);
1291 }
1292 } catch (ServiceRegistryException e) {
1293 throw new WorkflowDatabaseException("Update of workflow job " + workflowInstance.getId()
1294 + " in the service registry failed, service registry and workflow table may be out of sync", e);
1295 } catch (NotFoundException e) {
1296 throw new WorkflowDatabaseException("Job for workflow " + workflowInstance.getId()
1297 + " not found in service registry", e);
1298 } catch (Exception e) {
1299 throw new WorkflowDatabaseException("Update of workflow job " + job.getId()
1300 + " in the service registry failed, service registry and workflow table may be out of sync", e);
1301 }
1302
1303 try {
1304 fireListeners(originalWorkflowInstance, workflowInstance);
1305 } catch (Exception e) {
1306
1307 throw new IllegalStateException("In-memory workflow instance could not be serialized", e);
1308 }
1309 } finally {
1310 lock.unlock();
1311 }
1312 }
1313
1314
1315
1316
1317
1318
1319 @Override
1320 public long countWorkflowInstances() throws WorkflowDatabaseException {
1321 return countWorkflowInstances(null);
1322 }
1323
1324
1325
1326
1327
1328
1329
1330 @Override
1331 public long countWorkflowInstances(WorkflowState state) throws WorkflowDatabaseException {
1332 return persistence.countWorkflows(state);
1333 }
1334
1335
1336
1337
1338
1339
1340 @Override
1341 public List<WorkflowInstance> getWorkflowInstancesByMediaPackage(String mediaPackageId)
1342 throws WorkflowDatabaseException, UnauthorizedException {
1343
1344 if (!assertMediaPackagePermission(mediaPackageId, Permissions.Action.READ.toString())) {
1345 throw new UnauthorizedException("Not allowed to access event");
1346 }
1347 return persistence.getWorkflowInstancesByMediaPackage(mediaPackageId);
1348 }
1349
1350
1351
1352
1353
1354
1355 @Override
1356 public Optional<WorkflowInstance> getRunningWorkflowInstanceByMediaPackage(String mediaPackageId, String action)
1357 throws WorkflowException, UnauthorizedException, WorkflowDatabaseException {
1358 List<WorkflowInstance> workflowInstances = persistence.getRunningWorkflowInstancesByMediaPackage(mediaPackageId);
1359
1360
1361 if (workflowInstances.size() > 1) {
1362 throw new WorkflowException("Multiple workflows are active on mediapackage " + mediaPackageId);
1363 }
1364
1365 Optional<WorkflowInstance> optWorkflowInstance = Optional.empty();
1366 if (workflowInstances.size() == 1) {
1367 WorkflowInstance wfInstance = workflowInstances.get(0);
1368 optWorkflowInstance = Optional.of(wfInstance);
1369 assertPermission(wfInstance, action, wfInstance.getOrganizationId());
1370 }
1371
1372 return optWorkflowInstance;
1373 }
1374
1375
1376
1377
1378
1379
1380 @Override
1381 public boolean mediaPackageHasActiveWorkflows(String mediaPackageId) throws WorkflowDatabaseException {
1382 return persistence.mediaPackageHasActiveWorkflows(mediaPackageId);
1383 }
1384
1385
1386
1387
1388
1389
1390 @Override
1391 public boolean userHasActiveWorkflows(String userId) throws WorkflowDatabaseException {
1392 return persistence.userHasActiveWorkflows(userId);
1393 }
1394
1395
1396
1397
1398
1399
1400
1401
1402
1403
1404
1405 protected WorkflowInstance handleOperationException(
1406 WorkflowInstance workflow,
1407 WorkflowOperationInstance currentOperation) {
1408 int failedAttempt = currentOperation.getFailedAttempts() + 1;
1409 currentOperation.setFailedAttempts(failedAttempt);
1410
1411
1412 if (ERROR_RESOLUTION_HANDLER_ID.equals(currentOperation.getTemplate())
1413 && OperationState.FAILED.equals(currentOperation.getState())) {
1414 int position = workflow.getOperations().indexOf(currentOperation);
1415
1416 if (workflow.getOperations().size() > position + 1) {
1417 currentOperation = workflow.getOperations().get(position + 1);
1418
1419 currentOperation.setState(OperationState.FAILED);
1420 }
1421 handleFailedOperation(workflow, currentOperation);
1422 } else if (currentOperation.getMaxAttempts() != -1 && failedAttempt == currentOperation.getMaxAttempts()) {
1423 handleFailedOperation(workflow, currentOperation);
1424 } else {
1425 switch (currentOperation.getRetryStrategy()) {
1426 case NONE:
1427 handleFailedOperation(workflow, currentOperation);
1428 break;
1429 case RETRY:
1430 currentOperation.setState(OperationState.RETRY);
1431 break;
1432 case HOLD:
1433 currentOperation.setState(OperationState.RETRY);
1434 List<WorkflowOperationInstance> operations = workflow.getOperations();
1435 WorkflowOperationDefinitionImpl errorResolutionDefinition = new WorkflowOperationDefinitionImpl(
1436 ERROR_RESOLUTION_HANDLER_ID, "Error Resolution Operation", "error", false);
1437 var errorResolutionInstance = new WorkflowOperationInstance(errorResolutionDefinition);
1438 errorResolutionInstance.setExceptionHandlingWorkflow(currentOperation.getExceptionHandlingWorkflow());
1439 var index = workflow.getOperations().indexOf(currentOperation);
1440 operations.add(index, errorResolutionInstance);
1441 workflow.setOperations(operations);
1442 break;
1443 default:
1444 break;
1445 }
1446 }
1447 return workflow;
1448 }
1449
1450
1451
1452
1453
1454
1455
1456
1457
1458 private void handleFailedOperation(WorkflowInstance workflow, WorkflowOperationInstance currentOperation) {
1459 String errorDefId = currentOperation.getExceptionHandlingWorkflow();
1460
1461
1462 if (currentOperation.isFailOnError()) {
1463 if (StringUtils.isBlank(errorDefId)) {
1464 workflow.setState(FAILED);
1465 } else {
1466 workflow.setState(FAILING);
1467
1468
1469 int currentOperationPosition = workflow.getOperations().indexOf(currentOperation);
1470 List<WorkflowOperationInstance> operations = new ArrayList<>(
1471 workflow.getOperations().subList(0, currentOperationPosition + 1));
1472 workflow.setOperations(operations);
1473
1474
1475 Map<String, String> configuration = new HashMap<>();
1476 for (String configKey : workflow.getConfigurationKeys()) {
1477 configuration.put(configKey, workflow.getConfiguration(configKey));
1478 }
1479
1480
1481 WorkflowDefinition errorDef = null;
1482 try {
1483 errorDef = getWorkflowDefinitionById(errorDefId);
1484 workflow.extend(errorDef);
1485 workflow.setOperations(updateConfiguration(workflow, configuration).getOperations());
1486 } catch (NotFoundException notFoundException) {
1487 throw new IllegalStateException("Unable to find the error workflow definition '" + errorDefId + "'");
1488 }
1489 }
1490 }
1491
1492
1493 currentOperation.setState(OperationState.FAILED);
1494 }
1495
1496
1497
1498
1499
1500
1501
1502
1503
1504
1505
1506
1507
1508 protected WorkflowInstance handleOperationResult(WorkflowInstance workflow, WorkflowOperationResult result)
1509 throws WorkflowDatabaseException {
1510
1511
1512 WorkflowOperationInstance currentOperation = workflow.getCurrentOperation();
1513 WorkflowOperationHandler handler = getWorkflowOperationHandler(currentOperation.getTemplate());
1514
1515
1516 if (result == null) {
1517 logger.warn("Handling a null operation result for workflow {} in operation {}", workflow.getId(),
1518 currentOperation.getTemplate());
1519 result = new WorkflowOperationResultImpl(workflow.getMediaPackage(), null, Action.CONTINUE, 0);
1520 } else {
1521 MediaPackage mp = result.getMediaPackage();
1522 if (mp != null) {
1523 workflow.setMediaPackage(mp);
1524 }
1525 }
1526
1527
1528 Action action = result.getAction();
1529
1530
1531 workflow = updateConfiguration(workflow, result.getProperties());
1532
1533
1534 currentOperation.setTimeInQueue(result.getTimeInQueue());
1535
1536
1537 switch (action) {
1538 case CONTINUE:
1539 currentOperation.setState(OperationState.SUCCEEDED);
1540 break;
1541 case PAUSE:
1542 if (!(handler instanceof ResumableWorkflowOperationHandler)) {
1543 throw new IllegalStateException("Operation " + currentOperation.getTemplate() + " is not resumable");
1544 }
1545
1546
1547 currentOperation.setContinuable(result.allowsContinue());
1548 currentOperation.setAbortable(result.allowsAbort());
1549
1550 workflow.setState(PAUSED);
1551 currentOperation.setState(OperationState.PAUSED);
1552 break;
1553 case SKIP:
1554 currentOperation.setState(OperationState.SKIPPED);
1555 break;
1556 default:
1557 throw new IllegalStateException("Unknown action '" + action + "' returned");
1558 }
1559
1560 if (ERROR_RESOLUTION_HANDLER_ID.equals(currentOperation.getTemplate()) && result.getAction() == Action.CONTINUE) {
1561
1562 Map<String, String> resultProperties = result.getProperties();
1563 if (resultProperties == null || StringUtils.isBlank(resultProperties.get(RETRY_STRATEGY))) {
1564 throw new WorkflowDatabaseException("Retry strategy not present in properties!");
1565 }
1566
1567 RetryStrategy retryStrategy = RetryStrategy.valueOf(resultProperties.get(RETRY_STRATEGY));
1568 switch (retryStrategy) {
1569 case NONE:
1570 handleFailedOperation(workflow, workflow.getCurrentOperation());
1571 break;
1572 case RETRY:
1573 break;
1574 default:
1575 throw new WorkflowDatabaseException("Retry strategy not implemented yet!");
1576 }
1577 }
1578
1579 return workflow;
1580 }
1581
1582
1583
1584
1585
1586
1587
1588 protected void populateMediaPackageMetadata(MediaPackage mp) {
1589 if (metadataServices.isEmpty()) {
1590 logger.warn("No metadata services are registered, so no media package metadata can be extracted from catalogs");
1591 return;
1592 }
1593 for (MediaPackageMetadataService metadataService : metadataServices) {
1594 MediaPackageMetadata metadata = metadataService.getMetadata(mp);
1595 MediaPackageMetadataSupport.populateMediaPackageMetadata(mp, metadata);
1596 }
1597 }
1598
1599
1600
1601
1602
1603
1604 @Override
1605 public boolean isReadyToAcceptJobs(String operation) {
1606 return true;
1607 }
1608
1609
1610
1611
1612
1613
1614
1615
1616 @Override
1617 public boolean isReadyToAccept(Job job) throws UndispatchableJobException {
1618 String operation = job.getOperation();
1619
1620
1621 if (!Operation.START_WORKFLOW.toString().equals(operation)) {
1622 return true;
1623 }
1624
1625
1626 if (job.getArguments().size() > 1 && job.getArguments().get(0) != null) {
1627 try {
1628 WorkflowDefinition workflowDef = XmlWorkflowParser.parseWorkflowDefinition(job.getArguments().get(0));
1629 if (workflowDef.getOperations().size() > 0) {
1630 String firstOperationId = workflowDef.getOperations().get(0).getId();
1631 WorkflowOperationHandler handler = getWorkflowOperationHandler(firstOperationId);
1632 if (handler instanceof ResumableWorkflowOperationHandler) {
1633 if (((ResumableWorkflowOperationHandler) handler).isAlwaysPause()) {
1634 return true;
1635 }
1636 }
1637 }
1638 } catch (WorkflowParsingException e) {
1639 throw new UndispatchableJobException(job + " is not a proper job to start a workflow", e);
1640 }
1641 }
1642
1643 WorkflowInstance workflow;
1644 Optional<WorkflowInstance> workflowInstance;
1645 String mediaPackageId;
1646
1647
1648 try {
1649 workflow = getWorkflowById(job.getId());
1650 mediaPackageId = workflow.getMediaPackage().getIdentifier().toString();
1651 } catch (NotFoundException e) {
1652 throw new UndispatchableJobException("Trying to start workflow with job id " + job.getId()
1653 + " but no corresponding instance is available from the workflow service", e);
1654 } catch (UnauthorizedException e) {
1655 throw new UndispatchableJobException(
1656 "Authorization denied while requesting to loading workflow instance. Job: " + job.getId(), e);
1657 }
1658
1659 try {
1660 workflowInstance = getRunningWorkflowInstanceByMediaPackage(
1661 workflow.getMediaPackage().getIdentifier().toString(), Permissions.Action.READ.toString());
1662 } catch (UnauthorizedException e) {
1663 throw new UndispatchableJobException("Authorization denied while requesting to loading workflow instance "
1664 + workflow.getId(), e);
1665 } catch (WorkflowDatabaseException e) {
1666 throw new UndispatchableJobException("An database error occurred while checking if a workflow is already active "
1667 + "(job: " + job.getId() + ")", e);
1668 } catch (WorkflowException e) {
1669
1670 delayWorkflow(workflow, mediaPackageId);
1671 return false;
1672 }
1673
1674
1675 if (workflowInstance.isPresent() && workflow.getId() != workflowInstance.get().getId()) {
1676 delayWorkflow(workflow, mediaPackageId);
1677 return false;
1678 }
1679
1680 return true;
1681 }
1682
1683 private void delayWorkflow(WorkflowInstance workflow, String mediaPackageId) {
1684 if (!delayedWorkflows.contains(workflow.getId())) {
1685 logger.info("Delaying start of workflow {}, another workflow on media package {} is still running",
1686 workflow.getId(), mediaPackageId);
1687 delayedWorkflows.add(workflow.getId());
1688 }
1689 }
1690
1691
1692
1693
1694
1695
1696 @Override
1697 public synchronized void acceptJob(Job job) throws ServiceRegistryException {
1698 User originalUser = securityService.getUser();
1699 Organization originalOrg = securityService.getOrganization();
1700 try {
1701 Organization organization = organizationDirectoryService.getOrganization(job.getOrganization());
1702 securityService.setOrganization(organization);
1703 User user = userDirectoryService.loadUser(job.getCreator());
1704 securityService.setUser(user);
1705 job.setStatus(Job.Status.RUNNING);
1706 job = serviceRegistry.updateJob(job);
1707
1708
1709 if (delayedWorkflows.contains(job.getId())) {
1710 delayedWorkflows.remove(job.getId());
1711 logger.info("Starting initially delayed workflow {}, {} more waiting", job.getId(), delayedWorkflows.size());
1712 }
1713
1714 executorService.submit(new JobRunner(job, serviceRegistry.getCurrentJob()));
1715 } catch (Exception e) {
1716 if (e instanceof ServiceRegistryException) {
1717 throw (ServiceRegistryException) e;
1718 }
1719 throw new ServiceRegistryException(e);
1720 } finally {
1721 securityService.setUser(originalUser);
1722 securityService.setOrganization(originalOrg);
1723 }
1724 }
1725
1726
1727
1728
1729
1730
1731
1732
1733
1734
1735 protected String process(Job job) throws Exception {
1736 List<String> arguments = job.getArguments();
1737 Operation op = null;
1738 WorkflowInstance workflowInstance = null;
1739 WorkflowOperationInstance wfo;
1740 String operation = job.getOperation();
1741 try {
1742 try {
1743 op = Operation.valueOf(operation);
1744 switch (op) {
1745 case START_WORKFLOW:
1746 workflowInstance = persistence.getWorkflow(Long.parseLong(job.getPayload()));
1747 logger.debug("Starting new workflow {}", workflowInstance);
1748 runWorkflow(workflowInstance);
1749 break;
1750 case RESUME:
1751 workflowInstance = getWorkflowById(Long.parseLong(arguments.get(0)));
1752 Map<String, String> properties = null;
1753 if (arguments.size() > 1) {
1754 Properties props = new Properties();
1755 props.load(IOUtils.toInputStream(arguments.get(arguments.size() - 1), StandardCharsets.UTF_8));
1756 properties = new HashMap<>();
1757 for (Entry<Object, Object> entry : props.entrySet()) {
1758 properties.put(entry.getKey().toString(), entry.getValue().toString());
1759 }
1760 }
1761 logger.debug("Resuming {} at {}", workflowInstance, workflowInstance.getCurrentOperation());
1762 workflowInstance.setState(RUNNING);
1763 update(workflowInstance);
1764 runWorkflowOperation(workflowInstance, properties);
1765 break;
1766 case START_OPERATION:
1767 workflowInstance = getWorkflowById(Long.parseLong(arguments.get(0)));
1768 wfo = workflowInstance.getCurrentOperation();
1769
1770 if (OperationState.RUNNING.equals(wfo.getState()) || OperationState.PAUSED.equals(wfo.getState())) {
1771 logger.info("Reset operation state {} {} to INSTANTIATED due to job restart", workflowInstance, wfo);
1772 wfo.setState(OperationState.INSTANTIATED);
1773 }
1774
1775 wfo.setExecutionHost(job.getProcessingHost());
1776 logger.debug("Running {} {}", workflowInstance, wfo);
1777 wfo = runWorkflowOperation(workflowInstance, null);
1778 updateOperationJob(job.getId(), wfo.getState());
1779 break;
1780 default:
1781 throw new IllegalStateException("Don't know how to handle operation '" + operation + "'");
1782 }
1783 } catch (IllegalArgumentException e) {
1784 throw new ServiceRegistryException("This service can't handle operations of type '" + op + "'", e);
1785 } catch (IndexOutOfBoundsException e) {
1786 throw new ServiceRegistryException(
1787 "The argument list for operation '" + op + "' (job: " + job.getId() + ") does not meet expectations", e);
1788 } catch (NotFoundException e) {
1789 logger.warn("Not found processing job {}", job, e);
1790 updateOperationJob(job.getId(), OperationState.FAILED);
1791 }
1792 return null;
1793 } catch (Exception e) {
1794 logger.warn("Exception while accepting job {}", job, e);
1795 try {
1796 if (workflowInstance != null) {
1797 logger.warn("Marking job {} and workflow instance {} as failed", job, workflowInstance);
1798 updateOperationJob(job.getId(), OperationState.FAILED);
1799 workflowInstance.setState(FAILED);
1800 update(workflowInstance);
1801 } else {
1802 logger.warn("Unable to parse workflow instance", e);
1803 }
1804 } catch (WorkflowDatabaseException e1) {
1805 throw new ServiceRegistryException(e1);
1806 }
1807 if (e instanceof ServiceRegistryException) {
1808 throw e;
1809 }
1810 throw new ServiceRegistryException("Error handling operation '" + op + "'", e);
1811 }
1812 }
1813
1814
1815
1816
1817
1818
1819
1820
1821
1822
1823
1824
1825
1826
1827
1828 private Job updateOperationJob(Long jobId, OperationState state) throws NotFoundException, ServiceRegistryException {
1829 if (jobId == null) {
1830 return null;
1831 }
1832 Job job = serviceRegistry.getJob(jobId);
1833 switch (state) {
1834 case FAILED:
1835 case RETRY:
1836 job.setStatus(Status.FAILED);
1837 break;
1838 case PAUSED:
1839 job.setStatus(Status.PAUSED);
1840 job.setOperation(Operation.RESUME.toString());
1841 break;
1842 case SKIPPED:
1843 case SUCCEEDED:
1844 job.setStatus(Status.FINISHED);
1845 break;
1846 default:
1847 throw new IllegalStateException("Unexpected state '" + state + "' found");
1848 }
1849 return serviceRegistry.updateJob(job);
1850 }
1851
1852
1853
1854
1855
1856
1857 @Override
1858 public long countJobs(Status status) throws ServiceRegistryException {
1859 return serviceRegistry.count(JOB_TYPE, status);
1860 }
1861
1862
1863
1864
1865
1866
1867
1868
1869
1870 private String mapToString(Map<String, String> props) {
1871 if (props == null) {
1872 return null;
1873 }
1874 StringBuilder sb = new StringBuilder();
1875 for (Entry<String, String> entry : props.entrySet()) {
1876 sb.append(entry.getKey());
1877 sb.append("=");
1878 sb.append(entry.getValue());
1879 sb.append("\n");
1880 }
1881 return sb.toString();
1882 }
1883
1884
1885
1886
1887
1888
1889
1890 @Reference(target = "(artifact=workflowdefinition)")
1891 protected void setProfilesReadyIndicator(ReadinessIndicator unused) { }
1892
1893
1894
1895
1896
1897
1898
1899 @Reference
1900 protected void setWorkspace(Workspace workspace) {
1901 this.workspace = workspace;
1902 }
1903
1904
1905
1906
1907
1908
1909
1910 @Reference
1911 protected void setServiceRegistry(ServiceRegistry registry) {
1912 this.serviceRegistry = registry;
1913 }
1914
1915 public ServiceRegistry getServiceRegistry() {
1916 return serviceRegistry;
1917 }
1918
1919
1920
1921
1922
1923
1924
1925 @Reference
1926 public void setSecurityService(SecurityService securityService) {
1927 this.securityService = securityService;
1928 }
1929
1930
1931
1932
1933
1934
1935
1936 @Reference
1937 public void setAuthorizationService(AuthorizationService authorizationService) {
1938 this.authorizationService = authorizationService;
1939 }
1940
1941
1942
1943
1944
1945
1946
1947 @Reference
1948 public void setUserDirectoryService(UserDirectoryService userDirectoryService) {
1949 this.userDirectoryService = userDirectoryService;
1950 }
1951
1952
1953
1954
1955
1956
1957
1958 @Reference
1959 public void setOrganizationDirectoryService(OrganizationDirectoryService organizationDirectory) {
1960 this.organizationDirectoryService = organizationDirectory;
1961 }
1962
1963
1964
1965
1966
1967
1968
1969 @Reference
1970 public void setSeriesService(SeriesService seriesService) {
1971 this.seriesService = seriesService;
1972 }
1973
1974
1975
1976
1977
1978
1979
1980 @Reference
1981 public void setAssetManager(AssetManager assetManager) {
1982 this.assetManager = assetManager;
1983 }
1984
1985
1986
1987
1988
1989
1990
1991 @Reference(
1992 cardinality = ReferenceCardinality.AT_LEAST_ONE,
1993 policy = ReferencePolicy.DYNAMIC,
1994 unbind = "removeMetadataService"
1995 )
1996 protected void addMetadataService(MediaPackageMetadataService service) {
1997 metadataServices.add(service);
1998 }
1999
2000
2001
2002
2003
2004
2005
2006 protected void removeMetadataService(MediaPackageMetadataService service) {
2007 metadataServices.remove(service);
2008 }
2009
2010
2011
2012
2013
2014
2015
2016 @Reference
2017 protected void addWorkflowDefinitionScanner(WorkflowDefinitionScanner scanner) {
2018 workflowDefinitionScanner = scanner;
2019 }
2020
2021
2022
2023
2024
2025
2026
2027 @Reference
2028 public void setIndex(ElasticsearchIndex index) {
2029 this.index = index;
2030 }
2031
2032
2033
2034
2035
2036
2037
2038 @Reference(name = "workflow-persistence")
2039 public void setPersistence(WorkflowServiceDatabase persistence) {
2040 this.persistence = persistence;
2041 }
2042
2043
2044
2045
2046
2047
2048 @Override
2049 public String getJobType() {
2050 return JOB_TYPE;
2051 }
2052
2053
2054
2055
2056 public static class HandlerRegistration {
2057
2058 protected WorkflowOperationHandler handler;
2059 protected String operationName;
2060
2061 public HandlerRegistration(String operationName, WorkflowOperationHandler handler) {
2062 if (operationName == null) {
2063 throw new IllegalArgumentException("Operation name cannot be null");
2064 }
2065 if (handler == null) {
2066 throw new IllegalArgumentException("Handler cannot be null");
2067 }
2068 this.operationName = operationName;
2069 this.handler = handler;
2070 }
2071
2072 public WorkflowOperationHandler getHandler() {
2073 return handler;
2074 }
2075
2076
2077
2078
2079
2080
2081 @Override
2082 public int hashCode() {
2083 final int prime = 31;
2084 int result = 1;
2085 result = prime * result + handler.hashCode();
2086 result = prime * result + operationName.hashCode();
2087 return result;
2088 }
2089
2090
2091
2092
2093
2094
2095 @Override
2096 public boolean equals(Object obj) {
2097 if (this == obj) {
2098 return true;
2099 }
2100 if (obj == null) {
2101 return false;
2102 }
2103 if (getClass() != obj.getClass()) {
2104 return false;
2105 }
2106 HandlerRegistration other = (HandlerRegistration) obj;
2107 if (!handler.equals(other.handler)) {
2108 return false;
2109 }
2110 return operationName.equals(other.operationName);
2111 }
2112 }
2113
2114
2115
2116
2117 class JobRunner implements Callable<Void> {
2118
2119
2120 private Job job = null;
2121
2122
2123 private final Job currentJob;
2124
2125
2126
2127
2128
2129
2130
2131
2132
2133 JobRunner(Job job, Job currentJob) {
2134 this.job = job;
2135 this.currentJob = currentJob;
2136 }
2137
2138
2139
2140
2141
2142
2143 @Override
2144 public Void call() throws Exception {
2145 Organization jobOrganization = organizationDirectoryService.getOrganization(job.getOrganization());
2146 try {
2147 serviceRegistry.setCurrentJob(currentJob);
2148 securityService.setOrganization(jobOrganization);
2149 User jobUser = userDirectoryService.loadUser(job.getCreator());
2150 securityService.setUser(jobUser);
2151 process(job);
2152 } finally {
2153 serviceRegistry.setCurrentJob(null);
2154 securityService.setUser(null);
2155 securityService.setOrganization(null);
2156 }
2157 return null;
2158 }
2159 }
2160
2161 @Override
2162 public synchronized void cleanupWorkflowInstances(int buffer, WorkflowState state) throws UnauthorizedException,
2163 WorkflowDatabaseException {
2164 logger.info("Start cleaning up workflow instances older than {} days with status '{}'", buffer, state);
2165
2166 int instancesCleaned = 0;
2167 int cleaningFailed = 0;
2168
2169 Date priorTo = DateUtils.addDays(new Date(), -buffer);
2170
2171 try {
2172 for (WorkflowInstance workflowInstance : persistence.getWorkflowInstancesForCleanup(state, priorTo)) {
2173 try {
2174 logger.debug("Deleting workflow instance {}", workflowInstance.getId());
2175 remove(workflowInstance.getId());
2176 instancesCleaned++;
2177 } catch (WorkflowDatabaseException | UnauthorizedException e) {
2178 throw e;
2179 } catch (NotFoundException e) {
2180
2181 logger.debug("Workflow instance '{}' could not be removed", workflowInstance.getId(), e);
2182 } catch (WorkflowParsingException | WorkflowStateException e) {
2183 logger.warn("Workflow instance '{}' could not be removed", workflowInstance.getId(), e);
2184 cleaningFailed++;
2185 }
2186 }
2187 } catch (WorkflowDatabaseException e) {
2188 throw new WorkflowDatabaseException(e);
2189 }
2190
2191 if (instancesCleaned == 0 && cleaningFailed == 0) {
2192 logger.info("No workflow instances found to clean up");
2193 return;
2194 }
2195
2196 if (instancesCleaned > 0) {
2197 logger.info("Cleaned up '{}' workflow instances", instancesCleaned);
2198 }
2199 if (cleaningFailed > 0) {
2200 logger.warn("Cleaning failed for '{}' workflow instances", cleaningFailed);
2201 throw new WorkflowDatabaseException("Unable to clean all workflow instances, see logs!");
2202 }
2203 }
2204
2205 @Override
2206 public Map<String, Map<String, String>> getWorkflowStateMappings() {
2207 return workflowDefinitionScanner.workflowStateMappings.entrySet().stream().collect(Collectors.toMap(
2208 Entry::getKey, e -> e.getValue().stream()
2209 .collect(Collectors.toMap(m -> m.getState().name(), WorkflowStateMapping::getValue))
2210 ));
2211 }
2212
2213 @Override
2214 public void repopulate(IndexRebuildService.DataType type) throws IndexRebuildException {
2215 try {
2216 final int total;
2217 try {
2218 total = persistence.countMediaPackages();
2219 } catch (WorkflowDatabaseException e) {
2220 logIndexRebuildError(logger, e);
2221 throw new IndexRebuildException(getService(), e);
2222 }
2223
2224 if (total > 0) {
2225 logIndexRebuildBegin(logger, total, "workflows");
2226 int current = 0;
2227 int n = 20;
2228 List<WorkflowIndexData> workflowIndexData;
2229
2230 int limit = 1000;
2231 int offset = 0;
2232 String currentMediapackageId;
2233 String lastMediapackageId = "";
2234 var updatedWorkflowRange = new ArrayList<Event>();
2235 do {
2236 try {
2237 workflowIndexData = persistence.getWorkflowIndexData(limit, offset);
2238 } catch (WorkflowDatabaseException e) {
2239 logIndexRebuildError(logger, e);
2240 throw new IndexRebuildException(getService(), e);
2241 }
2242 if (!workflowIndexData.isEmpty()) {
2243 offset += limit;
2244 logger.debug("Got {} workflows for re-indexing", workflowIndexData.size());
2245
2246 for (WorkflowIndexData indexData : workflowIndexData) {
2247 currentMediapackageId = indexData.getMediaPackageId();
2248 if (currentMediapackageId.equals(lastMediapackageId)) {
2249 continue;
2250 }
2251 current++;
2252
2253
2254 if (!WorkflowUtil.isActive(WorkflowInstance.WorkflowState.values()[indexData.getState()].toString())
2255 || WorkflowState.PAUSED == WorkflowInstance.WorkflowState.values()[indexData.getState()]) {
2256 String orgid = indexData.getOrganizationId();
2257 if (null == orgid) {
2258 String mpId = indexData.getMediaPackageId();
2259
2260 List<Snapshot> snapshots = assetManager.getSnapshotsById(mpId);
2261 if (snapshots.size() == 0) {
2262 logger.debug("Dropping {} from the index since it is missing from the database", mpId);
2263 continue;
2264 }
2265 orgid = snapshots.stream().findFirst().get().getOrganizationId();
2266
2267
2268 try {
2269
2270
2271 WorkflowInstance instance = persistence.getWorkflow(indexData.getId(), null);
2272 instance.setOrganizationId(orgid);
2273 persistence.updateInDatabase(instance);
2274 } catch (NotFoundException e) {
2275
2276 }
2277 }
2278 var updatedWorkflowData = index.getEvent(indexData.getMediaPackageId(), orgid,
2279 securityService.getUser());
2280 updatedWorkflowData = getStateUpdateFunction(indexData.getId(),indexData.getState(),
2281 indexData.getMediaPackageId(), indexData.getTemplate(), indexData.getOrganizationId())
2282 .apply(updatedWorkflowData);
2283 updatedWorkflowRange.add(updatedWorkflowData.get());
2284
2285 if (updatedWorkflowRange.size() >= n || current >= total) {
2286 index.bulkEventUpdate(updatedWorkflowRange);
2287 logIndexRebuildProgress(logger, total, current, n);
2288 updatedWorkflowRange.clear();
2289 }
2290 }
2291 else {
2292 logger.info("Skipping. Workflow {} is currently active.", indexData.getId());
2293 }
2294
2295 lastMediapackageId = currentMediapackageId;
2296 }
2297 }
2298 } while (!workflowIndexData.isEmpty());
2299 }
2300 } catch (Exception e) {
2301 logIndexRebuildError(logger, e);
2302 throw new IndexRebuildException(getService(), e);
2303 }
2304 }
2305
2306 @Override
2307 public IndexRebuildService.Service getService() {
2308 return IndexRebuildService.Service.Workflow;
2309 }
2310
2311
2312
2313
2314
2315
2316
2317 private void removeWorkflowInstanceFromIndex(long workflowInstanceId) {
2318 final String orgId = securityService.getOrganization().getId();
2319 final User user = securityService.getUser();
2320
2321
2322 SearchResult<Event> results;
2323 try {
2324 results = index.getByQuery(new EventSearchQuery(orgId, user).withWorkflowId(workflowInstanceId));
2325 } catch (SearchIndexException e) {
2326 logger.error("Error retrieving the events for workflow instance {} from the {} index.", workflowInstanceId,
2327 index.getIndexName(), e);
2328 return;
2329 }
2330
2331 if (results.getItems().length == 0) {
2332 logger.warn("No events for workflow instance {} found in the {} index.", workflowInstanceId,
2333 index.getIndexName());
2334 return;
2335 }
2336
2337
2338 for (SearchResultItem<Event> item: results.getItems()) {
2339 String eventId = item.getSource().getIdentifier();
2340 logger.debug("Removing workflow instance {} of event {} from the {} index.", workflowInstanceId, eventId,
2341 index.getIndexName());
2342
2343 Function<Optional<Event>, Optional<Event>> updateFunction = (Optional<Event> eventOpt) -> {
2344 if (!eventOpt.isPresent()) {
2345 logger.warn("Event {} of workflow instance {} not found in the {} index.", workflowInstanceId,
2346 eventId, index.getIndexName());
2347 return Optional.empty();
2348 }
2349 Event event = eventOpt.get();
2350 if (event.getWorkflowId() != null && event.getWorkflowId().equals(workflowInstanceId)) {
2351 logger.debug("Workflow {} is the current workflow of event {}. Removing it from event.", eventId,
2352 workflowInstanceId);
2353 event.setWorkflowId(null);
2354 event.setWorkflowDefinitionId(null);
2355 event.setWorkflowState(null);
2356 return Optional.of(event);
2357 }
2358 return Optional.empty();
2359 };
2360
2361 try {
2362 index.addOrUpdateEvent(eventId, updateFunction, orgId, user);
2363 logger.debug("Workflow instance {} of event {} removed from the {} index.", workflowInstanceId, eventId,
2364 index.getIndexName());
2365 } catch (SearchIndexException e) {
2366 logger.error("Error removing the workflow instance {} of event {} from the {} index.", workflowInstanceId,
2367 eventId, index.getIndexName(), e);
2368 }
2369 }
2370 }
2371
2372
2373
2374
2375
2376
2377
2378
2379
2380
2381
2382
2383
2384 private void updateWorkflowInstanceInIndex(long id, int state, String wfDefId, String mpId, String orgId) {
2385 final User user = securityService.getUser();
2386
2387 logger.debug("Updating workflow instance {} of event {} in the {} index.", id, mpId,
2388 index.getIndexName());
2389 Function<Optional<Event>, Optional<Event>> updateFunction = getStateUpdateFunction(id, state, wfDefId, mpId, orgId);
2390
2391 try {
2392 index.addOrUpdateEvent(mpId, updateFunction, orgId, user);
2393 logger.debug("Workflow instance {} of event {} updated in the {} index.", id, mpId,
2394 index.getIndexName());
2395 } catch (SearchIndexException e) {
2396 logger.error("Error updating the workflow instance {} of event {} in the {} index.", id, mpId,
2397 index.getIndexName(), e);
2398 }
2399 }
2400
2401
2402
2403
2404
2405
2406 private Function<Optional<Event>, Optional<Event>> getStateUpdateFunction(long workflowId,
2407 int workflowState, String workflowDefinitionId, String mediaPackageId,
2408 String orgId) {
2409 return (Optional<Event> eventOpt) -> {
2410 Event event = eventOpt.orElse(new Event(mediaPackageId, orgId));
2411 event.setWorkflowId(workflowId);
2412 event.setWorkflowState(WorkflowState.values()[workflowState]);
2413 event.setWorkflowDefinitionId(workflowDefinitionId);
2414 return Optional.of(event);
2415 };
2416 }
2417 }