1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 package org.opencastproject.workflow.impl;
23
24 import static org.opencastproject.security.api.SecurityConstants.GLOBAL_ADMIN_ROLE;
25 import static org.opencastproject.workflow.api.WorkflowInstance.WorkflowState.FAILED;
26 import static org.opencastproject.workflow.api.WorkflowInstance.WorkflowState.FAILING;
27 import static org.opencastproject.workflow.api.WorkflowInstance.WorkflowState.INSTANTIATED;
28 import static org.opencastproject.workflow.api.WorkflowInstance.WorkflowState.PAUSED;
29 import static org.opencastproject.workflow.api.WorkflowInstance.WorkflowState.RUNNING;
30 import static org.opencastproject.workflow.api.WorkflowInstance.WorkflowState.STOPPED;
31 import static org.opencastproject.workflow.api.WorkflowInstance.WorkflowState.SUCCEEDED;
32
33 import org.opencastproject.assetmanager.api.AssetManager;
34 import org.opencastproject.assetmanager.api.query.RichAResult;
35 import org.opencastproject.assetmanager.util.WorkflowPropertiesUtil;
36 import org.opencastproject.elasticsearch.api.SearchIndexException;
37 import org.opencastproject.elasticsearch.api.SearchResult;
38 import org.opencastproject.elasticsearch.api.SearchResultItem;
39 import org.opencastproject.elasticsearch.index.ElasticsearchIndex;
40 import org.opencastproject.elasticsearch.index.objects.event.Event;
41 import org.opencastproject.elasticsearch.index.objects.event.EventSearchQuery;
42 import org.opencastproject.elasticsearch.index.rebuild.AbstractIndexProducer;
43 import org.opencastproject.elasticsearch.index.rebuild.IndexProducer;
44 import org.opencastproject.elasticsearch.index.rebuild.IndexRebuildException;
45 import org.opencastproject.elasticsearch.index.rebuild.IndexRebuildService;
46 import org.opencastproject.job.api.Job;
47 import org.opencastproject.job.api.Job.Status;
48 import org.opencastproject.job.api.JobProducer;
49 import org.opencastproject.mediapackage.MediaPackage;
50 import org.opencastproject.mediapackage.MediaPackageElement;
51 import org.opencastproject.mediapackage.MediaPackageParser;
52 import org.opencastproject.mediapackage.MediaPackageSupport;
53 import org.opencastproject.mediapackage.Publication;
54 import org.opencastproject.metadata.api.MediaPackageMetadata;
55 import org.opencastproject.metadata.api.MediaPackageMetadataService;
56 import org.opencastproject.metadata.api.MetadataService;
57 import org.opencastproject.metadata.api.util.MediaPackageMetadataSupport;
58 import org.opencastproject.security.api.AccessControlList;
59 import org.opencastproject.security.api.AccessControlUtil;
60 import org.opencastproject.security.api.AclScope;
61 import org.opencastproject.security.api.AuthorizationService;
62 import org.opencastproject.security.api.Organization;
63 import org.opencastproject.security.api.OrganizationDirectoryService;
64 import org.opencastproject.security.api.Permissions;
65 import org.opencastproject.security.api.SecurityService;
66 import org.opencastproject.security.api.UnauthorizedException;
67 import org.opencastproject.security.api.User;
68 import org.opencastproject.security.api.UserDirectoryService;
69 import org.opencastproject.series.api.SeriesException;
70 import org.opencastproject.series.api.SeriesService;
71 import org.opencastproject.serviceregistry.api.ServiceRegistry;
72 import org.opencastproject.serviceregistry.api.ServiceRegistryException;
73 import org.opencastproject.serviceregistry.api.UndispatchableJobException;
74 import org.opencastproject.util.NotFoundException;
75 import org.opencastproject.util.ReadinessIndicator;
76 import org.opencastproject.util.data.Tuple;
77 import org.opencastproject.workflow.api.ResumableWorkflowOperationHandler;
78 import org.opencastproject.workflow.api.RetryStrategy;
79 import org.opencastproject.workflow.api.WorkflowDatabaseException;
80 import org.opencastproject.workflow.api.WorkflowDefinition;
81 import org.opencastproject.workflow.api.WorkflowException;
82 import org.opencastproject.workflow.api.WorkflowIdentifier;
83 import org.opencastproject.workflow.api.WorkflowIndexData;
84 import org.opencastproject.workflow.api.WorkflowInstance;
85 import org.opencastproject.workflow.api.WorkflowInstance.WorkflowState;
86 import org.opencastproject.workflow.api.WorkflowListener;
87 import org.opencastproject.workflow.api.WorkflowOperationDefinition;
88 import org.opencastproject.workflow.api.WorkflowOperationDefinitionImpl;
89 import org.opencastproject.workflow.api.WorkflowOperationHandler;
90 import org.opencastproject.workflow.api.WorkflowOperationInstance;
91 import org.opencastproject.workflow.api.WorkflowOperationInstance.OperationState;
92 import org.opencastproject.workflow.api.WorkflowOperationResult;
93 import org.opencastproject.workflow.api.WorkflowOperationResult.Action;
94 import org.opencastproject.workflow.api.WorkflowOperationResultImpl;
95 import org.opencastproject.workflow.api.WorkflowParsingException;
96 import org.opencastproject.workflow.api.WorkflowService;
97 import org.opencastproject.workflow.api.WorkflowServiceDatabase;
98 import org.opencastproject.workflow.api.WorkflowStateException;
99 import org.opencastproject.workflow.api.WorkflowStateMapping;
100 import org.opencastproject.workflow.api.WorkflowUtil;
101 import org.opencastproject.workflow.api.XmlWorkflowParser;
102 import org.opencastproject.workspace.api.Workspace;
103
104 import com.google.common.util.concurrent.Striped;
105
106 import org.apache.commons.io.IOUtils;
107 import org.apache.commons.lang3.StringUtils;
108 import org.apache.commons.lang3.time.DateUtils;
109 import org.osgi.framework.InvalidSyntaxException;
110 import org.osgi.framework.ServiceReference;
111 import org.osgi.service.component.ComponentContext;
112 import org.osgi.service.component.annotations.Activate;
113 import org.osgi.service.component.annotations.Component;
114 import org.osgi.service.component.annotations.Reference;
115 import org.osgi.service.component.annotations.ReferenceCardinality;
116 import org.osgi.service.component.annotations.ReferencePolicy;
117 import org.slf4j.Logger;
118 import org.slf4j.LoggerFactory;
119
120 import java.io.ByteArrayOutputStream;
121 import java.io.IOException;
122 import java.nio.charset.StandardCharsets;
123 import java.util.ArrayList;
124 import java.util.Collections;
125 import java.util.Comparator;
126 import java.util.Date;
127 import java.util.HashMap;
128 import java.util.HashSet;
129 import java.util.List;
130 import java.util.Map;
131 import java.util.Map.Entry;
132 import java.util.Optional;
133 import java.util.Properties;
134 import java.util.Set;
135 import java.util.SortedSet;
136 import java.util.TreeSet;
137 import java.util.concurrent.Callable;
138 import java.util.concurrent.CopyOnWriteArrayList;
139 import java.util.concurrent.Executors;
140 import java.util.concurrent.ThreadPoolExecutor;
141 import java.util.concurrent.locks.Lock;
142 import java.util.function.Function;
143 import java.util.stream.Collectors;
144
145
146
147
148
149
150
151
152 @Component(
153 property = {
154 "service.description=Workflow Service",
155 "service.pid=org.opencastproject.workflow.impl.WorkflowServiceImpl"
156 },
157 immediate = true,
158 service = { WorkflowService.class, WorkflowServiceImpl.class, IndexProducer.class }
159 )
160 public class WorkflowServiceImpl extends AbstractIndexProducer implements WorkflowService, JobProducer {
161
162
163 private static final String RETRY_STRATEGY = "retryStrategy";
164
165
166 private static final Logger logger = LoggerFactory.getLogger(WorkflowServiceImpl.class);
167
168
169 enum Operation {
170 START_WORKFLOW, RESUME, START_OPERATION
171 }
172
173
174 private static final String NULL_PARENT_ID = "-";
175
176
177
178
179
180 private static final float WORKFLOW_JOB_LOAD = 0.0f;
181
182
183 public static final String ERROR_RESOLUTION_HANDLER_ID = "error-resolution";
184
185
186 protected ComponentContext componentContext = null;
187
188
189 private SortedSet<MediaPackageMetadataService> metadataServices;
190
191
192 protected WorkflowServiceDatabase persistence;
193
194
195 private final List<WorkflowListener> listeners = new CopyOnWriteArrayList<WorkflowListener>();
196
197
198 protected ThreadPoolExecutor executorService;
199
200
201 protected Workspace workspace = null;
202
203
204 protected ServiceRegistry serviceRegistry = null;
205
206
207 protected SecurityService securityService = null;
208
209
210 protected AuthorizationService authorizationService = null;
211
212
213 protected UserDirectoryService userDirectoryService = null;
214
215
216 protected OrganizationDirectoryService organizationDirectoryService = null;
217
218
219 protected SeriesService seriesService;
220
221
222 protected AssetManager assetManager = null;
223
224
225 private WorkflowDefinitionScanner workflowDefinitionScanner;
226
227
228 private final List<Long> delayedWorkflows = new ArrayList<Long>();
229
230
231 private final Striped<Lock> lock = Striped.lazyWeakLock(1024);
232 private final Striped<Lock> updateLock = Striped.lazyWeakLock(1024);
233 private final Striped<Lock> mediaPackageLocks = Striped.lazyWeakLock(1024);
234
235
236 private ElasticsearchIndex index;
237
238
239
240
241 public WorkflowServiceImpl() {
242 metadataServices = new TreeSet<>(Comparator.comparingInt(MetadataService::getPriority));
243 }
244
245
246
247
248
249
250
251 @Activate
252 public void activate(ComponentContext componentContext) {
253 this.componentContext = componentContext;
254 executorService = (ThreadPoolExecutor) Executors.newCachedThreadPool();
255 logger.info("Activate Workflow service");
256 }
257
258
259
260
261
262
263
264 @Override
265 public void addWorkflowListener(WorkflowListener listener) {
266 listeners.add(listener);
267 }
268
269
270
271
272
273
274
275 @Override
276 public void removeWorkflowListener(WorkflowListener listener) {
277 listeners.remove(listener);
278 }
279
280
281
282
283 protected void fireListeners(final WorkflowInstance oldWorkflowInstance, final WorkflowInstance newWorkflowInstance) {
284 final User currentUser = securityService.getUser();
285 final Organization currentOrganization = securityService.getOrganization();
286 for (final WorkflowListener listener : listeners) {
287 if (oldWorkflowInstance == null || !oldWorkflowInstance.getState().equals(newWorkflowInstance.getState())) {
288 Runnable runnable = () -> {
289 try {
290 securityService.setUser(currentUser);
291 securityService.setOrganization(currentOrganization);
292 listener.stateChanged(newWorkflowInstance);
293 } finally {
294 securityService.setUser(null);
295 securityService.setOrganization(null);
296 }
297 };
298 executorService.execute(runnable);
299 } else {
300 logger.debug("Not notifying {} because the workflow state has not changed", listener);
301 }
302
303 if (newWorkflowInstance.getCurrentOperation() != null) {
304 if (oldWorkflowInstance == null || oldWorkflowInstance.getCurrentOperation() == null
305 || !oldWorkflowInstance.getCurrentOperation().equals(newWorkflowInstance.getCurrentOperation())) {
306 Runnable runnable = () -> {
307 try {
308 securityService.setUser(currentUser);
309 securityService.setOrganization(currentOrganization);
310 listener.operationChanged(newWorkflowInstance);
311 } finally {
312 securityService.setUser(null);
313 securityService.setOrganization(null);
314 }
315 };
316 executorService.execute(runnable);
317 }
318 } else {
319 logger.debug("Not notifying {} because the workflow operation has not changed", listener);
320 }
321 }
322 }
323
324
325
326
327
328
329 @Override
330 public List<WorkflowDefinition> listAvailableWorkflowDefinitions() {
331 return workflowDefinitionScanner
332 .getAvailableWorkflowDefinitions(securityService.getOrganization(), securityService.getUser())
333 .sorted()
334 .collect(Collectors.toList());
335 }
336
337
338
339
340 public boolean isRunnable(WorkflowDefinition workflowDefinition) {
341 List<String> availableOperations = listAvailableOperationNames();
342 List<WorkflowDefinition> checkedWorkflows = new ArrayList<>();
343 boolean runnable = isRunnable(workflowDefinition, availableOperations, checkedWorkflows);
344 int wfCount = checkedWorkflows.size() - 1;
345 if (runnable) {
346 logger.info("Workflow {}, containing {} derived workflows, is runnable", workflowDefinition, wfCount);
347 } else {
348 logger.warn("Workflow {}, containing {} derived workflows, is not runnable", workflowDefinition, wfCount);
349 }
350 return runnable;
351 }
352
353
354
355
356
357
358
359
360
361
362
363
364
365 private boolean isRunnable(WorkflowDefinition workflowDefinition, List<String> availableOperations,
366 List<WorkflowDefinition> checkedWorkflows) {
367 if (checkedWorkflows.contains(workflowDefinition)) {
368 return true;
369 }
370
371
372 for (WorkflowOperationDefinition op : workflowDefinition.getOperations()) {
373 if (!availableOperations.contains(op.getId())) {
374 logger.info("{} is not runnable due to missing operation {}", workflowDefinition, op);
375 return false;
376 }
377 String catchWorkflow = op.getExceptionHandlingWorkflow();
378 if (catchWorkflow != null) {
379 WorkflowDefinition catchWorkflowDefinition;
380 try {
381 catchWorkflowDefinition = getWorkflowDefinitionById(catchWorkflow);
382 } catch (NotFoundException e) {
383 logger.info("{} is not runnable due to missing catch workflow {} on operation {}", workflowDefinition,
384 catchWorkflow, op);
385 return false;
386 }
387 if (!isRunnable(catchWorkflowDefinition, availableOperations, checkedWorkflows)) {
388 return false;
389 }
390 }
391 }
392
393
394 if (!checkedWorkflows.contains(workflowDefinition)) {
395 checkedWorkflows.add(workflowDefinition);
396 }
397 return true;
398 }
399
400
401
402
403
404
405 public Set<HandlerRegistration> getRegisteredHandlers() {
406 Set<HandlerRegistration> set = new HashSet<>();
407 ServiceReference[] refs;
408 try {
409 refs = componentContext.getBundleContext().getServiceReferences(WorkflowOperationHandler.class.getName(), null);
410 } catch (InvalidSyntaxException e) {
411 throw new IllegalStateException(e);
412 }
413 if (refs != null) {
414 for (ServiceReference ref : refs) {
415 WorkflowOperationHandler handler = (WorkflowOperationHandler) componentContext.getBundleContext().getService(
416 ref);
417 set.add(new HandlerRegistration((String) ref.getProperty(WORKFLOW_OPERATION_PROPERTY), handler));
418 }
419 } else {
420 logger.warn("No registered workflow operation handlers found");
421 }
422 return set;
423 }
424
425 protected WorkflowOperationHandler getWorkflowOperationHandler(String operationId) {
426 for (HandlerRegistration reg : getRegisteredHandlers()) {
427 if (reg.operationName.equals(operationId)) {
428 return reg.handler;
429 }
430 }
431 return null;
432 }
433
434
435
436
437
438
439
440 protected List<String> listAvailableOperationNames() {
441 return getRegisteredHandlers().parallelStream().map(op -> op.operationName).collect(Collectors.toList());
442 }
443
444
445
446
447
448
449 @Override
450 public WorkflowInstance getWorkflowById(long id) throws NotFoundException,
451 UnauthorizedException {
452 try {
453 WorkflowInstance workflow = persistence.getWorkflow(id);
454 assertPermission(workflow, Permissions.Action.READ.toString(), workflow.getOrganizationId());
455 return workflow;
456 } catch (WorkflowDatabaseException e) {
457 throw new IllegalStateException("Got not get workflow from database with id ");
458 }
459 }
460
461
462
463
464
465
466
467 @Override
468 public WorkflowInstance start(WorkflowDefinition workflowDefinition, MediaPackage mediaPackage)
469 throws WorkflowDatabaseException, UnauthorizedException, WorkflowParsingException {
470 return start(workflowDefinition, mediaPackage, new HashMap<>());
471 }
472
473
474
475
476
477
478
479 @Override
480 public WorkflowInstance start(WorkflowDefinition workflowDefinition, MediaPackage mediaPackage,
481 Map<String, String> properties)
482 throws WorkflowDatabaseException, UnauthorizedException, WorkflowParsingException {
483 try {
484 return start(workflowDefinition, mediaPackage, null, properties);
485 } catch (NotFoundException e) {
486
487 throw new IllegalStateException("a null workflow ID caused a NotFoundException. This is a programming error.");
488 }
489 }
490
491
492
493
494
495
496
497 @Override
498 public WorkflowInstance start(WorkflowDefinition workflowDefinition, MediaPackage sourceMediaPackage,
499 Long parentWorkflowId, Map<String, String> originalProperties) throws WorkflowDatabaseException,
500 NotFoundException, UnauthorizedException, WorkflowParsingException, IllegalStateException {
501 final String mediaPackageId = sourceMediaPackage.getIdentifier().toString();
502 Map<String, String> properties = null;
503
504
505
506 populateMediaPackageMetadata(sourceMediaPackage);
507
508 if (originalProperties != null) {
509 WorkflowPropertiesUtil.storeProperties(assetManager, sourceMediaPackage, originalProperties);
510 properties = WorkflowPropertiesUtil.getLatestWorkflowProperties(assetManager, mediaPackageId);
511 }
512
513
514 final Lock lock = mediaPackageLocks.get(mediaPackageId);
515 lock.lock();
516
517 try {
518 if (workflowDefinition == null) {
519 throw new IllegalArgumentException("workflow definition must not be null");
520 }
521 for (List<String> errors : MediaPackageSupport.sanityCheck(sourceMediaPackage)) {
522 throw new IllegalArgumentException("Insane media package cannot be processed: " + String.join("; ", errors));
523 }
524 if (parentWorkflowId != null) {
525 try {
526 getWorkflowById(parentWorkflowId);
527 } catch (UnauthorizedException e) {
528 throw new IllegalArgumentException("Parent workflow " + parentWorkflowId + " not visible to this user");
529 }
530 } else {
531 if (persistence.mediaPackageHasActiveWorkflows(mediaPackageId)) {
532 throw new IllegalStateException(String.format(
533 "Can't start workflow '%s' for media package '%s' because another workflow is currently active.",
534 workflowDefinition.getTitle(),
535 sourceMediaPackage.getIdentifier().toString()));
536 }
537 }
538
539
540 User currentUser = securityService.getUser();
541 validUserOrThrow(currentUser);
542
543
544 Organization organization = securityService.getOrganization();
545 if (organization == null) {
546 throw new SecurityException("Current organization is unknown");
547 }
548
549 WorkflowInstance workflowInstance = new WorkflowInstance(workflowDefinition, sourceMediaPackage,
550 currentUser, organization, properties);
551 workflowInstance = updateConfiguration(workflowInstance, properties);
552
553
554 try {
555
556 String workflowDefinitionXml = XmlWorkflowParser.toXml(workflowDefinition);
557 String mediaPackageXml = MediaPackageParser.getAsXml(sourceMediaPackage);
558
559 List<String> arguments = new ArrayList<>();
560 arguments.add(workflowDefinitionXml);
561 arguments.add(mediaPackageXml);
562 if (parentWorkflowId != null || properties != null) {
563 String parentWorkflowIdString = (parentWorkflowId != null) ? parentWorkflowId.toString() : NULL_PARENT_ID;
564 arguments.add(parentWorkflowIdString);
565 }
566 if (properties != null) {
567 arguments.add(mapToString(properties));
568 }
569
570 Job job = serviceRegistry.createJob(JOB_TYPE, Operation.START_WORKFLOW.toString(), arguments,
571 null, false, null, WORKFLOW_JOB_LOAD);
572
573
574 workflowInstance.setId(job.getId());
575
576
577
578 update(workflowInstance);
579
580 return workflowInstance;
581 } catch (Throwable t) {
582 try {
583 workflowInstance.setState(FAILED);
584 update(workflowInstance);
585 } catch (Exception failureToFail) {
586 logger.warn("Unable to update workflow to failed state", failureToFail);
587 }
588 try {
589 throw t;
590 } catch (ServiceRegistryException e) {
591 throw new WorkflowDatabaseException(e);
592 }
593 }
594 } finally {
595 lock.unlock();
596 }
597 }
598
599 protected WorkflowInstance updateConfiguration(WorkflowInstance instance, Map<String, String> properties) {
600 if (properties != null) {
601 for (Entry<String, String> entry : properties.entrySet()) {
602 instance.setConfiguration(entry.getKey(), entry.getValue());
603 }
604 }
605 return instance;
606 }
607
608
609
610
611
612
613
614
615 protected WorkflowOperationHandler selectOperationHandler(WorkflowOperationInstance operation) {
616 List<WorkflowOperationHandler> handlerList = new ArrayList<>();
617 for (HandlerRegistration handlerReg : getRegisteredHandlers()) {
618 if (handlerReg.operationName != null && handlerReg.operationName.equals(operation.getTemplate())) {
619 handlerList.add(handlerReg.handler);
620 }
621 }
622 if (handlerList.size() > 1) {
623 throw new IllegalStateException("Multiple operation handlers found for operation '" + operation.getTemplate()
624 + "'");
625 } else if (handlerList.size() == 1) {
626 return handlerList.get(0);
627 }
628 logger.warn("No workflow operation handlers found for operation '{}'", operation.getTemplate());
629 return null;
630 }
631
632
633
634
635
636
637
638
639
640
641 protected Job runWorkflow(WorkflowInstance workflow) throws WorkflowException, UnauthorizedException {
642 if (INSTANTIATED != workflow.getState()) {
643
644
645
646
647 if (RUNNING == workflow.getState()) {
648 WorkflowOperationInstance currentOperation = workflow.getCurrentOperation();
649 if (currentOperation != null) {
650 if (currentOperation.getId() != null) {
651 try {
652 Job operationJob = serviceRegistry.getJob(currentOperation.getId());
653 if (Job.Status.RUNNING.equals(operationJob.getStatus())) {
654 logger.debug("Not starting workflow {}, it is already in running state", workflow);
655 return null;
656 } else {
657 logger.info("Scheduling next operation of workflow {}", workflow);
658 operationJob.setStatus(Status.QUEUED);
659 operationJob.setDispatchable(true);
660 return serviceRegistry.updateJob(operationJob);
661 }
662 } catch (Exception e) {
663 logger.warn("Error determining status of current workflow operation in {}", workflow, e);
664 return null;
665 }
666 }
667 } else {
668 throw new IllegalStateException("Cannot start a workflow '" + workflow + "' with no current operation");
669 }
670 } else {
671 throw new IllegalStateException("Cannot start a workflow in state '" + workflow.getState() + "'");
672 }
673 }
674
675
676 workflow.setState(RUNNING);
677 update(workflow);
678
679 WorkflowOperationInstance operation = workflow.getCurrentOperation();
680
681 if (operation == null) {
682 throw new IllegalStateException("Cannot start a workflow without a current operation");
683 }
684
685 if (!operation.equals(workflow.getOperations().get(0))) {
686 throw new IllegalStateException("Current operation expected to be first");
687 }
688
689 try {
690 logger.info("Scheduling workflow {} for execution", workflow.getId());
691 Job job = serviceRegistry.createJob(JOB_TYPE, Operation.START_OPERATION.toString(),
692 Collections.singletonList(Long.toString(workflow.getId())), null, false, null, WORKFLOW_JOB_LOAD);
693 operation.setId(job.getId());
694 update(workflow);
695 job.setStatus(Status.QUEUED);
696 job.setDispatchable(true);
697 return serviceRegistry.updateJob(job);
698 } catch (ServiceRegistryException e) {
699 throw new WorkflowDatabaseException(e);
700 } catch (NotFoundException e) {
701
702 throw new IllegalStateException("Unable to find a job that was just created");
703 }
704
705 }
706
707
708
709
710
711
712
713
714
715
716
717
718
719 protected WorkflowOperationInstance runWorkflowOperation(WorkflowInstance workflow, Map<String, String> properties)
720 throws WorkflowException, UnauthorizedException {
721 WorkflowOperationInstance processingOperation = workflow.getCurrentOperation();
722 if (processingOperation == null) {
723 throw new IllegalStateException("Workflow '" + workflow + "' has no operation to run");
724 }
725
726
727 WorkflowState initialState = workflow.getState();
728
729
730 WorkflowOperationHandler operationHandler = selectOperationHandler(processingOperation);
731 WorkflowOperationWorker worker = new WorkflowOperationWorker(operationHandler, workflow, properties, this);
732 workflow = worker.execute();
733
734 Long currentOperationJobId = processingOperation.getId();
735 try {
736 updateOperationJob(currentOperationJobId, processingOperation.getState());
737 } catch (NotFoundException e) {
738 throw new IllegalStateException("Unable to find a job that has already been running");
739 } catch (ServiceRegistryException e) {
740 throw new WorkflowDatabaseException(e);
741 }
742
743
744 WorkflowOperationInstance currentOperation = workflow.getCurrentOperation();
745
746
747 if (currentOperation == null) {
748
749
750 if (FAILING.equals(workflow.getState())) {
751 workflow.setState(FAILED);
752 }
753
754
755
756 else if (!FAILED.equals(workflow.getState())) {
757 workflow.setState(SUCCEEDED);
758 for (WorkflowOperationInstance op : workflow.getOperations()) {
759 if (op.getState().equals(WorkflowOperationInstance.OperationState.FAILED)) {
760 if (op.isFailOnError()) {
761 workflow.setState(FAILED);
762 break;
763 }
764 }
765 }
766 }
767
768
769 logger.debug("{} has {}", workflow, workflow.getState());
770 update(workflow);
771
772 } else {
773
774
775 WorkflowState dbWorkflowState;
776 try {
777 dbWorkflowState = getWorkflowById(workflow.getId()).getState();
778 } catch (NotFoundException e) {
779 throw new IllegalStateException("The workflow with ID " + workflow.getId()
780 + " can not be found in the database", e);
781 } catch (UnauthorizedException e) {
782 throw new IllegalStateException("The workflow with ID " + workflow.getId() + " can not be read", e);
783 }
784
785
786 if (!dbWorkflowState.equals(initialState)) {
787 logger.info("Workflow state for {} was changed to '{}' from the outside", workflow, dbWorkflowState);
788 workflow.setState(dbWorkflowState);
789 }
790
791
792
793 Job job;
794 switch (workflow.getState()) {
795 case FAILED:
796 update(workflow);
797 break;
798 case FAILING:
799 case RUNNING:
800 try {
801 job = serviceRegistry.createJob(JOB_TYPE, Operation.START_OPERATION.toString(),
802 Collections.singletonList(Long.toString(workflow.getId())), null, false, null, WORKFLOW_JOB_LOAD);
803 currentOperation.setId(job.getId());
804 update(workflow);
805 job.setStatus(Status.QUEUED);
806 job.setDispatchable(true);
807 serviceRegistry.updateJob(job);
808 } catch (ServiceRegistryException e) {
809 throw new WorkflowDatabaseException(e);
810 } catch (NotFoundException e) {
811
812 throw new IllegalStateException("Unable to find a job that was just created");
813 }
814 break;
815 case PAUSED:
816 case STOPPED:
817 case SUCCEEDED:
818 update(workflow);
819 break;
820 case INSTANTIATED:
821 update(workflow);
822 throw new IllegalStateException("Impossible workflow state found during processing");
823 default:
824 throw new IllegalStateException("Unknown workflow state found during processing");
825 }
826
827 }
828
829 return processingOperation;
830 }
831
832
833
834
835
836
837 @Override
838 public WorkflowDefinition getWorkflowDefinitionById(String id) throws NotFoundException {
839 final WorkflowIdentifier workflowIdentifier = new WorkflowIdentifier(id, securityService.getOrganization().getId());
840 final WorkflowDefinition def = workflowDefinitionScanner
841 .getWorkflowDefinition(securityService.getUser(), workflowIdentifier);
842 if (def == null) {
843 throw new NotFoundException("Workflow definition '" + workflowIdentifier + "' not found or inaccessible");
844 }
845 return def;
846 }
847
848
849
850
851
852
853 @Override
854 public WorkflowInstance stop(long workflowInstanceId) throws WorkflowException, NotFoundException,
855 UnauthorizedException {
856 final Lock lock = this.lock.get(workflowInstanceId);
857 lock.lock();
858 try {
859 WorkflowInstance instance = getWorkflowById(workflowInstanceId);
860
861 if (instance.getState() != STOPPED) {
862
863 instance.setState(STOPPED);
864 update(instance);
865 }
866
867 try {
868 removeTempFiles(instance);
869 } catch (Exception e) {
870 logger.warn("Cannot remove temp files for workflow instance {}", workflowInstanceId, e);
871 }
872
873 return instance;
874 } finally {
875 lock.unlock();
876 }
877 }
878
879
880
881
882 private void validUserOrThrow(User user) {
883 if (user == null) {
884 throw new SecurityException("Current user is unknown");
885 }
886
887 if (userDirectoryService.loadUser(user.getUsername()) == null) {
888 throw new SecurityException(String.format("Current user '%s' can not be loaded", user.getUsername()));
889 }
890 }
891
892 private void removeTempFiles(WorkflowInstance workflowInstance) {
893 logger.info("Removing temporary files for workflow {}", workflowInstance.getId());
894 MediaPackage mp = workflowInstance.getMediaPackage();
895 if (null == mp) {
896 logger.warn("Workflow instance {} does not have an media package set", workflowInstance.getId());
897 return;
898 }
899 for (MediaPackageElement elem : mp.getElements()) {
900
901 if (elem instanceof Publication) {
902 continue;
903 }
904 if (null == elem.getURI()) {
905 logger.warn("Media package element {} from the media package {} does not have an URI set",
906 elem.getIdentifier(), mp.getIdentifier());
907 continue;
908 }
909 try {
910 logger.debug("Removing temporary file {} for workflow {}", elem.getURI(), workflowInstance);
911 workspace.delete(elem.getURI());
912 } catch (IOException e) {
913 logger.warn("Unable to delete mediapackage element", e);
914 } catch (NotFoundException e) {
915
916 }
917 }
918 }
919
920
921
922
923
924
925
926 @Override
927 public void remove(long workflowInstanceId) throws WorkflowDatabaseException, NotFoundException,
928 UnauthorizedException, WorkflowParsingException, WorkflowStateException {
929 remove(workflowInstanceId, false);
930 }
931
932
933
934
935
936
937 @Override
938 public void remove(long workflowInstanceId, boolean force) throws WorkflowDatabaseException, NotFoundException,
939 UnauthorizedException, WorkflowStateException {
940 final Lock lock = this.lock.get(workflowInstanceId);
941 lock.lock();
942 try {
943 WorkflowInstance instance = getWorkflowById(workflowInstanceId);
944 WorkflowInstance.WorkflowState state = instance.getState();
945 if (!state.isTerminated() && !force) {
946 throw new WorkflowStateException("Workflow instance with state '" + state + "' cannot be removed "
947 + "since it is not yet terminated.");
948 }
949
950 assertPermission(instance, Permissions.Action.WRITE.toString(), instance.getOrganizationId());
951
952
953 removeTempFiles(instance);
954
955
956 List<WorkflowOperationInstance> operations = instance.getOperations();
957 List<Long> jobsToDelete = new ArrayList<>();
958 for (WorkflowOperationInstance op : operations) {
959 if (op.getId() != null) {
960 long workflowOpId = op.getId();
961 if (workflowOpId != workflowInstanceId) {
962 jobsToDelete.add(workflowOpId);
963 }
964 }
965 }
966 try {
967 serviceRegistry.removeJobs(jobsToDelete);
968 } catch (ServiceRegistryException e) {
969 logger.warn("Problems while removing jobs related to workflow operations '{}'", jobsToDelete, e);
970 } catch (NotFoundException e) {
971 logger.debug("No jobs related to one of the workflow operation '{}' found in service registry", jobsToDelete);
972 }
973
974
975 try {
976 serviceRegistry.removeJobs(Collections.singletonList(workflowInstanceId));
977 removeWorkflowInstanceFromIndex(instance.getId());
978 } catch (ServiceRegistryException e) {
979 logger.warn("Problems while removing workflow instance job '{}'", workflowInstanceId, e);
980 } catch (NotFoundException e) {
981 logger.info("No workflow instance job '{}' found in the service registry", workflowInstanceId);
982 }
983
984
985 persistence.removeFromDatabase(instance);
986 } finally {
987 lock.unlock();
988 }
989 }
990
991
992
993
994
995
996 @Override
997 public WorkflowInstance suspend(long workflowInstanceId) throws WorkflowException, NotFoundException,
998 UnauthorizedException {
999 final Lock lock = this.lock.get(workflowInstanceId);
1000 lock.lock();
1001 try {
1002 WorkflowInstance instance = getWorkflowById(workflowInstanceId);
1003 instance.setState(PAUSED);
1004 update(instance);
1005 return instance;
1006 } finally {
1007 lock.unlock();
1008 }
1009 }
1010
1011
1012
1013
1014
1015
1016 @Override
1017 public WorkflowInstance resume(long id) throws WorkflowException, NotFoundException, IllegalStateException,
1018 UnauthorizedException {
1019 return resume(id, null);
1020 }
1021
1022
1023
1024
1025
1026
1027 @Override
1028 public WorkflowInstance resume(long workflowInstanceId, Map<String, String> properties) throws WorkflowException,
1029 NotFoundException, IllegalStateException, UnauthorizedException {
1030 WorkflowInstance workflowInstance = getWorkflowById(workflowInstanceId);
1031 if (!WorkflowState.PAUSED.equals(workflowInstance.getState())) {
1032 throw new IllegalStateException("Can not resume a workflow where the current state is not in paused");
1033 }
1034
1035 workflowInstance = updateConfiguration(workflowInstance, properties);
1036 update(workflowInstance);
1037
1038 WorkflowOperationInstance currentOperation = workflowInstance.getCurrentOperation();
1039
1040
1041 if (currentOperation == null) {
1042
1043
1044 workflowInstance.setState(SUCCEEDED);
1045 for (WorkflowOperationInstance op : workflowInstance.getOperations()) {
1046 if (op.getState().equals(WorkflowOperationInstance.OperationState.FAILED)) {
1047 if (op.isFailOnError()) {
1048 workflowInstance.setState(FAILED);
1049 break;
1050 }
1051 }
1052 }
1053
1054
1055 logger.debug("{} has {}", workflowInstance, workflowInstance.getState());
1056 update(workflowInstance);
1057 return workflowInstance;
1058 }
1059
1060
1061
1062 if (OperationState.INSTANTIATED.equals(currentOperation.getState())) {
1063 try {
1064
1065 Job operationJob = serviceRegistry.createJob(JOB_TYPE, Operation.START_OPERATION.toString(),
1066 Collections.singletonList(Long.toString(workflowInstanceId)), null, false, null, WORKFLOW_JOB_LOAD);
1067
1068
1069
1070 workflowInstance.setState(RUNNING);
1071 currentOperation.setId(operationJob.getId());
1072
1073
1074 update(workflowInstance);
1075
1076
1077 operationJob.setStatus(Status.QUEUED);
1078 operationJob.setDispatchable(true);
1079 serviceRegistry.updateJob(operationJob);
1080
1081 return workflowInstance;
1082 } catch (ServiceRegistryException e) {
1083 throw new WorkflowDatabaseException(e);
1084 }
1085 }
1086
1087 Long operationJobId = workflowInstance.getCurrentOperation().getId();
1088 if (operationJobId == null) {
1089 throw new IllegalStateException("Can not resume a workflow where the current operation has no associated id");
1090 }
1091
1092
1093 Job workflowJob;
1094 try {
1095 workflowJob = serviceRegistry.getJob(workflowInstanceId);
1096 workflowJob.setStatus(Status.RUNNING);
1097 persistence.updateInDatabase(workflowInstance);
1098 serviceRegistry.updateJob(workflowJob);
1099
1100 Job operationJob = serviceRegistry.getJob(operationJobId);
1101 operationJob.setStatus(Status.QUEUED);
1102 operationJob.setDispatchable(true);
1103 if (properties != null) {
1104 Properties props = new Properties();
1105 props.putAll(properties);
1106 ByteArrayOutputStream out = new ByteArrayOutputStream();
1107 props.store(out, null);
1108 List<String> newArguments = new ArrayList<String>(operationJob.getArguments());
1109 newArguments.add(new String(out.toByteArray(), StandardCharsets.UTF_8));
1110 operationJob.setArguments(newArguments);
1111 }
1112 serviceRegistry.updateJob(operationJob);
1113 } catch (ServiceRegistryException e) {
1114 throw new WorkflowDatabaseException(e);
1115 } catch (IOException e) {
1116 throw new WorkflowParsingException("Unable to parse workflow and/or workflow properties");
1117 }
1118
1119 return workflowInstance;
1120 }
1121
1122
1123
1124
1125
1126
1127
1128
1129
1130
1131
1132 protected void assertPermission(WorkflowInstance workflow, String action, String workflowOrgId)
1133 throws UnauthorizedException {
1134 User currentUser = securityService.getUser();
1135 Organization currentOrg = securityService.getOrganization();
1136 String currentOrgAdminRole = currentOrg.getAdminRole();
1137 String currentOrgId = currentOrg.getId();
1138
1139 MediaPackage mediapackage = workflow.getMediaPackage();
1140
1141 WorkflowState state = workflow.getState();
1142 if (state != INSTANTIATED && state != RUNNING && workflow.getState() != FAILING) {
1143 Optional<MediaPackage> assetMediapackage = assetManager.getMediaPackage(mediapackage.getIdentifier().toString());
1144 if (assetMediapackage.isPresent()) {
1145 mediapackage = assetMediapackage.get();
1146 }
1147 }
1148
1149 var creatorName = workflow.getCreatorName();
1150 var workflowCreator = creatorName == null ? null : userDirectoryService.loadUser(creatorName);
1151 boolean authorized = currentUser.hasRole(GLOBAL_ADMIN_ROLE)
1152 || (currentUser.hasRole(currentOrgAdminRole) && currentOrgId.equals(workflowOrgId))
1153 || (currentUser.equals(workflowCreator))
1154 || (authorizationService.hasPermission(mediapackage, action) && currentOrgId.equals(workflowOrgId));
1155
1156 if (!authorized) {
1157 throw new UnauthorizedException(currentUser, action);
1158 }
1159 }
1160
1161 protected boolean assertMediaPackagePermission(String mediaPackageId, String action) {
1162 var currentUser = securityService.getUser();
1163 Optional<MediaPackage> mp = assetManager.getMediaPackage(mediaPackageId);
1164
1165
1166
1167
1168 return currentUser.hasRole(GLOBAL_ADMIN_ROLE)
1169 || mp.isPresent() && currentUser.hasRole(securityService.getOrganization().getAdminRole())
1170 || mp.isPresent() && authorizationService.hasPermission(mp.get(), action);
1171 }
1172
1173
1174
1175
1176
1177
/**
 * Persists the given workflow instance and synchronizes its job, the search index and the registered
 * listeners with the new state.
 * <p>
 * The method runs under the per-instance update lock and performs, in order: loading the previously stored
 * instance, refreshing media package metadata and the series ACL, mapping the workflow state onto the
 * workflow job status, writing the instance to the database, updating the job, updating the index and
 * finally notifying listeners.
 *
 * @param workflowInstance
 *          the workflow instance to store
 * @throws WorkflowDatabaseException
 *           if the series service, the service registry or the persistence layer fails
 * @throws UnauthorizedException
 *           declared for callers; NOTE(review): presumably raised by {@code getWorkflowById} - confirm
 */
@Override
public void update(final WorkflowInstance workflowInstance) throws WorkflowDatabaseException, UnauthorizedException {
  final Lock lock = updateLock.get(workflowInstance.getId());
  lock.lock();

  try {
    // Load the currently stored version so listeners can be given both old and new instance.
    WorkflowInstance originalWorkflowInstance = null;
    try {

      originalWorkflowInstance = getWorkflowById(workflowInstance.getId());
    } catch (NotFoundException e) {
      // Deliberately ignored: originalWorkflowInstance stays null (presumably a not-yet-stored workflow).
    }

    MediaPackage updatedMediaPackage = null;
    try {

      // Refresh the media package metadata using the registered metadata services.
      updatedMediaPackage = workflowInstance.getMediaPackage();

      populateMediaPackageMetadata(updatedMediaPackage);

      String seriesId = updatedMediaPackage.getSeries();
      if (seriesId != null && workflowInstance.getCurrentOperation() != null) {

        // Only touch the series ACL while there is still a current operation, and only write it
        // when the active ACL is not already the series ACL.
        try {
          AccessControlList acl = seriesService.getSeriesAccessControl(seriesId);
          Tuple<AccessControlList, AclScope> activeAcl = authorizationService.getAcl(
                  updatedMediaPackage, AclScope.Series);

          if (!AclScope.Series.equals(activeAcl.getB()) || !AccessControlUtil.equals(activeAcl.getA(), acl)) {
            authorizationService.setAcl(updatedMediaPackage, AclScope.Series, acl);
          }
        } catch (NotFoundException e) {
          logger.debug("Not updating series ACL on event {} since series {} has no ACL set",
                  updatedMediaPackage, seriesId, e);
        }
      }

      workflowInstance.setMediaPackage(updatedMediaPackage);
    } catch (SeriesException e) {
      throw new WorkflowDatabaseException(e);
    } catch (Exception e) {
      // Any other metadata problem is logged only; the update itself continues.
      logger.error("Metadata for media package {} could not be updated", updatedMediaPackage, e);
    }

    // Map the workflow state onto the status of the workflow's job.
    WorkflowState workflowState = workflowInstance.getState();

    Job job;
    try {
      job = serviceRegistry.getJob(workflowInstance.getId());
      job.setPayload(Long.toString(workflowInstance.getId()));

      // FAILING leaves the job status untouched; every other known state has a direct counterpart.
      switch (workflowState) {
        case FAILED:
          job.setStatus(Status.FAILED);
          break;
        case FAILING:
          break;
        case INSTANTIATED:
          job.setDispatchable(true);
          job.setStatus(Status.QUEUED);
          break;
        case PAUSED:
          job.setStatus(Status.PAUSED);
          break;
        case RUNNING:
          job.setStatus(Status.RUNNING);
          break;
        case STOPPED:
          job.setStatus(Status.CANCELLED);
          break;
        case SUCCEEDED:
          job.setStatus(Status.FINISHED);
          break;
        default:
          throw new IllegalStateException("Found a workflow state that is not handled");
      }
    } catch (ServiceRegistryException e) {
      throw new WorkflowDatabaseException(
              "Unable to read workflow job " + workflowInstance.getId() + " from service registry", e);
    } catch (NotFoundException e) {
      throw new WorkflowDatabaseException(
              "Job for workflow " + workflowInstance.getId() + " not found in service registry", e);
    }

    // Write the instance first, then the job, then the index.
    try {

      persistence.updateInDatabase(workflowInstance);

      job = serviceRegistry.updateJob(job);

      WorkflowOperationInstance op = workflowInstance.getCurrentOperation();

      // The index is skipped while the current operation is RUNNING; it is updated when there is no
      // current operation or the operation is in any other state.
      if (op == null || op.getState() != OperationState.RUNNING) {

        long id = workflowInstance.getId();
        int state = workflowInstance.getState().ordinal();
        var template = workflowInstance.getTemplate();
        String mpId = workflowInstance.getMediaPackage().getIdentifier().toString();
        String orgId = workflowInstance.getOrganizationId();

        updateWorkflowInstanceInIndex(id, state, template, mpId, orgId);
      }
    } catch (ServiceRegistryException e) {
      throw new WorkflowDatabaseException("Update of workflow job " + workflowInstance.getId()
              + " in the service registry failed, service registry and workflow table may be out of sync", e);
    } catch (NotFoundException e) {
      throw new WorkflowDatabaseException("Job for workflow " + workflowInstance.getId()
              + " not found in service registry", e);
    } catch (Exception e) {
      throw new WorkflowDatabaseException("Update of workflow job " + job.getId()
              + " in the service registry failed, service registry and workflow table may be out of sync", e);
    }

    // Notify listeners with the previous (possibly null) and the updated instance.
    try {
      fireListeners(originalWorkflowInstance, workflowInstance);
    } catch (Exception e) {
      // Any listener failure is surfaced as an IllegalStateException with the message below.
      throw new IllegalStateException("In-memory workflow instance could not be serialized", e);
    }
  } finally {
    lock.unlock();
  }
}
1311
1312
1313
1314
1315
1316
1317 @Override
1318 public long countWorkflowInstances() throws WorkflowDatabaseException {
1319 return countWorkflowInstances(null);
1320 }
1321
1322
1323
1324
1325
1326
1327
1328 @Override
1329 public long countWorkflowInstances(WorkflowState state) throws WorkflowDatabaseException {
1330 return persistence.countWorkflows(state);
1331 }
1332
1333
1334
1335
1336
1337
1338 @Override
1339 public List<WorkflowInstance> getWorkflowInstancesByMediaPackage(String mediaPackageId)
1340 throws WorkflowDatabaseException, UnauthorizedException {
1341
1342 if (!assertMediaPackagePermission(mediaPackageId, Permissions.Action.READ.toString())) {
1343 throw new UnauthorizedException("Not allowed to access event");
1344 }
1345 return persistence.getWorkflowInstancesByMediaPackage(mediaPackageId);
1346 }
1347
1348
1349
1350
1351
1352
1353 @Override
1354 public Optional<WorkflowInstance> getRunningWorkflowInstanceByMediaPackage(String mediaPackageId, String action)
1355 throws WorkflowException, UnauthorizedException, WorkflowDatabaseException {
1356 List<WorkflowInstance> workflowInstances = persistence.getRunningWorkflowInstancesByMediaPackage(mediaPackageId);
1357
1358
1359 if (workflowInstances.size() > 1) {
1360 throw new WorkflowException("Multiple workflows are active on mediapackage " + mediaPackageId);
1361 }
1362
1363 Optional<WorkflowInstance> optWorkflowInstance = Optional.empty();
1364 if (workflowInstances.size() == 1) {
1365 WorkflowInstance wfInstance = workflowInstances.get(0);
1366 optWorkflowInstance = Optional.of(wfInstance);
1367 assertPermission(wfInstance, action, wfInstance.getOrganizationId());
1368 }
1369
1370 return optWorkflowInstance;
1371 }
1372
1373
1374
1375
1376
1377
1378 @Override
1379 public boolean mediaPackageHasActiveWorkflows(String mediaPackageId) throws WorkflowDatabaseException {
1380 return persistence.mediaPackageHasActiveWorkflows(mediaPackageId);
1381 }
1382
1383
1384
1385
1386
1387
1388 @Override
1389 public boolean userHasActiveWorkflows(String userId) throws WorkflowDatabaseException {
1390 return persistence.userHasActiveWorkflows(userId);
1391 }
1392
1393
1394
1395
1396
1397
1398
1399
1400
1401
1402
1403 protected WorkflowInstance handleOperationException(
1404 WorkflowInstance workflow,
1405 WorkflowOperationInstance currentOperation) {
1406 int failedAttempt = currentOperation.getFailedAttempts() + 1;
1407 currentOperation.setFailedAttempts(failedAttempt);
1408
1409
1410 if (ERROR_RESOLUTION_HANDLER_ID.equals(currentOperation.getTemplate())
1411 && OperationState.FAILED.equals(currentOperation.getState())) {
1412 int position = workflow.getOperations().indexOf(currentOperation);
1413
1414 if (workflow.getOperations().size() > position + 1) {
1415 currentOperation = workflow.getOperations().get(position + 1);
1416
1417 currentOperation.setState(OperationState.FAILED);
1418 }
1419 handleFailedOperation(workflow, currentOperation);
1420 } else if (currentOperation.getMaxAttempts() != -1 && failedAttempt == currentOperation.getMaxAttempts()) {
1421 handleFailedOperation(workflow, currentOperation);
1422 } else {
1423 switch (currentOperation.getRetryStrategy()) {
1424 case NONE:
1425 handleFailedOperation(workflow, currentOperation);
1426 break;
1427 case RETRY:
1428 currentOperation.setState(OperationState.RETRY);
1429 break;
1430 case HOLD:
1431 currentOperation.setState(OperationState.RETRY);
1432 List<WorkflowOperationInstance> operations = workflow.getOperations();
1433 WorkflowOperationDefinitionImpl errorResolutionDefinition = new WorkflowOperationDefinitionImpl(
1434 ERROR_RESOLUTION_HANDLER_ID, "Error Resolution Operation", "error", false);
1435 var errorResolutionInstance = new WorkflowOperationInstance(errorResolutionDefinition);
1436 errorResolutionInstance.setExceptionHandlingWorkflow(currentOperation.getExceptionHandlingWorkflow());
1437 var index = workflow.getOperations().indexOf(currentOperation);
1438 operations.add(index, errorResolutionInstance);
1439 workflow.setOperations(operations);
1440 break;
1441 default:
1442 break;
1443 }
1444 }
1445 return workflow;
1446 }
1447
1448
1449
1450
1451
1452
1453
1454
1455
1456 private void handleFailedOperation(WorkflowInstance workflow, WorkflowOperationInstance currentOperation) {
1457 String errorDefId = currentOperation.getExceptionHandlingWorkflow();
1458
1459
1460 if (currentOperation.isFailOnError()) {
1461 if (StringUtils.isBlank(errorDefId)) {
1462 workflow.setState(FAILED);
1463 } else {
1464 workflow.setState(FAILING);
1465
1466
1467 int currentOperationPosition = workflow.getOperations().indexOf(currentOperation);
1468 List<WorkflowOperationInstance> operations = new ArrayList<>(
1469 workflow.getOperations().subList(0, currentOperationPosition + 1));
1470 workflow.setOperations(operations);
1471
1472
1473 Map<String, String> configuration = new HashMap<>();
1474 for (String configKey : workflow.getConfigurationKeys()) {
1475 configuration.put(configKey, workflow.getConfiguration(configKey));
1476 }
1477
1478
1479 WorkflowDefinition errorDef = null;
1480 try {
1481 errorDef = getWorkflowDefinitionById(errorDefId);
1482 workflow.extend(errorDef);
1483 workflow.setOperations(updateConfiguration(workflow, configuration).getOperations());
1484 } catch (NotFoundException notFoundException) {
1485 throw new IllegalStateException("Unable to find the error workflow definition '" + errorDefId + "'");
1486 }
1487 }
1488 }
1489
1490
1491 currentOperation.setState(OperationState.FAILED);
1492 }
1493
1494
1495
1496
1497
1498
1499
1500
1501
1502
1503
1504
1505
1506 protected WorkflowInstance handleOperationResult(WorkflowInstance workflow, WorkflowOperationResult result)
1507 throws WorkflowDatabaseException {
1508
1509
1510 WorkflowOperationInstance currentOperation = workflow.getCurrentOperation();
1511 WorkflowOperationHandler handler = getWorkflowOperationHandler(currentOperation.getTemplate());
1512
1513
1514 if (result == null) {
1515 logger.warn("Handling a null operation result for workflow {} in operation {}", workflow.getId(),
1516 currentOperation.getTemplate());
1517 result = new WorkflowOperationResultImpl(workflow.getMediaPackage(), null, Action.CONTINUE, 0);
1518 } else {
1519 MediaPackage mp = result.getMediaPackage();
1520 if (mp != null) {
1521 workflow.setMediaPackage(mp);
1522 }
1523 }
1524
1525
1526 Action action = result.getAction();
1527
1528
1529 workflow = updateConfiguration(workflow, result.getProperties());
1530
1531
1532 currentOperation.setTimeInQueue(result.getTimeInQueue());
1533
1534
1535 switch (action) {
1536 case CONTINUE:
1537 currentOperation.setState(OperationState.SUCCEEDED);
1538 break;
1539 case PAUSE:
1540 if (!(handler instanceof ResumableWorkflowOperationHandler)) {
1541 throw new IllegalStateException("Operation " + currentOperation.getTemplate() + " is not resumable");
1542 }
1543
1544
1545 currentOperation.setContinuable(result.allowsContinue());
1546 currentOperation.setAbortable(result.allowsAbort());
1547
1548 workflow.setState(PAUSED);
1549 currentOperation.setState(OperationState.PAUSED);
1550 break;
1551 case SKIP:
1552 currentOperation.setState(OperationState.SKIPPED);
1553 break;
1554 default:
1555 throw new IllegalStateException("Unknown action '" + action + "' returned");
1556 }
1557
1558 if (ERROR_RESOLUTION_HANDLER_ID.equals(currentOperation.getTemplate()) && result.getAction() == Action.CONTINUE) {
1559
1560 Map<String, String> resultProperties = result.getProperties();
1561 if (resultProperties == null || StringUtils.isBlank(resultProperties.get(RETRY_STRATEGY))) {
1562 throw new WorkflowDatabaseException("Retry strategy not present in properties!");
1563 }
1564
1565 RetryStrategy retryStrategy = RetryStrategy.valueOf(resultProperties.get(RETRY_STRATEGY));
1566 switch (retryStrategy) {
1567 case NONE:
1568 handleFailedOperation(workflow, workflow.getCurrentOperation());
1569 break;
1570 case RETRY:
1571 break;
1572 default:
1573 throw new WorkflowDatabaseException("Retry strategy not implemented yet!");
1574 }
1575 }
1576
1577 return workflow;
1578 }
1579
1580
1581
1582
1583
1584
1585
1586 protected void populateMediaPackageMetadata(MediaPackage mp) {
1587 if (metadataServices.isEmpty()) {
1588 logger.warn("No metadata services are registered, so no media package metadata can be extracted from catalogs");
1589 return;
1590 }
1591 for (MediaPackageMetadataService metadataService : metadataServices) {
1592 MediaPackageMetadata metadata = metadataService.getMetadata(mp);
1593 MediaPackageMetadataSupport.populateMediaPackageMetadata(mp, metadata);
1594 }
1595 }
1596
1597
1598
1599
1600
1601
1602 @Override
1603 public boolean isReadyToAcceptJobs(String operation) {
1604 return true;
1605 }
1606
1607
1608
1609
1610
1611
1612
1613
1614 @Override
1615 public boolean isReadyToAccept(Job job) throws UndispatchableJobException {
1616 String operation = job.getOperation();
1617
1618
1619 if (!Operation.START_WORKFLOW.toString().equals(operation)) {
1620 return true;
1621 }
1622
1623
1624 if (job.getArguments().size() > 1 && job.getArguments().get(0) != null) {
1625 try {
1626 WorkflowDefinition workflowDef = XmlWorkflowParser.parseWorkflowDefinition(job.getArguments().get(0));
1627 if (workflowDef.getOperations().size() > 0) {
1628 String firstOperationId = workflowDef.getOperations().get(0).getId();
1629 WorkflowOperationHandler handler = getWorkflowOperationHandler(firstOperationId);
1630 if (handler instanceof ResumableWorkflowOperationHandler) {
1631 if (((ResumableWorkflowOperationHandler) handler).isAlwaysPause()) {
1632 return true;
1633 }
1634 }
1635 }
1636 } catch (WorkflowParsingException e) {
1637 throw new UndispatchableJobException(job + " is not a proper job to start a workflow", e);
1638 }
1639 }
1640
1641 WorkflowInstance workflow;
1642 Optional<WorkflowInstance> workflowInstance;
1643 String mediaPackageId;
1644
1645
1646 try {
1647 workflow = getWorkflowById(job.getId());
1648 mediaPackageId = workflow.getMediaPackage().getIdentifier().toString();
1649 } catch (NotFoundException e) {
1650 throw new UndispatchableJobException("Trying to start workflow with job id " + job.getId()
1651 + " but no corresponding instance is available from the workflow service", e);
1652 } catch (UnauthorizedException e) {
1653 throw new UndispatchableJobException(
1654 "Authorization denied while requesting to loading workflow instance. Job: " + job.getId(), e);
1655 }
1656
1657 try {
1658 workflowInstance = getRunningWorkflowInstanceByMediaPackage(
1659 workflow.getMediaPackage().getIdentifier().toString(), Permissions.Action.READ.toString());
1660 } catch (UnauthorizedException e) {
1661 throw new UndispatchableJobException("Authorization denied while requesting to loading workflow instance "
1662 + workflow.getId(), e);
1663 } catch (WorkflowDatabaseException e) {
1664 throw new UndispatchableJobException("An database error occurred while checking if a workflow is already active "
1665 + "(job: " + job.getId() + ")", e);
1666 } catch (WorkflowException e) {
1667
1668 delayWorkflow(workflow, mediaPackageId);
1669 return false;
1670 }
1671
1672
1673 if (workflowInstance.isPresent() && workflow.getId() != workflowInstance.get().getId()) {
1674 delayWorkflow(workflow, mediaPackageId);
1675 return false;
1676 }
1677
1678 return true;
1679 }
1680
1681 private void delayWorkflow(WorkflowInstance workflow, String mediaPackageId) {
1682 if (!delayedWorkflows.contains(workflow.getId())) {
1683 logger.info("Delaying start of workflow {}, another workflow on media package {} is still running",
1684 workflow.getId(), mediaPackageId);
1685 delayedWorkflows.add(workflow.getId());
1686 }
1687 }
1688
1689
1690
1691
1692
1693
1694 @Override
1695 public synchronized void acceptJob(Job job) throws ServiceRegistryException {
1696 User originalUser = securityService.getUser();
1697 Organization originalOrg = securityService.getOrganization();
1698 try {
1699 Organization organization = organizationDirectoryService.getOrganization(job.getOrganization());
1700 securityService.setOrganization(organization);
1701 User user = userDirectoryService.loadUser(job.getCreator());
1702 securityService.setUser(user);
1703 job.setStatus(Job.Status.RUNNING);
1704 job = serviceRegistry.updateJob(job);
1705
1706
1707 if (delayedWorkflows.contains(job.getId())) {
1708 delayedWorkflows.remove(job.getId());
1709 logger.info("Starting initially delayed workflow {}, {} more waiting", job.getId(), delayedWorkflows.size());
1710 }
1711
1712 executorService.submit(new JobRunner(job, serviceRegistry.getCurrentJob()));
1713 } catch (Exception e) {
1714 if (e instanceof ServiceRegistryException) {
1715 throw (ServiceRegistryException) e;
1716 }
1717 throw new ServiceRegistryException(e);
1718 } finally {
1719 securityService.setUser(originalUser);
1720 securityService.setOrganization(originalOrg);
1721 }
1722 }
1723
1724
1725
1726
1727
1728
1729
1730
1731
1732
1733 protected String process(Job job) throws Exception {
1734 List<String> arguments = job.getArguments();
1735 Operation op = null;
1736 WorkflowInstance workflowInstance = null;
1737 WorkflowOperationInstance wfo;
1738 String operation = job.getOperation();
1739 try {
1740 try {
1741 op = Operation.valueOf(operation);
1742 switch (op) {
1743 case START_WORKFLOW:
1744 workflowInstance = persistence.getWorkflow(Long.parseLong(job.getPayload()));
1745 logger.debug("Starting new workflow {}", workflowInstance);
1746 runWorkflow(workflowInstance);
1747 break;
1748 case RESUME:
1749 workflowInstance = getWorkflowById(Long.parseLong(arguments.get(0)));
1750 Map<String, String> properties = null;
1751 if (arguments.size() > 1) {
1752 Properties props = new Properties();
1753 props.load(IOUtils.toInputStream(arguments.get(arguments.size() - 1), StandardCharsets.UTF_8));
1754 properties = new HashMap<>();
1755 for (Entry<Object, Object> entry : props.entrySet()) {
1756 properties.put(entry.getKey().toString(), entry.getValue().toString());
1757 }
1758 }
1759 logger.debug("Resuming {} at {}", workflowInstance, workflowInstance.getCurrentOperation());
1760 workflowInstance.setState(RUNNING);
1761 update(workflowInstance);
1762 runWorkflowOperation(workflowInstance, properties);
1763 break;
1764 case START_OPERATION:
1765 workflowInstance = getWorkflowById(Long.parseLong(arguments.get(0)));
1766 wfo = workflowInstance.getCurrentOperation();
1767
1768 if (OperationState.RUNNING.equals(wfo.getState()) || OperationState.PAUSED.equals(wfo.getState())) {
1769 logger.info("Reset operation state {} {} to INSTANTIATED due to job restart", workflowInstance, wfo);
1770 wfo.setState(OperationState.INSTANTIATED);
1771 }
1772
1773 wfo.setExecutionHost(job.getProcessingHost());
1774 logger.debug("Running {} {}", workflowInstance, wfo);
1775 wfo = runWorkflowOperation(workflowInstance, null);
1776 updateOperationJob(job.getId(), wfo.getState());
1777 break;
1778 default:
1779 throw new IllegalStateException("Don't know how to handle operation '" + operation + "'");
1780 }
1781 } catch (IllegalArgumentException e) {
1782 throw new ServiceRegistryException("This service can't handle operations of type '" + op + "'", e);
1783 } catch (IndexOutOfBoundsException e) {
1784 throw new ServiceRegistryException(
1785 "The argument list for operation '" + op + "' (job: " + job.getId() + ") does not meet expectations", e);
1786 } catch (NotFoundException e) {
1787 logger.warn("Not found processing job {}", job, e);
1788 updateOperationJob(job.getId(), OperationState.FAILED);
1789 }
1790 return null;
1791 } catch (Exception e) {
1792 logger.warn("Exception while accepting job {}", job, e);
1793 try {
1794 if (workflowInstance != null) {
1795 logger.warn("Marking job {} and workflow instance {} as failed", job, workflowInstance);
1796 updateOperationJob(job.getId(), OperationState.FAILED);
1797 workflowInstance.setState(FAILED);
1798 update(workflowInstance);
1799 } else {
1800 logger.warn("Unable to parse workflow instance", e);
1801 }
1802 } catch (WorkflowDatabaseException e1) {
1803 throw new ServiceRegistryException(e1);
1804 }
1805 if (e instanceof ServiceRegistryException) {
1806 throw e;
1807 }
1808 throw new ServiceRegistryException("Error handling operation '" + op + "'", e);
1809 }
1810 }
1811
1812
1813
1814
1815
1816
1817
1818
1819
1820
1821
1822
1823
1824
1825
1826 private Job updateOperationJob(Long jobId, OperationState state) throws NotFoundException, ServiceRegistryException {
1827 if (jobId == null) {
1828 return null;
1829 }
1830 Job job = serviceRegistry.getJob(jobId);
1831 switch (state) {
1832 case FAILED:
1833 case RETRY:
1834 job.setStatus(Status.FAILED);
1835 break;
1836 case PAUSED:
1837 job.setStatus(Status.PAUSED);
1838 job.setOperation(Operation.RESUME.toString());
1839 break;
1840 case SKIPPED:
1841 case SUCCEEDED:
1842 job.setStatus(Status.FINISHED);
1843 break;
1844 default:
1845 throw new IllegalStateException("Unexpected state '" + state + "' found");
1846 }
1847 return serviceRegistry.updateJob(job);
1848 }
1849
1850
1851
1852
1853
1854
1855 @Override
1856 public long countJobs(Status status) throws ServiceRegistryException {
1857 return serviceRegistry.count(JOB_TYPE, status);
1858 }
1859
1860
1861
1862
1863
1864
1865
1866
1867
1868 private String mapToString(Map<String, String> props) {
1869 if (props == null) {
1870 return null;
1871 }
1872 StringBuilder sb = new StringBuilder();
1873 for (Entry<String, String> entry : props.entrySet()) {
1874 sb.append(entry.getKey());
1875 sb.append("=");
1876 sb.append(entry.getValue());
1877 sb.append("\n");
1878 }
1879 return sb.toString();
1880 }
1881
1882
1883
1884
1885
1886
1887
1888 @Reference(target = "(artifact=workflowdefinition)")
1889 protected void setProfilesReadyIndicator(ReadinessIndicator unused) { }
1890
1891
1892
1893
1894
1895
1896
1897 @Reference
1898 protected void setWorkspace(Workspace workspace) {
1899 this.workspace = workspace;
1900 }
1901
1902
1903
1904
1905
1906
1907
1908 @Reference
1909 protected void setServiceRegistry(ServiceRegistry registry) {
1910 this.serviceRegistry = registry;
1911 }
1912
1913 public ServiceRegistry getServiceRegistry() {
1914 return serviceRegistry;
1915 }
1916
1917
1918
1919
1920
1921
1922
1923 @Reference
1924 public void setSecurityService(SecurityService securityService) {
1925 this.securityService = securityService;
1926 }
1927
1928
1929
1930
1931
1932
1933
1934 @Reference
1935 public void setAuthorizationService(AuthorizationService authorizationService) {
1936 this.authorizationService = authorizationService;
1937 }
1938
1939
1940
1941
1942
1943
1944
1945 @Reference
1946 public void setUserDirectoryService(UserDirectoryService userDirectoryService) {
1947 this.userDirectoryService = userDirectoryService;
1948 }
1949
1950
1951
1952
1953
1954
1955
1956 @Reference
1957 public void setOrganizationDirectoryService(OrganizationDirectoryService organizationDirectory) {
1958 this.organizationDirectoryService = organizationDirectory;
1959 }
1960
1961
1962
1963
1964
1965
1966
1967 @Reference
1968 public void setSeriesService(SeriesService seriesService) {
1969 this.seriesService = seriesService;
1970 }
1971
1972
1973
1974
1975
1976
1977
1978 @Reference
1979 public void setAssetManager(AssetManager assetManager) {
1980 this.assetManager = assetManager;
1981 }
1982
1983
1984
1985
1986
1987
1988
1989 @Reference(
1990 cardinality = ReferenceCardinality.AT_LEAST_ONE,
1991 policy = ReferencePolicy.DYNAMIC,
1992 unbind = "removeMetadataService"
1993 )
1994 protected void addMetadataService(MediaPackageMetadataService service) {
1995 metadataServices.add(service);
1996 }
1997
1998
1999
2000
2001
2002
2003
2004 protected void removeMetadataService(MediaPackageMetadataService service) {
2005 metadataServices.remove(service);
2006 }
2007
2008
2009
2010
2011
2012
2013
2014 @Reference
2015 protected void addWorkflowDefinitionScanner(WorkflowDefinitionScanner scanner) {
2016 workflowDefinitionScanner = scanner;
2017 }
2018
2019
2020
2021
2022
2023
2024
2025 @Reference
2026 public void setIndex(ElasticsearchIndex index) {
2027 this.index = index;
2028 }
2029
2030
2031
2032
2033
2034
2035
2036 @Reference(name = "workflow-persistence")
2037 public void setPersistence(WorkflowServiceDatabase persistence) {
2038 this.persistence = persistence;
2039 }
2040
2041
2042
2043
2044
2045
2046 @Override
2047 public String getJobType() {
2048 return JOB_TYPE;
2049 }
2050
2051
2052
2053
2054 public static class HandlerRegistration {
2055
2056 protected WorkflowOperationHandler handler;
2057 protected String operationName;
2058
2059 public HandlerRegistration(String operationName, WorkflowOperationHandler handler) {
2060 if (operationName == null) {
2061 throw new IllegalArgumentException("Operation name cannot be null");
2062 }
2063 if (handler == null) {
2064 throw new IllegalArgumentException("Handler cannot be null");
2065 }
2066 this.operationName = operationName;
2067 this.handler = handler;
2068 }
2069
2070 public WorkflowOperationHandler getHandler() {
2071 return handler;
2072 }
2073
2074
2075
2076
2077
2078
2079 @Override
2080 public int hashCode() {
2081 final int prime = 31;
2082 int result = 1;
2083 result = prime * result + handler.hashCode();
2084 result = prime * result + operationName.hashCode();
2085 return result;
2086 }
2087
2088
2089
2090
2091
2092
2093 @Override
2094 public boolean equals(Object obj) {
2095 if (this == obj) {
2096 return true;
2097 }
2098 if (obj == null) {
2099 return false;
2100 }
2101 if (getClass() != obj.getClass()) {
2102 return false;
2103 }
2104 HandlerRegistration other = (HandlerRegistration) obj;
2105 if (!handler.equals(other.handler)) {
2106 return false;
2107 }
2108 return operationName.equals(other.operationName);
2109 }
2110 }
2111
2112
2113
2114
2115 class JobRunner implements Callable<Void> {
2116
2117
2118 private Job job = null;
2119
2120
2121 private final Job currentJob;
2122
2123
2124
2125
2126
2127
2128
2129
2130
2131 JobRunner(Job job, Job currentJob) {
2132 this.job = job;
2133 this.currentJob = currentJob;
2134 }
2135
2136
2137
2138
2139
2140
2141 @Override
2142 public Void call() throws Exception {
2143 Organization jobOrganization = organizationDirectoryService.getOrganization(job.getOrganization());
2144 try {
2145 serviceRegistry.setCurrentJob(currentJob);
2146 securityService.setOrganization(jobOrganization);
2147 User jobUser = userDirectoryService.loadUser(job.getCreator());
2148 securityService.setUser(jobUser);
2149 process(job);
2150 } finally {
2151 serviceRegistry.setCurrentJob(null);
2152 securityService.setUser(null);
2153 securityService.setOrganization(null);
2154 }
2155 return null;
2156 }
2157 }
2158
2159 @Override
2160 public synchronized void cleanupWorkflowInstances(int buffer, WorkflowState state) throws UnauthorizedException,
2161 WorkflowDatabaseException {
2162 logger.info("Start cleaning up workflow instances older than {} days with status '{}'", buffer, state);
2163
2164 int instancesCleaned = 0;
2165 int cleaningFailed = 0;
2166
2167 Date priorTo = DateUtils.addDays(new Date(), -buffer);
2168
2169 try {
2170 for (WorkflowInstance workflowInstance : persistence.getWorkflowInstancesForCleanup(state, priorTo)) {
2171 try {
2172 logger.debug("Deleting workflow instance {}", workflowInstance.getId());
2173 remove(workflowInstance.getId());
2174 instancesCleaned++;
2175 } catch (WorkflowDatabaseException | UnauthorizedException e) {
2176 throw e;
2177 } catch (NotFoundException e) {
2178
2179 logger.debug("Workflow instance '{}' could not be removed", workflowInstance.getId(), e);
2180 } catch (WorkflowParsingException | WorkflowStateException e) {
2181 logger.warn("Workflow instance '{}' could not be removed", workflowInstance.getId(), e);
2182 cleaningFailed++;
2183 }
2184 }
2185 } catch (WorkflowDatabaseException e) {
2186 throw new WorkflowDatabaseException(e);
2187 }
2188
2189 if (instancesCleaned == 0 && cleaningFailed == 0) {
2190 logger.info("No workflow instances found to clean up");
2191 return;
2192 }
2193
2194 if (instancesCleaned > 0) {
2195 logger.info("Cleaned up '{}' workflow instances", instancesCleaned);
2196 }
2197 if (cleaningFailed > 0) {
2198 logger.warn("Cleaning failed for '{}' workflow instances", cleaningFailed);
2199 throw new WorkflowDatabaseException("Unable to clean all workflow instances, see logs!");
2200 }
2201 }
2202
2203 @Override
2204 public Map<String, Map<String, String>> getWorkflowStateMappings() {
2205 return workflowDefinitionScanner.workflowStateMappings.entrySet().stream().collect(Collectors.toMap(
2206 Entry::getKey, e -> e.getValue().stream()
2207 .collect(Collectors.toMap(m -> m.getState().name(), WorkflowStateMapping::getValue))
2208 ));
2209 }
2210
2211 @Override
2212 public void repopulate(IndexRebuildService.DataType type) throws IndexRebuildException {
2213 try {
2214 final int total;
2215 try {
2216 total = persistence.countMediaPackages();
2217 } catch (WorkflowDatabaseException e) {
2218 logIndexRebuildError(logger, e);
2219 throw new IndexRebuildException(getService(), e);
2220 }
2221
2222 if (total > 0) {
2223 logIndexRebuildBegin(logger, total, "workflows");
2224 int current = 0;
2225 int n = 20;
2226 List<WorkflowIndexData> workflowIndexData;
2227
2228 int limit = 1000;
2229 int offset = 0;
2230 String currentMediapackageId;
2231 String lastMediapackageId = "";
2232 var updatedWorkflowRange = new ArrayList<Event>();
2233 do {
2234 try {
2235 workflowIndexData = persistence.getWorkflowIndexData(limit, offset);
2236 } catch (WorkflowDatabaseException e) {
2237 logIndexRebuildError(logger, e);
2238 throw new IndexRebuildException(getService(), e);
2239 }
2240 if (!workflowIndexData.isEmpty()) {
2241 offset += limit;
2242 logger.debug("Got {} workflows for re-indexing", workflowIndexData.size());
2243
2244 for (WorkflowIndexData indexData : workflowIndexData) {
2245 currentMediapackageId = indexData.getMediaPackageId();
2246 if (currentMediapackageId.equals(lastMediapackageId)) {
2247 continue;
2248 }
2249 current++;
2250
2251
2252 if (!WorkflowUtil.isActive(WorkflowInstance.WorkflowState.values()[indexData.getState()].toString())
2253 || WorkflowState.PAUSED == WorkflowInstance.WorkflowState.values()[indexData.getState()]) {
2254 String orgid = indexData.getOrganizationId();
2255 if (null == orgid) {
2256 String mpId = indexData.getMediaPackageId();
2257
2258 RichAResult results = assetManager.getSnapshotsById(mpId);
2259 if (results.getSize() == 0) {
2260 logger.debug("Dropping {} from the index since it is missing from the database", mpId);
2261 continue;
2262 }
2263 orgid = results.getSnapshots().stream().findFirst().get().getOrganizationId();
2264
2265
2266 try {
2267
2268
2269 WorkflowInstance instance = persistence.getWorkflow(indexData.getId(), null);
2270 instance.setOrganizationId(orgid);
2271 persistence.updateInDatabase(instance);
2272 } catch (NotFoundException e) {
2273
2274 }
2275 }
2276 var updatedWorkflowData = index.getEvent(indexData.getMediaPackageId(), orgid,
2277 securityService.getUser());
2278 updatedWorkflowData = getStateUpdateFunction(indexData.getId(),indexData.getState(),
2279 indexData.getMediaPackageId(), indexData.getTemplate(), indexData.getOrganizationId())
2280 .apply(updatedWorkflowData);
2281 updatedWorkflowRange.add(updatedWorkflowData.get());
2282
2283 if (updatedWorkflowRange.size() >= n || current >= total) {
2284 index.bulkEventUpdate(updatedWorkflowRange);
2285 logIndexRebuildProgress(logger, total, current, n);
2286 updatedWorkflowRange.clear();
2287 }
2288 }
2289 else {
2290 logger.info("Skipping. Workflow {} is currently active.", indexData.getId());
2291 }
2292
2293 lastMediapackageId = currentMediapackageId;
2294 }
2295 }
2296 } while (!workflowIndexData.isEmpty());
2297 }
2298 } catch (Exception e) {
2299 logIndexRebuildError(logger, e);
2300 throw new IndexRebuildException(getService(), e);
2301 }
2302 }
2303
2304 @Override
2305 public IndexRebuildService.Service getService() {
2306 return IndexRebuildService.Service.Workflow;
2307 }
2308
2309
2310
2311
2312
2313
2314
2315 private void removeWorkflowInstanceFromIndex(long workflowInstanceId) {
2316 final String orgId = securityService.getOrganization().getId();
2317 final User user = securityService.getUser();
2318
2319
2320 SearchResult<Event> results;
2321 try {
2322 results = index.getByQuery(new EventSearchQuery(orgId, user).withWorkflowId(workflowInstanceId));
2323 } catch (SearchIndexException e) {
2324 logger.error("Error retrieving the events for workflow instance {} from the {} index.", workflowInstanceId,
2325 index.getIndexName(), e);
2326 return;
2327 }
2328
2329 if (results.getItems().length == 0) {
2330 logger.warn("No events for workflow instance {} found in the {} index.", workflowInstanceId,
2331 index.getIndexName());
2332 return;
2333 }
2334
2335
2336 for (SearchResultItem<Event> item: results.getItems()) {
2337 String eventId = item.getSource().getIdentifier();
2338 logger.debug("Removing workflow instance {} of event {} from the {} index.", workflowInstanceId, eventId,
2339 index.getIndexName());
2340
2341 Function<Optional<Event>, Optional<Event>> updateFunction = (Optional<Event> eventOpt) -> {
2342 if (!eventOpt.isPresent()) {
2343 logger.warn("Event {} of workflow instance {} not found in the {} index.", workflowInstanceId,
2344 eventId, index.getIndexName());
2345 return Optional.empty();
2346 }
2347 Event event = eventOpt.get();
2348 if (event.getWorkflowId() != null && event.getWorkflowId().equals(workflowInstanceId)) {
2349 logger.debug("Workflow {} is the current workflow of event {}. Removing it from event.", eventId,
2350 workflowInstanceId);
2351 event.setWorkflowId(null);
2352 event.setWorkflowDefinitionId(null);
2353 event.setWorkflowState(null);
2354 return Optional.of(event);
2355 }
2356 return Optional.empty();
2357 };
2358
2359 try {
2360 index.addOrUpdateEvent(eventId, updateFunction, orgId, user);
2361 logger.debug("Workflow instance {} of event {} removed from the {} index.", workflowInstanceId, eventId,
2362 index.getIndexName());
2363 } catch (SearchIndexException e) {
2364 logger.error("Error removing the workflow instance {} of event {} from the {} index.", workflowInstanceId,
2365 eventId, index.getIndexName(), e);
2366 }
2367 }
2368 }
2369
2370
2371
2372
2373
2374
2375
2376
2377
2378
2379
2380
2381
2382 private void updateWorkflowInstanceInIndex(long id, int state, String wfDefId, String mpId, String orgId) {
2383 final User user = securityService.getUser();
2384
2385 logger.debug("Updating workflow instance {} of event {} in the {} index.", id, mpId,
2386 index.getIndexName());
2387 Function<Optional<Event>, Optional<Event>> updateFunction = getStateUpdateFunction(id, state, wfDefId, mpId, orgId);
2388
2389 try {
2390 index.addOrUpdateEvent(mpId, updateFunction, orgId, user);
2391 logger.debug("Workflow instance {} of event {} updated in the {} index.", id, mpId,
2392 index.getIndexName());
2393 } catch (SearchIndexException e) {
2394 logger.error("Error updating the workflow instance {} of event {} in the {} index.", id, mpId,
2395 index.getIndexName(), e);
2396 }
2397 }
2398
2399
2400
2401
2402
2403
2404 private Function<Optional<Event>, Optional<Event>> getStateUpdateFunction(long workflowId,
2405 int workflowState, String workflowDefinitionId, String mediaPackageId,
2406 String orgId) {
2407 return (Optional<Event> eventOpt) -> {
2408 Event event = eventOpt.orElse(new Event(mediaPackageId, orgId));
2409 event.setWorkflowId(workflowId);
2410 event.setWorkflowState(WorkflowState.values()[workflowState]);
2411 event.setWorkflowDefinitionId(workflowDefinitionId);
2412 return Optional.of(event);
2413 };
2414 }
2415 }