View Javadoc
1   /*
2    * Licensed to The Apereo Foundation under one or more contributor license
3    * agreements. See the NOTICE file distributed with this work for additional
4    * information regarding copyright ownership.
5    *
6    *
7    * The Apereo Foundation licenses this file to you under the Educational
8    * Community License, Version 2.0 (the "License"); you may not use this file
9    * except in compliance with the License. You may obtain a copy of the License
10   * at:
11   *
12   *   http://opensource.org/licenses/ecl2.txt
13   *
14   * Unless required by applicable law or agreed to in writing, software
15   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
16   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the
17   * License for the specific language governing permissions and limitations under
18   * the License.
19   *
20   */
21  
22  package org.opencastproject.workflow.api;
23  
24  import static java.lang.String.format;
25  
26  import org.opencastproject.job.api.Job;
27  import org.opencastproject.job.api.JobBarrier;
28  import org.opencastproject.job.api.JobContext;
29  import org.opencastproject.mediapackage.MediaPackage;
30  import org.opencastproject.mediapackage.MediaPackageElement;
31  import org.opencastproject.mediapackage.MediaPackageElementFlavor;
32  import org.opencastproject.serviceregistry.api.ServiceRegistry;
33  import org.opencastproject.workflow.api.WorkflowOperationResult.Action;
34  
35  import org.apache.commons.io.FilenameUtils;
36  import org.apache.commons.lang3.StringUtils;
37  import org.osgi.framework.Constants;
38  import org.osgi.service.component.ComponentContext;
39  import org.osgi.service.component.annotations.Activate;
40  import org.osgi.service.component.annotations.Reference;
41  import org.slf4j.Logger;
42  import org.slf4j.LoggerFactory;
43  
44  import java.util.ArrayList;
45  import java.util.List;
46  import java.util.Map;
47  import java.util.Optional;
48  
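/**
 * Abstract base implementation for an operation handler. Subclasses have to provide the actual
 * {@link #start(WorkflowInstance, JobContext)} operation; this class contributes a default skip operation that returns
 * a {@link WorkflowOperationResult} with {@link Action#SKIP}, a no-op destroy operation and a set of helpers for
 * creating results, waiting for jobs and reading the configuration of the current operation.
 */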
53  public abstract class AbstractWorkflowOperationHandler implements WorkflowOperationHandler {
54  
55    /** The logging facility */
56    private static final Logger logger = LoggerFactory.getLogger(AbstractWorkflowOperationHandler.class);
57  
58    /** The ID of this operation handler */
59    protected String id = null;
60  
61    /** The description of what this handler actually does */
62    protected String description = null;
63  
64    /** Optional service registry */
65    protected ServiceRegistry serviceRegistry = null;
66  
67    /** The JobBarrier polling interval */
68    private long jobBarrierPollingInterval = JobBarrier.DEFAULT_POLLING_INTERVAL;
69  
70    /** Config for Tag Parsing operation */
71    protected enum Configuration {
72      none, one, many
73    };
74  
75    public static final String TARGET_FLAVORS = "target-flavors";
76    public static final String TARGET_FLAVOR = "target-flavor";
77    public static final String TARGET_TAGS = "target-tags";
78    public static final String TARGET_TAG = "target-tag";
79    public static final String SOURCE_FLAVORS = "source-flavors";
80    public static final String SOURCE_FLAVOR = "source-flavor";
81    public static final String SOURCE_TAG = "source-tag";
82    public static final String SOURCE_TAGS = "source-tags";
83  
84    /**
85     * Activates this component with its properties once all of the collaborating services have been set
86     *
87     * @param cc
88     *          The component's context, containing the properties used for configuration
89     */
90    @Activate
91    protected void activate(ComponentContext cc) {
92      this.id = (String) cc.getProperties().get(WorkflowService.WORKFLOW_OPERATION_PROPERTY);
93      this.description = (String) cc.getProperties().get(Constants.SERVICE_DESCRIPTION);
94    }
95  
96    /**
97     * {@inheritDoc}
98     *
99     * @see org.opencastproject.workflow.api.WorkflowOperationHandler#start(
100    *      org.opencastproject.workflow.api.WorkflowInstance, JobContext)
101    */
102   @Override
103   public abstract WorkflowOperationResult start(WorkflowInstance workflowInstance, JobContext context)
104           throws WorkflowOperationException;
105 
106   /**
107    * {@inheritDoc}
108    *
109    * @see org.opencastproject.workflow.api.WorkflowOperationHandler#skip(
110    *      org.opencastproject.workflow.api.WorkflowInstance, JobContext)
111    */
112   @Override
113   public WorkflowOperationResult skip(WorkflowInstance workflowInstance, JobContext context)
114           throws WorkflowOperationException {
115     return createResult(Action.SKIP);
116   }
117 
118   /**
119    * {@inheritDoc}
120    *
121    * @see org.opencastproject.workflow.api.WorkflowOperationHandler#destroy(
122    *      org.opencastproject.workflow.api.WorkflowInstance, JobContext)
123    */
124   @Override
125   public void destroy(WorkflowInstance workflowInstance, JobContext context) throws WorkflowOperationException {
126   }
127 
128   /**
129    * Converts a comma separated string into a set of values. Useful for converting operation configuration strings into
130    * multi-valued sets.
131    *
132    * @param elements
133    *          The comma space separated string
134    * @return the set of values
135    */
136   protected List<String> asList(String elements) {
137     elements = StringUtils.trimToEmpty(elements);
138     List<String> list = new ArrayList<>();
139     for (String s : StringUtils.split(elements, ",")) {
140       if (StringUtils.trimToNull(s) != null) {
141         list.add(s.trim());
142       }
143     }
144     return list;
145   }
146 
147   /**
148    * Generates a filename using the base name of a source element and the extension of a derived element.
149    *
150    * @param source
151    *          the source media package element
152    * @param derived
153    *          the derived media package element
154    * @return the filename
155    */
156   protected String getFileNameFromElements(MediaPackageElement source, MediaPackageElement derived) {
157     String fileName = FilenameUtils.getBaseName(source.getURI().getPath());
158     String fileExtension = FilenameUtils.getExtension(derived.getURI().getPath());
159     return fileName + "." + fileExtension;
160   }
161 
162   /**
163    * {@inheritDoc}
164    *
165    * @see org.opencastproject.workflow.api.WorkflowOperationHandler#getId()
166    */
167   @Override
168   public String getId() {
169     return id;
170   }
171 
172   /**
173    * {@inheritDoc}
174    *
175    * @see org.opencastproject.workflow.api.WorkflowOperationHandler#getDescription()
176    */
177   @Override
178   public String getDescription() {
179     return description;
180   }
181 
182   /**
183    * Creates a result for the execution of this workflow operation handler.
184    *
185    * @param action
186    *          the action to take
187    * @return the result
188    */
189   protected WorkflowOperationResult createResult(Action action) {
190     return createResult(null, null, action, 0);
191   }
192 
193   /**
194    * Creates a result for the execution of this workflow operation handler.
195    *
196    * @param mediaPackage
197    *          the modified mediapackage
198    * @param action
199    *          the action to take
200    * @return the result
201    */
202   protected WorkflowOperationResult createResult(MediaPackage mediaPackage, Action action) {
203     return createResult(mediaPackage, null, action, 0);
204   }
205 
206   /**
207    * Creates a result for the execution of this workflow operation handler.
208    * <p>
209    * Since there is no way for the workflow service to determine the queuing time (e. g. waiting on services), it needs
210    * to be provided by the handler.
211    *
212    * @param mediaPackage
213    *          the modified mediapackage
214    * @param action
215    *          the action to take
216    * @param timeInQueue
217    *          the amount of time this handle spent waiting for services
218    * @return the result
219    */
220   protected WorkflowOperationResult createResult(MediaPackage mediaPackage, Action action, long timeInQueue) {
221     return createResult(mediaPackage, null, action, timeInQueue);
222   }
223 
224   /**
225    * Creates a result for the execution of this workflow operation handler.
226    * <p>
227    * Since there is no way for the workflow service to determine the queuing time (e. g. waiting on services), it needs
228    * to be provided by the handler.
229    *
230    * @param mediaPackage
231    *          the modified mediapackage
232    * @param properties
233    *          the properties to add to the workflow instance
234    * @param action
235    *          the action to take
236    * @param timeInQueue
237    *          the amount of time this handle spent waiting for services
238    * @return the result
239    */
240   protected WorkflowOperationResult createResult(MediaPackage mediaPackage, Map<String, String> properties,
241           Action action, long timeInQueue) {
242     return new WorkflowOperationResultImpl(mediaPackage, properties, action, timeInQueue);
243   }
244 
245   /**
246    * Sets the service registry. This method is here as a convenience for developers that need the registry to do job
247    * waiting.
248    *
249    * @param serviceRegistry
250    *          the service registry
251    */
252   @Reference
253   public void setServiceRegistry(ServiceRegistry serviceRegistry) {
254     this.serviceRegistry = serviceRegistry;
255   }
256 
257   /**
258    * Waits until all of the jobs have reached either one of these statuses:
259    * <ul>
260    * <li>{@link Job.Status#FINISHED}</li>
261    * <li>{@link Job.Status#FAILED}</li>
262    * <li>{@link Job.Status#DELETED}</li>
263    * </ul>
264    * After that, the method returns with the actual outcomes of the jobs.
265    *
266    * @param jobs
267    *          the jobs
268    * @return the jobs and their outcomes
269    * @throws IllegalStateException
270    *           if the service registry has not been set
271    * @throws IllegalArgumentException
272    *           if the jobs collecion is either <code>null</code> or empty
273    */
274   protected JobBarrier.Result waitForStatus(Job... jobs) throws IllegalStateException, IllegalArgumentException {
275     return waitForStatus(0, jobs);
276   }
277 
278   /**
279    * Waits until all of the jobs have reached either one of these statuses:
280    * <ul>
281    * <li>{@link Job.Status#FINISHED}</li>
282    * <li>{@link Job.Status#FAILED}</li>
283    * <li>{@link Job.Status#DELETED}</li>
284    * </ul>
285    * After that, the method returns with the actual outcomes of the jobs.
286    *
287    * @param timeout
288    *          the maximum amount of time in milliseconds to wait
289    * @param jobs
290    *          the jobs
291    * @return the jobs and their outcomes
292    * @throws IllegalStateException
293    *           if the service registry has not been set
294    * @throws IllegalArgumentException
295    *           if the jobs collection is either <code>null</code> or empty
296    */
297   protected JobBarrier.Result waitForStatus(long timeout, Job... jobs) throws IllegalStateException,
298           IllegalArgumentException {
299     if (serviceRegistry == null) {
300       throw new IllegalStateException("Can't wait for job status without providing a service registry first");
301     }
302     JobBarrier barrier = new JobBarrier(null, serviceRegistry, jobBarrierPollingInterval, jobs);
303     return barrier.waitForJobs(timeout);
304   }
305 
306   /**
307    * Get a configuration option.
308    *
309    * @deprecated use {@link #getConfig(WorkflowInstance, String)} or
310    *             {@link #getOptConfig(org.opencastproject.workflow.api.WorkflowInstance, String)}
311    */
312   protected Optional<String> getCfg(WorkflowInstance wi, String key) {
313     return Optional.ofNullable(wi.getCurrentOperation().getConfiguration(key));
314   }
315 
316   /**
317    * Get a mandatory configuration key. Values are returned trimmed.
318    *
319    * @throws WorkflowOperationException
320    *         if the configuration key is either missing or empty
321    */
322   protected String getConfig(WorkflowInstance wi, String key) throws WorkflowOperationException {
323     return getConfig(wi.getCurrentOperation(), key);
324   }
325 
326   /**
327    * Get a configuration key. Values are returned trimmed.
328    *
329    * @param w
330    *        WorkflowInstance with current operation
331    * @param key
332    *        Configuration key to check for
333    * @param defaultValue
334    *        Value to return if key does not exists
335    */
336   protected String getConfig(WorkflowInstance w, String key, String defaultValue) {
337     Optional<String> cfgOpt = getOptConfig(w.getCurrentOperation(), key);
338     if (cfgOpt.isPresent()) {
339       return cfgOpt.get();
340     }
341     return defaultValue;
342   }
343 
344   /**
345    * Get a mandatory configuration key. Values are returned trimmed.
346    *
347    * @throws WorkflowOperationException
348    *         if the configuration key is either missing or empty
349    */
350   protected String getConfig(WorkflowOperationInstance woi, String key) throws WorkflowOperationException {
351     Optional<String> cfgOpt = getOptConfig(woi, key);
352     if (cfgOpt.isPresent()) {
353       return cfgOpt.get();
354     }
355     throw new WorkflowOperationException(format("Configuration key '%s' is either missing or empty", key));
356   }
357 
358   /**
359    * Get an optional configuration key. Values are returned trimmed.
360    */
361   protected Optional<String> getOptConfig(WorkflowInstance wi, String key) {
362     return getOptConfig(wi.getCurrentOperation(), key);
363   }
364 
365   /**
366    * Get an optional configuration key. Values are returned trimmed.
367    */
368   protected Optional<String> getOptConfig(WorkflowOperationInstance woi, String key) {
369     return Optional.ofNullable(woi.getConfiguration(key))
370         .map(String::trim)
371         .filter(s -> !s.isEmpty());
372   }
373 
374   /**
375    * Returns a ConfiguredTagsAndFlavors instance, which includes all specified source/target tags and flavors if they
376    * are valid. Lists can be empty, if no values were specified! This is to enable WOHs to individually check if a
377    * given tag/flavor was set. This also means that you should use Configuration.many as parameter, if a tag/flavor is
378    * optional.
379    * @param srcTags none, one or many
380    * @param srcFlavors none, one or many
381    * @param targetFlavors none, one or many
382    * @param targetTags none, one or many
383    * @return ConfiguredTagsAndFlavors object including lists for the configured tags/flavors
384    */
385   protected ConfiguredTagsAndFlavors getTagsAndFlavors(WorkflowInstance workflow, Configuration srcTags,
386       Configuration srcFlavors, Configuration targetTags, Configuration targetFlavors)
387           throws WorkflowOperationException {
388     WorkflowOperationInstance operation = workflow.getCurrentOperation();
389     ConfiguredTagsAndFlavors tagsAndFlavors = new ConfiguredTagsAndFlavors();
390     MediaPackageElementFlavor flavor;
391 
392     List<String> srcTagList = new ArrayList<>();
393     String srcTag;
394     switch(srcTags) {
395       case none:
396         break;
397       case one:
398         srcTag = StringUtils.trimToNull(operation.getConfiguration(SOURCE_TAG));
399         if (srcTag == null) {
400           throw new WorkflowOperationException("Configuration key '" + SOURCE_TAG + "' must be set");
401         }
402         srcTagList.add(srcTag);
403         break;
404       case many:
405         srcTagList = asList(StringUtils.trimToNull(operation.getConfiguration(SOURCE_TAGS)));
406         srcTag = StringUtils.trimToNull(operation.getConfiguration(SOURCE_TAG));
407         if (srcTagList.isEmpty() && srcTag != null) {
408           srcTagList.add(srcTag);
409         }
410         break;
411       default:
412         throw new WorkflowOperationException("Couldn't process srcTags configuration option!");
413     }
414     tagsAndFlavors.setSrcTags(srcTagList);
415 
416     List<MediaPackageElementFlavor> srcFlavorList = new ArrayList<>();
417     String singleSourceFlavor;
418     switch(srcFlavors) {
419       case none:
420         break;
421       case one:
422         singleSourceFlavor = StringUtils.trimToNull(operation.getConfiguration(SOURCE_FLAVOR));
423         if (singleSourceFlavor == null) {
424           throw new WorkflowOperationException("Configuration key '" + SOURCE_FLAVOR + "' must be set");
425         }
426         try {
427           flavor = MediaPackageElementFlavor.parseFlavor(singleSourceFlavor);
428         } catch (IllegalArgumentException e) {
429           throw new WorkflowOperationException(singleSourceFlavor + " is not a valid flavor!");
430         }
431         srcFlavorList.add(flavor);
432         break;
433       case many:
434         List<String> srcFlavorString = asList(StringUtils.trimToNull(operation.getConfiguration(SOURCE_FLAVORS)));
435         singleSourceFlavor = StringUtils.trimToNull(operation.getConfiguration(SOURCE_FLAVOR));
436         if (srcFlavorString.isEmpty() && singleSourceFlavor != null) {
437           srcFlavorString.add(singleSourceFlavor);
438         }
439         for (String elem : srcFlavorString) {
440           try {
441             flavor = MediaPackageElementFlavor.parseFlavor(elem);
442             srcFlavorList.add(flavor);
443           } catch (IllegalArgumentException e) {
444             throw new WorkflowOperationException(elem + " is not a valid flavor!");
445           }
446         }
447         break;
448       default:
449         throw new WorkflowOperationException("Couldn't process srcFlavors configuration option!");
450     }
451     tagsAndFlavors.setSrcFlavors(srcFlavorList);
452 
453     ConfiguredTagsAndFlavors.TargetTags targetTagMap = new ConfiguredTagsAndFlavors.TargetTags();
454     String targetTag;
455     switch(targetTags) {
456       case none:
457         break;
458       case one:
459         targetTag = StringUtils.trimToNull(operation.getConfiguration(TARGET_TAG));
460         if (targetTag == null) {
461           throw new WorkflowOperationException("Configuration key '" + TARGET_TAG + "' must be set");
462         }
463         targetTagMap = parseTargetTagsByType(List.of(targetTag));
464         break;
465       case many:
466         List<String> targetTagList = asList(StringUtils.trimToNull(operation.getConfiguration(TARGET_TAGS)));
467         targetTagMap = parseTargetTagsByType(targetTagList);
468         targetTag = StringUtils.trimToNull(operation.getConfiguration(TARGET_TAG));
469         if (targetTagList.isEmpty() && targetTag != null) {
470           targetTagMap = parseTargetTagsByType(List.of(targetTag));
471         }
472         break;
473       default:
474         throw new WorkflowOperationException("Couldn't process target-tag configuration option!");
475     }
476     tagsAndFlavors.setTargetTags(targetTagMap);
477 
478     List<MediaPackageElementFlavor> targetFlavorList = new ArrayList<>();
479     String singleTargetFlavor;
480     switch(targetFlavors) {
481       case none:
482         break;
483       case one:
484         singleTargetFlavor = StringUtils.trimToNull(operation.getConfiguration(TARGET_FLAVOR));
485         if (singleTargetFlavor == null) {
486           throw new WorkflowOperationException("Configuration key '" + TARGET_FLAVOR + "' must be set");
487         }
488         try {
489           flavor = MediaPackageElementFlavor.parseFlavor(singleTargetFlavor);
490         } catch (IllegalArgumentException e) {
491           throw new WorkflowOperationException(singleTargetFlavor + " is not a valid flavor!");
492         }
493         targetFlavorList.add(flavor);
494         break;
495       case many:
496         List<String> targetFlavorString = asList(StringUtils.trimToNull(operation.getConfiguration(TARGET_FLAVORS)));
497         singleTargetFlavor = StringUtils.trimToNull(operation.getConfiguration(TARGET_FLAVOR));
498         if (targetFlavorString.isEmpty() && singleTargetFlavor != null) {
499           targetFlavorString.add(singleTargetFlavor);
500         }
501         for (String elem : targetFlavorString) {
502           try {
503             flavor = MediaPackageElementFlavor.parseFlavor(elem);
504           } catch (IllegalArgumentException e) {
505             throw new WorkflowOperationException(elem + " is not a valid flavor!");
506           }
507           targetFlavorList.add(flavor);
508         }
509         break;
510       default:
511         throw new WorkflowOperationException("Couldn't process targetFlavors configuration option!");
512     }
513     tagsAndFlavors.setTargetFlavors(targetFlavorList);
514     return tagsAndFlavors;
515   }
516 
517   private ConfiguredTagsAndFlavors.TargetTags parseTargetTagsByType(List<String> tags) {
518     final String plus = "+";
519     final String minus = "-";
520     List<String> overrideTags = new ArrayList();
521     List<String> addTags = new ArrayList();
522     List<String> removeTags = new ArrayList();
523 
524     for (String targetTag : tags) {
525       if (!StringUtils.startsWithAny(targetTag, plus, minus)) {
526         if (addTags.size() > 0
527             || removeTags.size() > 0) {
528           logger.warn("You may not mix override tags and tag changes. "
529               + "The list of override tags so far is {}. "
530               + "The tag {} is not prefixed with '{}' or '{}'.", overrideTags, targetTag, plus, minus);
531         }
532         overrideTags.add(targetTag);
533       } else if (StringUtils.startsWith(targetTag, plus)) {
534         addTags.add(StringUtils.substring(targetTag, 1));
535       } else if (StringUtils.startsWith(targetTag, minus)) {
536         removeTags.add(StringUtils.substring(targetTag, 1));
537       }
538     }
539 
540     return new ConfiguredTagsAndFlavors.TargetTags(overrideTags, addTags, removeTags);
541   }
542 
543   /**
544    * Helper function that applies target tags to the given element, based on the type(s) of the tag(s)
545    * @param targetTags The target tags to apply to the element
546    * @param element The element the target tags are applied to
547    * @return The element with the applied target tags
548    */
549   protected <T extends MediaPackageElement> T applyTargetTagsToElement(
550       ConfiguredTagsAndFlavors.TargetTags targetTags,
551       T element
552   ) {
553     // set tags on target element
554     List<String> overrideTags = targetTags.getOverrideTags();
555     List<String> addTags = targetTags.getAddTags();
556     List<String> removeTags = targetTags.getRemoveTags();
557     if (overrideTags.size() > 0) {
558       element.clearTags();
559       for (String tag : overrideTags) {
560         element.addTag(tag);
561       }
562     } else {
563       for (String tag : removeTags) {
564         element.removeTag(tag);
565       }
566       for (String tag : addTags) {
567         element.addTag(tag);
568       }
569     }
570 
571     return element;
572   }
573 
574   /**
575    * Set the @link org.opencastproject.job.api.JobBarrier polling interval.
576    * <p>
577    * While waiting for other jobs to finish, the barrier will poll the status of these jobs until they are finished. To
578    * reduce load on the system, the polling is done only every x milliseconds. This interval defines the sleep time
579    * between these polls.
580    * <p>
581    * If most cases you want to leave this at its default value. It will make sense, though, to adjust this time if you
582    * know that your job will be exceptionally short. An example of this might be the unit tests where other jobs are
583    * usually mocked. But this setting is not limited to tests and may be a sensible options for other jobs as well.
584    *
585    * @param interval the time in miliseconds between two polling operations
586    *
587    * @see org.opencastproject.job.api.JobBarrier#DEFAULT_POLLING_INTERVAL
588    */
589   public void setJobBarrierPollingInterval(long interval) {
590     this.jobBarrierPollingInterval = interval;
591   }
592 
593   /**
594    * {@inheritDoc}
595    *
596    * @see java.lang.Object#hashCode()
597    */
598   @Override
599   public int hashCode() {
600     return id != null ? id.hashCode() : super.hashCode();
601   }
602 
603   /**
604    * {@inheritDoc}
605    *
606    * @see java.lang.Object#equals(java.lang.Object)
607    */
608   @Override
609   public boolean equals(Object obj) {
610     if (obj instanceof WorkflowOperationHandler) {
611       if (id != null) {
612         return id.equals(((WorkflowOperationHandler) obj).getId());
613       } else {
614         return this == obj;
615       }
616     }
617     return false;
618   }
619 
620   /**
621    * {@inheritDoc}
622    *
623    * @see java.lang.Object#toString()
624    */
625   @Override
626   public String toString() {
627     return getId();
628   }
629 }