View Javadoc
1   /*
2    * Licensed to The Apereo Foundation under one or more contributor license
3    * agreements. See the NOTICE file distributed with this work for additional
4    * information regarding copyright ownership.
5    *
6    *
7    * The Apereo Foundation licenses this file to you under the Educational
8    * Community License, Version 2.0 (the "License"); you may not use this file
9    * except in compliance with the License. You may obtain a copy of the License
10   * at:
11   *
12   *   http://opensource.org/licenses/ecl2.txt
13   *
14   * Unless required by applicable law or agreed to in writing, software
15   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
16   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the
17   * License for the specific language governing permissions and limitations under
18   * the License.
19   *
20   */
21  
22  package org.opencastproject.workflow.api;
23  
24  import static java.lang.String.format;
25  
26  import org.opencastproject.job.api.Job;
27  import org.opencastproject.job.api.JobBarrier;
28  import org.opencastproject.job.api.JobContext;
29  import org.opencastproject.mediapackage.MediaPackage;
30  import org.opencastproject.mediapackage.MediaPackageElement;
31  import org.opencastproject.mediapackage.MediaPackageElementFlavor;
32  import org.opencastproject.serviceregistry.api.ServiceRegistry;
33  import org.opencastproject.workflow.api.WorkflowOperationResult.Action;
34  
35  import org.apache.commons.io.FilenameUtils;
36  import org.apache.commons.lang3.StringUtils;
37  import org.osgi.framework.Constants;
38  import org.osgi.service.component.ComponentContext;
39  import org.osgi.service.component.annotations.Activate;
40  import org.osgi.service.component.annotations.Reference;
41  import org.slf4j.Logger;
42  import org.slf4j.LoggerFactory;
43  
44  import java.util.ArrayList;
45  import java.util.List;
46  import java.util.Map;
47  import java.util.Optional;
48  
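/**
 * Abstract base implementation for an operation handler. Subclasses have to implement the start operation themselves;
 * this class contributes a default skip operation that returns a {@link WorkflowOperationResult} with
 * {@link Action#SKIP}, an empty destroy operation, and helper methods for creating results, reading the operation
 * configuration and waiting for jobs.
 */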
53  public abstract class AbstractWorkflowOperationHandler implements WorkflowOperationHandler {
54  
55    /** The logging facility */
56    private static final Logger logger = LoggerFactory.getLogger(AbstractWorkflowOperationHandler.class);
57  
58    /** The ID of this operation handler */
59    protected String id = null;
60  
61    /** The description of what this handler actually does */
62    protected String description = null;
63  
64    /** Optional service registry */
65    protected ServiceRegistry serviceRegistry = null;
66  
67    /** The JobBarrier polling interval */
68    private long jobBarrierPollingInterval = JobBarrier.DEFAULT_POLLING_INTERVAL;
69  
70    /** Config for Tag Parsing operation */
71    protected enum Configuration { none, one, many };
72  
73    public static final String TARGET_FLAVORS = "target-flavors";
74    public static final String TARGET_FLAVOR = "target-flavor";
75    public static final String TARGET_TAGS = "target-tags";
76    public static final String TARGET_TAG = "target-tag";
77    public static final String SOURCE_FLAVORS = "source-flavors";
78    public static final String SOURCE_FLAVOR = "source-flavor";
79    public static final String SOURCE_TAG = "source-tag";
80    public static final String SOURCE_TAGS = "source-tags";
81  
82    /**
83     * Activates this component with its properties once all of the collaborating services have been set
84     *
85     * @param cc
86     *          The component's context, containing the properties used for configuration
87     */
88    @Activate
89    protected void activate(ComponentContext cc) {
90      this.id = (String) cc.getProperties().get(WorkflowService.WORKFLOW_OPERATION_PROPERTY);
91      this.description = (String) cc.getProperties().get(Constants.SERVICE_DESCRIPTION);
92    }
93  
94    /**
95     * {@inheritDoc}
96     *
97     * @see org.opencastproject.workflow.api.WorkflowOperationHandler#start(org.opencastproject.workflow.api.WorkflowInstance,
98     *      JobContext)
99     */
100   @Override
101   public abstract WorkflowOperationResult start(WorkflowInstance workflowInstance, JobContext context)
102           throws WorkflowOperationException;
103 
104   /**
105    * {@inheritDoc}
106    *
107    * @see org.opencastproject.workflow.api.WorkflowOperationHandler#skip(org.opencastproject.workflow.api.WorkflowInstance,
108    *      JobContext)
109    */
110   @Override
111   public WorkflowOperationResult skip(WorkflowInstance workflowInstance, JobContext context)
112           throws WorkflowOperationException {
113     return createResult(Action.SKIP);
114   }
115 
116   /**
117    * {@inheritDoc}
118    *
119    * @see org.opencastproject.workflow.api.WorkflowOperationHandler#destroy(org.opencastproject.workflow.api.WorkflowInstance,
120    *      JobContext)
121    */
122   @Override
123   public void destroy(WorkflowInstance workflowInstance, JobContext context) throws WorkflowOperationException {
124   }
125 
126   /**
127    * Converts a comma separated string into a set of values. Useful for converting operation configuration strings into
128    * multi-valued sets.
129    *
130    * @param elements
131    *          The comma space separated string
132    * @return the set of values
133    */
134   protected List<String> asList(String elements) {
135     elements = StringUtils.trimToEmpty(elements);
136     List<String> list = new ArrayList<>();
137     for (String s : StringUtils.split(elements, ",")) {
138       if (StringUtils.trimToNull(s) != null) {
139         list.add(s.trim());
140       }
141     }
142     return list;
143   }
144 
145   /**
146    * Generates a filename using the base name of a source element and the extension of a derived element.
147    *
148    * @param source
149    *          the source media package element
150    * @param derived
151    *          the derived media package element
152    * @return the filename
153    */
154   protected String getFileNameFromElements(MediaPackageElement source, MediaPackageElement derived) {
155     String fileName = FilenameUtils.getBaseName(source.getURI().getPath());
156     String fileExtension = FilenameUtils.getExtension(derived.getURI().getPath());
157     return fileName + "." + fileExtension;
158   }
159 
160   /**
161    * {@inheritDoc}
162    *
163    * @see org.opencastproject.workflow.api.WorkflowOperationHandler#getId()
164    */
165   @Override
166   public String getId() {
167     return id;
168   }
169 
170   /**
171    * {@inheritDoc}
172    *
173    * @see org.opencastproject.workflow.api.WorkflowOperationHandler#getDescription()
174    */
175   @Override
176   public String getDescription() {
177     return description;
178   }
179 
180   /**
181    * Creates a result for the execution of this workflow operation handler.
182    *
183    * @param action
184    *          the action to take
185    * @return the result
186    */
187   protected WorkflowOperationResult createResult(Action action) {
188     return createResult(null, null, action, 0);
189   }
190 
191   /**
192    * Creates a result for the execution of this workflow operation handler.
193    *
194    * @param mediaPackage
195    *          the modified mediapackage
196    * @param action
197    *          the action to take
198    * @return the result
199    */
200   protected WorkflowOperationResult createResult(MediaPackage mediaPackage, Action action) {
201     return createResult(mediaPackage, null, action, 0);
202   }
203 
204   /**
205    * Creates a result for the execution of this workflow operation handler.
206    * <p>
207    * Since there is no way for the workflow service to determine the queuing time (e. g. waiting on services), it needs
208    * to be provided by the handler.
209    *
210    * @param mediaPackage
211    *          the modified mediapackage
212    * @param action
213    *          the action to take
214    * @param timeInQueue
215    *          the amount of time this handle spent waiting for services
216    * @return the result
217    */
218   protected WorkflowOperationResult createResult(MediaPackage mediaPackage, Action action, long timeInQueue) {
219     return createResult(mediaPackage, null, action, timeInQueue);
220   }
221 
222   /**
223    * Creates a result for the execution of this workflow operation handler.
224    * <p>
225    * Since there is no way for the workflow service to determine the queuing time (e. g. waiting on services), it needs
226    * to be provided by the handler.
227    *
228    * @param mediaPackage
229    *          the modified mediapackage
230    * @param properties
231    *          the properties to add to the workflow instance
232    * @param action
233    *          the action to take
234    * @param timeInQueue
235    *          the amount of time this handle spent waiting for services
236    * @return the result
237    */
238   protected WorkflowOperationResult createResult(MediaPackage mediaPackage, Map<String, String> properties,
239           Action action, long timeInQueue) {
240     return new WorkflowOperationResultImpl(mediaPackage, properties, action, timeInQueue);
241   }
242 
243   /**
244    * Sets the service registry. This method is here as a convenience for developers that need the registry to do job
245    * waiting.
246    *
247    * @param serviceRegistry
248    *          the service registry
249    */
250   @Reference
251   public void setServiceRegistry(ServiceRegistry serviceRegistry) {
252     this.serviceRegistry = serviceRegistry;
253   }
254 
255   /**
256    * Waits until all of the jobs have reached either one of these statuses:
257    * <ul>
258    * <li>{@link Job.Status#FINISHED}</li>
259    * <li>{@link Job.Status#FAILED}</li>
260    * <li>{@link Job.Status#DELETED}</li>
261    * </ul>
262    * After that, the method returns with the actual outcomes of the jobs.
263    *
264    * @param jobs
265    *          the jobs
266    * @return the jobs and their outcomes
267    * @throws IllegalStateException
268    *           if the service registry has not been set
269    * @throws IllegalArgumentException
270    *           if the jobs collecion is either <code>null</code> or empty
271    */
272   protected JobBarrier.Result waitForStatus(Job... jobs) throws IllegalStateException, IllegalArgumentException {
273     return waitForStatus(0, jobs);
274   }
275 
276   /**
277    * Waits until all of the jobs have reached either one of these statuses:
278    * <ul>
279    * <li>{@link Job.Status#FINISHED}</li>
280    * <li>{@link Job.Status#FAILED}</li>
281    * <li>{@link Job.Status#DELETED}</li>
282    * </ul>
283    * After that, the method returns with the actual outcomes of the jobs.
284    *
285    * @param timeout
286    *          the maximum amount of time in milliseconds to wait
287    * @param jobs
288    *          the jobs
289    * @return the jobs and their outcomes
290    * @throws IllegalStateException
291    *           if the service registry has not been set
292    * @throws IllegalArgumentException
293    *           if the jobs collection is either <code>null</code> or empty
294    */
295   protected JobBarrier.Result waitForStatus(long timeout, Job... jobs) throws IllegalStateException,
296           IllegalArgumentException {
297     if (serviceRegistry == null) {
298       throw new IllegalStateException("Can't wait for job status without providing a service registry first");
299     }
300     JobBarrier barrier = new JobBarrier(null, serviceRegistry, jobBarrierPollingInterval, jobs);
301     return barrier.waitForJobs(timeout);
302   }
303 
304   /**
305    * Get a configuration option.
306    *
307    * @deprecated use {@link #getConfig(WorkflowInstance, String)} or {@link #getOptConfig(org.opencastproject.workflow.api.WorkflowInstance, String)}
308    */
309   protected Optional<String> getCfg(WorkflowInstance wi, String key) {
310     return Optional.ofNullable(wi.getCurrentOperation().getConfiguration(key));
311   }
312 
313   /**
314    * Get a mandatory configuration key. Values are returned trimmed.
315    *
316    * @throws WorkflowOperationException
317    *         if the configuration key is either missing or empty
318    */
319   protected String getConfig(WorkflowInstance wi, String key) throws WorkflowOperationException {
320     return getConfig(wi.getCurrentOperation(), key);
321   }
322 
323   /**
324    * Get a configuration key. Values are returned trimmed.
325    *
326    * @param w
327    *        WorkflowInstance with current operation
328    * @param key
329    *        Configuration key to check for
330    * @param defaultValue
331    *        Value to return if key does not exists
332    */
333   protected String getConfig(WorkflowInstance w, String key, String defaultValue) {
334     Optional<String> cfgOpt = getOptConfig(w.getCurrentOperation(), key);
335     if (cfgOpt.isPresent()) {
336       return cfgOpt.get();
337     }
338     return defaultValue;
339   }
340 
341   /**
342    * Get a mandatory configuration key. Values are returned trimmed.
343    *
344    * @throws WorkflowOperationException
345    *         if the configuration key is either missing or empty
346    */
347   protected String getConfig(WorkflowOperationInstance woi, String key) throws WorkflowOperationException {
348     Optional<String> cfgOpt = getOptConfig(woi, key);
349     if (cfgOpt.isPresent()) {
350       return cfgOpt.get();
351     }
352     throw new WorkflowOperationException(format("Configuration key '%s' is either missing or empty", key));
353   }
354 
355   /**
356    * Get an optional configuration key. Values are returned trimmed.
357    */
358   protected Optional<String> getOptConfig(WorkflowInstance wi, String key) {
359     return getOptConfig(wi.getCurrentOperation(), key);
360   }
361 
362   /**
363    * Get an optional configuration key. Values are returned trimmed.
364    */
365   protected Optional<String> getOptConfig(WorkflowOperationInstance woi, String key) {
366     return Optional.ofNullable(woi.getConfiguration(key))
367         .map(String::trim)
368         .filter(s -> !s.isEmpty());
369   }
370 
371   /**
372    * Returns a ConfiguredTagsAndFlavors instance, which includes all specified source/target tags and flavors if they are valid
373    * Lists can be empty, if no values were specified! This is to enable WOHs to individually check if a given tag/flavor was set.
374    * This also means that you should use Configuration.many as parameter, if a tag/flavor is optional.
375    * @param srcTags none, one or many
376    * @param srcFlavors none, one or many
377    * @param targetFlavors none, one or many
378    * @param targetTags none, one or many
379    * @return ConfiguredTagsAndFlavors object including lists for the configured tags/flavors
380    */
381   protected ConfiguredTagsAndFlavors getTagsAndFlavors(WorkflowInstance workflow, Configuration srcTags, Configuration srcFlavors, Configuration targetTags, Configuration targetFlavors) throws WorkflowOperationException {
382     WorkflowOperationInstance operation = workflow.getCurrentOperation();
383     ConfiguredTagsAndFlavors tagsAndFlavors = new ConfiguredTagsAndFlavors();
384     MediaPackageElementFlavor flavor;
385 
386     List<String> srcTagList = new ArrayList<>();
387     String srcTag;
388     switch(srcTags) {
389       case none:
390         break;
391       case one:
392         srcTag = StringUtils.trimToNull(operation.getConfiguration(SOURCE_TAG));
393         if (srcTag == null) {
394           throw new WorkflowOperationException("Configuration key '" + SOURCE_TAG + "' must be set");
395         }
396         srcTagList.add(srcTag);
397         break;
398       case many:
399         srcTagList = asList(StringUtils.trimToNull(operation.getConfiguration(SOURCE_TAGS)));
400         srcTag = StringUtils.trimToNull(operation.getConfiguration(SOURCE_TAG));
401         if (srcTagList.isEmpty() && srcTag != null) {
402           srcTagList.add(srcTag);
403         }
404         break;
405       default:
406         throw new WorkflowOperationException("Couldn't process srcTags configuration option!");
407     }
408     tagsAndFlavors.setSrcTags(srcTagList);
409 
410     List<MediaPackageElementFlavor> srcFlavorList = new ArrayList<>();
411     String singleSourceFlavor;
412     switch(srcFlavors) {
413       case none:
414         break;
415       case one:
416         singleSourceFlavor = StringUtils.trimToNull(operation.getConfiguration(SOURCE_FLAVOR));
417         if (singleSourceFlavor == null) {
418           throw new WorkflowOperationException("Configuration key '" + SOURCE_FLAVOR + "' must be set");
419         }
420         try {
421           flavor = MediaPackageElementFlavor.parseFlavor(singleSourceFlavor);
422         } catch (IllegalArgumentException e) {
423           throw new WorkflowOperationException(singleSourceFlavor + " is not a valid flavor!");
424         }
425         srcFlavorList.add(flavor);
426         break;
427       case many:
428         List<String> srcFlavorString = asList(StringUtils.trimToNull(operation.getConfiguration(SOURCE_FLAVORS)));
429         singleSourceFlavor = StringUtils.trimToNull(operation.getConfiguration(SOURCE_FLAVOR));
430         if (srcFlavorString.isEmpty() && singleSourceFlavor != null) {
431           srcFlavorString.add(singleSourceFlavor);
432         }
433         for (String elem : srcFlavorString) {
434           try {
435             flavor = MediaPackageElementFlavor.parseFlavor(elem);
436             srcFlavorList.add(flavor);
437           } catch (IllegalArgumentException e) {
438             throw new WorkflowOperationException(elem + " is not a valid flavor!");
439           }
440         }
441         break;
442       default:
443         throw new WorkflowOperationException("Couldn't process srcFlavors configuration option!");
444     }
445     tagsAndFlavors.setSrcFlavors(srcFlavorList);
446 
447     ConfiguredTagsAndFlavors.TargetTags targetTagMap = new ConfiguredTagsAndFlavors.TargetTags();
448     String targetTag;
449     switch(targetTags) {
450       case none:
451         break;
452       case one:
453         targetTag = StringUtils.trimToNull(operation.getConfiguration(TARGET_TAG));
454         if (targetTag == null) {
455           throw new WorkflowOperationException("Configuration key '" + TARGET_TAG + "' must be set");
456         }
457         targetTagMap = parseTargetTagsByType(List.of(targetTag));
458         break;
459       case many:
460         List<String> targetTagList = asList(StringUtils.trimToNull(operation.getConfiguration(TARGET_TAGS)));
461         targetTagMap = parseTargetTagsByType(targetTagList);
462         targetTag = StringUtils.trimToNull(operation.getConfiguration(TARGET_TAG));
463         if (targetTagList.isEmpty() && targetTag != null) {
464           targetTagMap = parseTargetTagsByType(List.of(targetTag));
465         }
466         break;
467       default:
468         throw new WorkflowOperationException("Couldn't process target-tag configuration option!");
469     }
470     tagsAndFlavors.setTargetTags(targetTagMap);
471 
472     List<MediaPackageElementFlavor> targetFlavorList = new ArrayList<>();
473     String singleTargetFlavor;
474     switch(targetFlavors) {
475       case none:
476         break;
477       case one:
478         singleTargetFlavor = StringUtils.trimToNull(operation.getConfiguration(TARGET_FLAVOR));
479         if (singleTargetFlavor == null) {
480           throw new WorkflowOperationException("Configuration key '" + TARGET_FLAVOR + "' must be set");
481         }
482         try {
483           flavor = MediaPackageElementFlavor.parseFlavor(singleTargetFlavor);
484         } catch (IllegalArgumentException e) {
485           throw new WorkflowOperationException(singleTargetFlavor + " is not a valid flavor!");
486         }
487         targetFlavorList.add(flavor);
488         break;
489       case many:
490         List<String> targetFlavorString = asList(StringUtils.trimToNull(operation.getConfiguration(TARGET_FLAVORS)));
491         singleTargetFlavor = StringUtils.trimToNull(operation.getConfiguration(TARGET_FLAVOR));
492         if (targetFlavorString.isEmpty() && singleTargetFlavor != null) {
493           targetFlavorString.add(singleTargetFlavor);
494         }
495         for (String elem : targetFlavorString) {
496           try {
497             flavor = MediaPackageElementFlavor.parseFlavor(elem);
498           } catch (IllegalArgumentException e) {
499             throw new WorkflowOperationException(elem + " is not a valid flavor!");
500           }
501           targetFlavorList.add(flavor);
502         }
503         break;
504       default:
505         throw new WorkflowOperationException("Couldn't process targetFlavors configuration option!");
506     }
507     tagsAndFlavors.setTargetFlavors(targetFlavorList);
508     return tagsAndFlavors;
509   }
510 
511   private ConfiguredTagsAndFlavors.TargetTags parseTargetTagsByType(List<String> tags) {
512     final String plus = "+";
513     final String minus = "-";
514     List<String> overrideTags = new ArrayList();
515     List<String> addTags = new ArrayList();
516     List<String> removeTags = new ArrayList();
517 
518     for (String targetTag : tags) {
519       if (!StringUtils.startsWithAny(targetTag, plus, minus)) {
520         if (addTags.size() > 0
521             || removeTags.size() > 0) {
522           logger.warn("You may not mix override tags and tag changes. "
523               + "The list of override tags so far is {}. "
524               + "The tag {} is not prefixed with '{}' or '{}'.", overrideTags, targetTag, plus, minus);
525         }
526         overrideTags.add(targetTag);
527       } else if (StringUtils.startsWith(targetTag, plus)) {
528         addTags.add(StringUtils.substring(targetTag, 1));
529       } else if (StringUtils.startsWith(targetTag, minus)) {
530         removeTags.add(StringUtils.substring(targetTag, 1));
531       }
532     }
533 
534     return new ConfiguredTagsAndFlavors.TargetTags(overrideTags, addTags, removeTags);
535   }
536 
537   /**
538    * Helper function that applies target tags to the given element, based on the type(s) of the tag(s)
539    * @param targetTags The target tags to apply to the element
540    * @param element The element the target tags are applied to
541    * @return The element with the applied target tags
542    */
543   protected <T extends MediaPackageElement> T applyTargetTagsToElement(
544       ConfiguredTagsAndFlavors.TargetTags targetTags,
545       T element
546   ) {
547     // set tags on target element
548     List<String> overrideTags = targetTags.getOverrideTags();
549     List<String> addTags = targetTags.getAddTags();
550     List<String> removeTags = targetTags.getRemoveTags();
551     if (overrideTags.size() > 0) {
552       element.clearTags();
553       for (String tag : overrideTags) {
554         element.addTag(tag);
555       }
556     } else {
557       for (String tag : removeTags) {
558         element.removeTag(tag);
559       }
560       for (String tag : addTags) {
561         element.addTag(tag);
562       }
563     }
564 
565     return element;
566   }
567 
568   /**
569    * Set the @link org.opencastproject.job.api.JobBarrier polling interval.
570    * <p>
571    * While waiting for other jobs to finish, the barrier will poll the status of these jobs until they are finished. To
572    * reduce load on the system, the polling is done only every x milliseconds. This interval defines the sleep time
573    * between these polls.
574    * <p>
575    * If most cases you want to leave this at its default value. It will make sense, though, to adjust this time if you
576    * know that your job will be exceptionally short. An example of this might be the unit tests where other jobs are
577    * usually mocked. But this setting is not limited to tests and may be a sensible options for other jobs as well.
578    *
579    * @param interval the time in miliseconds between two polling operations
580    *
581    * @see org.opencastproject.job.api.JobBarrier#DEFAULT_POLLING_INTERVAL
582    */
583   public void setJobBarrierPollingInterval(long interval) {
584     this.jobBarrierPollingInterval = interval;
585   }
586 
587   /**
588    * {@inheritDoc}
589    *
590    * @see java.lang.Object#hashCode()
591    */
592   @Override
593   public int hashCode() {
594     return id != null ? id.hashCode() : super.hashCode();
595   }
596 
597   /**
598    * {@inheritDoc}
599    *
600    * @see java.lang.Object#equals(java.lang.Object)
601    */
602   @Override
603   public boolean equals(Object obj) {
604     if (obj instanceof WorkflowOperationHandler) {
605       if (id != null)
606         return id.equals(((WorkflowOperationHandler) obj).getId());
607       else
608         return this == obj;
609     }
610     return false;
611   }
612 
613   /**
614    * {@inheritDoc}
615    *
616    * @see java.lang.Object#toString()
617    */
618   @Override
619   public String toString() {
620     return getId();
621   }
622 }