View Javadoc
1   /*
2    * Licensed to The Apereo Foundation under one or more contributor license
3    * agreements. See the NOTICE file distributed with this work for additional
4    * information regarding copyright ownership.
5    *
6    *
7    * The Apereo Foundation licenses this file to you under the Educational
8    * Community License, Version 2.0 (the "License"); you may not use this file
9    * except in compliance with the License. You may obtain a copy of the License
10   * at:
11   *
12   *   http://opensource.org/licenses/ecl2.txt
13   *
14   * Unless required by applicable law or agreed to in writing, software
15   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
16   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the
17   * License for the specific language governing permissions and limitations under
18   * the License.
19   *
20   */
21  
22  package org.opencastproject.workflow.api;
23  
24  import static java.lang.String.format;
25  import static org.opencastproject.util.data.Option.option;
26  import static org.opencastproject.util.data.functions.Misc.chuck;
27  
28  import org.opencastproject.job.api.Job;
29  import org.opencastproject.job.api.JobBarrier;
30  import org.opencastproject.job.api.JobContext;
31  import org.opencastproject.mediapackage.MediaPackage;
32  import org.opencastproject.mediapackage.MediaPackageElement;
33  import org.opencastproject.mediapackage.MediaPackageElementFlavor;
34  import org.opencastproject.serviceregistry.api.ServiceRegistry;
35  import org.opencastproject.util.data.Function;
36  import org.opencastproject.util.data.Function0;
37  import org.opencastproject.util.data.Option;
38  import org.opencastproject.workflow.api.WorkflowOperationResult.Action;
39  
40  import com.entwinemedia.fn.data.Opt;
41  import com.entwinemedia.fn.fns.Strings;
42  
43  import org.apache.commons.io.FilenameUtils;
44  import org.apache.commons.lang3.StringUtils;
45  import org.osgi.framework.Constants;
46  import org.osgi.service.component.ComponentContext;
47  import org.osgi.service.component.annotations.Activate;
48  import org.osgi.service.component.annotations.Reference;
49  import org.slf4j.Logger;
50  import org.slf4j.LoggerFactory;
51  
52  import java.util.ArrayList;
53  import java.util.List;
54  import java.util.Map;
55  
56  /**
57   * Abstract base implementation for an operation handler, which implements a simple start operation that returns a
58   * {@link WorkflowOperationResult} with the current mediapackage and {@link Action#CONTINUE}.
59   */
60  public abstract class AbstractWorkflowOperationHandler implements WorkflowOperationHandler {
61  
62    /** The logging facility */
63    private static final Logger logger = LoggerFactory.getLogger(AbstractWorkflowOperationHandler.class);
64  
65    /** The ID of this operation handler */
66    protected String id = null;
67  
68    /** The description of what this handler actually does */
69    protected String description = null;
70  
71    /** Optional service registry */
72    protected ServiceRegistry serviceRegistry = null;
73  
74    /** The JobBarrier polling interval */
75    private long jobBarrierPollingInterval = JobBarrier.DEFAULT_POLLING_INTERVAL;
76  
77    /** Config for Tag Parsing operation */
78    protected enum Configuration { none, one, many };
79  
80    public static final String TARGET_FLAVORS = "target-flavors";
81    public static final String TARGET_FLAVOR = "target-flavor";
82    public static final String TARGET_TAGS = "target-tags";
83    public static final String TARGET_TAG = "target-tag";
84    public static final String SOURCE_FLAVORS = "source-flavors";
85    public static final String SOURCE_FLAVOR = "source-flavor";
86    public static final String SOURCE_TAG = "source-tag";
87    public static final String SOURCE_TAGS = "source-tags";
88  
89    /**
90     * Activates this component with its properties once all of the collaborating services have been set
91     *
92     * @param cc
93     *          The component's context, containing the properties used for configuration
94     */
95    @Activate
96    protected void activate(ComponentContext cc) {
97      this.id = (String) cc.getProperties().get(WorkflowService.WORKFLOW_OPERATION_PROPERTY);
98      this.description = (String) cc.getProperties().get(Constants.SERVICE_DESCRIPTION);
99    }
100 
101   /**
102    * {@inheritDoc}
103    *
104    * @see org.opencastproject.workflow.api.WorkflowOperationHandler#start(org.opencastproject.workflow.api.WorkflowInstance,
105    *      JobContext)
106    */
107   @Override
108   public abstract WorkflowOperationResult start(WorkflowInstance workflowInstance, JobContext context)
109           throws WorkflowOperationException;
110 
111   /**
112    * {@inheritDoc}
113    *
114    * @see org.opencastproject.workflow.api.WorkflowOperationHandler#skip(org.opencastproject.workflow.api.WorkflowInstance,
115    *      JobContext)
116    */
117   @Override
118   public WorkflowOperationResult skip(WorkflowInstance workflowInstance, JobContext context)
119           throws WorkflowOperationException {
120     return createResult(Action.SKIP);
121   }
122 
123   /**
124    * {@inheritDoc}
125    *
126    * @see org.opencastproject.workflow.api.WorkflowOperationHandler#destroy(org.opencastproject.workflow.api.WorkflowInstance,
127    *      JobContext)
128    */
129   @Override
130   public void destroy(WorkflowInstance workflowInstance, JobContext context) throws WorkflowOperationException {
131   }
132 
133   /**
134    * Converts a comma separated string into a set of values. Useful for converting operation configuration strings into
135    * multi-valued sets.
136    *
137    * @param elements
138    *          The comma space separated string
139    * @return the set of values
140    */
141   protected List<String> asList(String elements) {
142     elements = StringUtils.trimToEmpty(elements);
143     List<String> list = new ArrayList<>();
144     for (String s : StringUtils.split(elements, ",")) {
145       if (StringUtils.trimToNull(s) != null) {
146         list.add(s.trim());
147       }
148     }
149     return list;
150   }
151 
152   /** {@link #asList(String)} as a function. */
153   protected Function<String, List<String>> asList = new Function<String, List<String>>() {
154     @Override public List<String> apply(String s) {
155       return asList(s);
156     }
157   };
158 
159   /**
160    * Generates a filename using the base name of a source element and the extension of a derived element.
161    *
162    * @param source
163    *          the source media package element
164    * @param derived
165    *          the derived media package element
166    * @return the filename
167    */
168   protected String getFileNameFromElements(MediaPackageElement source, MediaPackageElement derived) {
169     String fileName = FilenameUtils.getBaseName(source.getURI().getPath());
170     String fileExtension = FilenameUtils.getExtension(derived.getURI().getPath());
171     return fileName + "." + fileExtension;
172   }
173 
174   /**
175    * {@inheritDoc}
176    *
177    * @see org.opencastproject.workflow.api.WorkflowOperationHandler#getId()
178    */
179   @Override
180   public String getId() {
181     return id;
182   }
183 
184   /**
185    * {@inheritDoc}
186    *
187    * @see org.opencastproject.workflow.api.WorkflowOperationHandler#getDescription()
188    */
189   @Override
190   public String getDescription() {
191     return description;
192   }
193 
194   /**
195    * Creates a result for the execution of this workflow operation handler.
196    *
197    * @param action
198    *          the action to take
199    * @return the result
200    */
201   protected WorkflowOperationResult createResult(Action action) {
202     return createResult(null, null, action, 0);
203   }
204 
205   /**
206    * Creates a result for the execution of this workflow operation handler.
207    *
208    * @param mediaPackage
209    *          the modified mediapackage
210    * @param action
211    *          the action to take
212    * @return the result
213    */
214   protected WorkflowOperationResult createResult(MediaPackage mediaPackage, Action action) {
215     return createResult(mediaPackage, null, action, 0);
216   }
217 
218   /**
219    * Creates a result for the execution of this workflow operation handler.
220    * <p>
221    * Since there is no way for the workflow service to determine the queuing time (e. g. waiting on services), it needs
222    * to be provided by the handler.
223    *
224    * @param mediaPackage
225    *          the modified mediapackage
226    * @param action
227    *          the action to take
228    * @param timeInQueue
229    *          the amount of time this handle spent waiting for services
230    * @return the result
231    */
232   protected WorkflowOperationResult createResult(MediaPackage mediaPackage, Action action, long timeInQueue) {
233     return createResult(mediaPackage, null, action, timeInQueue);
234   }
235 
236   /**
237    * Creates a result for the execution of this workflow operation handler.
238    * <p>
239    * Since there is no way for the workflow service to determine the queuing time (e. g. waiting on services), it needs
240    * to be provided by the handler.
241    *
242    * @param mediaPackage
243    *          the modified mediapackage
244    * @param properties
245    *          the properties to add to the workflow instance
246    * @param action
247    *          the action to take
248    * @param timeInQueue
249    *          the amount of time this handle spent waiting for services
250    * @return the result
251    */
252   protected WorkflowOperationResult createResult(MediaPackage mediaPackage, Map<String, String> properties,
253           Action action, long timeInQueue) {
254     return new WorkflowOperationResultImpl(mediaPackage, properties, action, timeInQueue);
255   }
256 
257   /**
258    * Sets the service registry. This method is here as a convenience for developers that need the registry to do job
259    * waiting.
260    *
261    * @param serviceRegistry
262    *          the service registry
263    */
264   @Reference
265   public void setServiceRegistry(ServiceRegistry serviceRegistry) {
266     this.serviceRegistry = serviceRegistry;
267   }
268 
269   /**
270    * Waits until all of the jobs have reached either one of these statuses:
271    * <ul>
272    * <li>{@link Job.Status#FINISHED}</li>
273    * <li>{@link Job.Status#FAILED}</li>
274    * <li>{@link Job.Status#DELETED}</li>
275    * </ul>
276    * After that, the method returns with the actual outcomes of the jobs.
277    *
278    * @param jobs
279    *          the jobs
280    * @return the jobs and their outcomes
281    * @throws IllegalStateException
282    *           if the service registry has not been set
283    * @throws IllegalArgumentException
284    *           if the jobs collecion is either <code>null</code> or empty
285    */
286   protected JobBarrier.Result waitForStatus(Job... jobs) throws IllegalStateException, IllegalArgumentException {
287     return waitForStatus(0, jobs);
288   }
289 
290   /**
291    * Waits until all of the jobs have reached either one of these statuses:
292    * <ul>
293    * <li>{@link Job.Status#FINISHED}</li>
294    * <li>{@link Job.Status#FAILED}</li>
295    * <li>{@link Job.Status#DELETED}</li>
296    * </ul>
297    * After that, the method returns with the actual outcomes of the jobs.
298    *
299    * @param timeout
300    *          the maximum amount of time in milliseconds to wait
301    * @param jobs
302    *          the jobs
303    * @return the jobs and their outcomes
304    * @throws IllegalStateException
305    *           if the service registry has not been set
306    * @throws IllegalArgumentException
307    *           if the jobs collection is either <code>null</code> or empty
308    */
309   protected JobBarrier.Result waitForStatus(long timeout, Job... jobs) throws IllegalStateException,
310           IllegalArgumentException {
311     if (serviceRegistry == null) {
312       throw new IllegalStateException("Can't wait for job status without providing a service registry first");
313     }
314     JobBarrier barrier = new JobBarrier(null, serviceRegistry, jobBarrierPollingInterval, jobs);
315     return barrier.waitForJobs(timeout);
316   }
317 
318   /**
319    * Get a configuration option.
320    *
321    * @deprecated use {@link #getConfig(WorkflowInstance, String)} or {@link #getOptConfig(org.opencastproject.workflow.api.WorkflowInstance, String)}
322    */
323   protected Option<String> getCfg(WorkflowInstance wi, String key) {
324     return option(wi.getCurrentOperation().getConfiguration(key));
325   }
326 
327   /**
328    * Get a mandatory configuration key. Values are returned trimmed.
329    *
330    * @throws WorkflowOperationException
331    *         if the configuration key is either missing or empty
332    */
333   protected String getConfig(WorkflowInstance wi, String key) throws WorkflowOperationException {
334     return getConfig(wi.getCurrentOperation(), key);
335   }
336 
337   /**
338    * Get a configuration key. Values are returned trimmed.
339    *
340    * @param w
341    *        WorkflowInstance with current operation
342    * @param key
343    *        Configuration key to check for
344    * @param defaultValue
345    *        Value to return if key does not exists
346    */
347   protected String getConfig(WorkflowInstance w, String key, String defaultValue) {
348     for (final String cfg : getOptConfig(w.getCurrentOperation(), key)) {
349       return cfg;
350     }
351     return defaultValue;
352   }
353 
354   /**
355    * Get a mandatory configuration key. Values are returned trimmed.
356    *
357    * @throws WorkflowOperationException
358    *         if the configuration key is either missing or empty
359    */
360   protected String getConfig(WorkflowOperationInstance woi, String key) throws WorkflowOperationException {
361     for (final String cfg : getOptConfig(woi, key)) {
362       return cfg;
363     }
364     throw new WorkflowOperationException(format("Configuration key '%s' is either missing or empty", key));
365   }
366 
367   /**
368    * Get an optional configuration key. Values are returned trimmed.
369    */
370   protected Opt<String> getOptConfig(WorkflowInstance wi, String key) {
371     return getOptConfig(wi.getCurrentOperation(), key);
372   }
373 
374   /**
375    * Get an optional configuration key. Values are returned trimmed.
376    */
377   protected Opt<String> getOptConfig(WorkflowOperationInstance woi, String key) {
378     return Opt.nul(woi.getConfiguration(key)).flatMap(Strings.trimToNone);
379   }
380 
381   /**
382    * Returns a ConfiguredTagsAndFlavors instance, which includes all specified source/target tags and flavors if they are valid
383    * Lists can be empty, if no values were specified! This is to enable WOHs to individually check if a given tag/flavor was set.
384    * This also means that you should use Configuration.many as parameter, if a tag/flavor is optional.
385    * @param srcTags none, one or many
386    * @param srcFlavors none, one or many
387    * @param targetFlavors none, one or many
388    * @param targetTags none, one or many
389    * @return ConfiguredTagsAndFlavors object including lists for the configured tags/flavors
390    */
391   protected ConfiguredTagsAndFlavors getTagsAndFlavors(WorkflowInstance workflow, Configuration srcTags, Configuration srcFlavors, Configuration targetTags, Configuration targetFlavors) throws WorkflowOperationException {
392     WorkflowOperationInstance operation = workflow.getCurrentOperation();
393     ConfiguredTagsAndFlavors tagsAndFlavors = new ConfiguredTagsAndFlavors();
394     MediaPackageElementFlavor flavor;
395 
396     List<String> srcTagList = new ArrayList<>();
397     String srcTag;
398     switch(srcTags) {
399       case none:
400         break;
401       case one:
402         srcTag = StringUtils.trimToNull(operation.getConfiguration(SOURCE_TAG));
403         if (srcTag == null) {
404           throw new WorkflowOperationException("Configuration key '" + SOURCE_TAG + "' must be set");
405         }
406         srcTagList.add(srcTag);
407         break;
408       case many:
409         srcTagList = asList(StringUtils.trimToNull(operation.getConfiguration(SOURCE_TAGS)));
410         srcTag = StringUtils.trimToNull(operation.getConfiguration(SOURCE_TAG));
411         if (srcTagList.isEmpty() && srcTag != null) {
412           srcTagList.add(srcTag);
413         }
414         break;
415       default:
416         throw new WorkflowOperationException("Couldn't process srcTags configuration option!");
417     }
418     tagsAndFlavors.setSrcTags(srcTagList);
419 
420     List<MediaPackageElementFlavor> srcFlavorList = new ArrayList<>();
421     String singleSourceFlavor;
422     switch(srcFlavors) {
423       case none:
424         break;
425       case one:
426         singleSourceFlavor = StringUtils.trimToNull(operation.getConfiguration(SOURCE_FLAVOR));
427         if (singleSourceFlavor == null) {
428           throw new WorkflowOperationException("Configuration key '" + SOURCE_FLAVOR + "' must be set");
429         }
430         try {
431           flavor = MediaPackageElementFlavor.parseFlavor(singleSourceFlavor);
432         } catch (IllegalArgumentException e) {
433           throw new WorkflowOperationException(singleSourceFlavor + " is not a valid flavor!");
434         }
435         srcFlavorList.add(flavor);
436         break;
437       case many:
438         List<String> srcFlavorString = asList(StringUtils.trimToNull(operation.getConfiguration(SOURCE_FLAVORS)));
439         singleSourceFlavor = StringUtils.trimToNull(operation.getConfiguration(SOURCE_FLAVOR));
440         if (srcFlavorString.isEmpty() && singleSourceFlavor != null) {
441           srcFlavorString.add(singleSourceFlavor);
442         }
443         for (String elem : srcFlavorString) {
444           try {
445             flavor = MediaPackageElementFlavor.parseFlavor(elem);
446             srcFlavorList.add(flavor);
447           } catch (IllegalArgumentException e) {
448             throw new WorkflowOperationException(elem + " is not a valid flavor!");
449           }
450         }
451         break;
452       default:
453         throw new WorkflowOperationException("Couldn't process srcFlavors configuration option!");
454     }
455     tagsAndFlavors.setSrcFlavors(srcFlavorList);
456 
457     List<String> targetTagList = new ArrayList<>();
458     String targetTag;
459     switch(targetTags) {
460       case none:
461         break;
462       case one:
463         targetTag = StringUtils.trimToNull(operation.getConfiguration(TARGET_TAG));
464         if (targetTag == null) {
465           throw new WorkflowOperationException("Configuration key '" + TARGET_TAG + "' must be set");
466         }
467         targetTagList.add(targetTag);
468         break;
469       case many:
470         targetTagList = asList(StringUtils.trimToNull(operation.getConfiguration(TARGET_TAGS)));
471         targetTag = StringUtils.trimToNull(operation.getConfiguration(TARGET_TAG));
472         if (targetTagList.isEmpty() && targetTag != null) {
473           targetTagList.add(targetTag);
474         }
475         break;
476       default:
477         throw new WorkflowOperationException("Couldn't process target-tag configuration option!");
478     }
479     tagsAndFlavors.setTargetTags(targetTagList);
480 
481     List<MediaPackageElementFlavor> targetFlavorList = new ArrayList<>();
482     String singleTargetFlavor;
483     switch(targetFlavors) {
484       case none:
485         break;
486       case one:
487         singleTargetFlavor = StringUtils.trimToNull(operation.getConfiguration(TARGET_FLAVOR));
488         if (singleTargetFlavor == null) {
489           throw new WorkflowOperationException("Configuration key '" + TARGET_FLAVOR + "' must be set");
490         }
491         try {
492           flavor = MediaPackageElementFlavor.parseFlavor(singleTargetFlavor);
493         } catch (IllegalArgumentException e) {
494           throw new WorkflowOperationException(singleTargetFlavor + " is not a valid flavor!");
495         }
496         targetFlavorList.add(flavor);
497         break;
498       case many:
499         List<String> targetFlavorString = asList(StringUtils.trimToNull(operation.getConfiguration(TARGET_FLAVORS)));
500         singleTargetFlavor = StringUtils.trimToNull(operation.getConfiguration(TARGET_FLAVOR));
501         if (targetFlavorString.isEmpty() && singleTargetFlavor != null) {
502           targetFlavorString.add(singleTargetFlavor);
503         }
504         for (String elem : targetFlavorString) {
505           try {
506             flavor = MediaPackageElementFlavor.parseFlavor(elem);
507           } catch (IllegalArgumentException e) {
508             throw new WorkflowOperationException(elem + " is not a valid flavor!");
509           }
510           targetFlavorList.add(flavor);
511         }
512         break;
513       default:
514         throw new WorkflowOperationException("Couldn't process targetFlavors configuration option!");
515     }
516     tagsAndFlavors.setTargetFlavors(targetFlavorList);
517     return tagsAndFlavors;
518   }
519 
520   /**
521    * Create an error function.
522    * <p>
523    * Example usage: <code>getCfg(wi, "key").getOrElse(this.&lt;String&gt;cfgKeyMissing("key"))</code>
524    *
525    * @see #getCfg(WorkflowInstance, String)
526    * @deprecated see {@link #getCfg(WorkflowInstance, String)} for details
527    */
528   protected <A> Function0<A> cfgKeyMissing(final String key) {
529     return new Function0<A>() {
530       @Override public A apply() {
531         return chuck(new WorkflowOperationException(key + " is missing or malformed"));
532       }
533     };
534   }
535 
536   /**
537    * Set the @link org.opencastproject.job.api.JobBarrier polling interval.
538    * <p>
539    * While waiting for other jobs to finish, the barrier will poll the status of these jobs until they are finished. To
540    * reduce load on the system, the polling is done only every x milliseconds. This interval defines the sleep time
541    * between these polls.
542    * <p>
543    * If most cases you want to leave this at its default value. It will make sense, though, to adjust this time if you
544    * know that your job will be exceptionally short. An example of this might be the unit tests where other jobs are
545    * usually mocked. But this setting is not limited to tests and may be a sensible options for other jobs as well.
546    *
547    * @param interval the time in miliseconds between two polling operations
548    *
549    * @see org.opencastproject.job.api.JobBarrier#DEFAULT_POLLING_INTERVAL
550    */
551   public void setJobBarrierPollingInterval(long interval) {
552     this.jobBarrierPollingInterval = interval;
553   }
554 
555   /**
556    * {@inheritDoc}
557    *
558    * @see java.lang.Object#hashCode()
559    */
560   @Override
561   public int hashCode() {
562     return id != null ? id.hashCode() : super.hashCode();
563   }
564 
565   /**
566    * {@inheritDoc}
567    *
568    * @see java.lang.Object#equals(java.lang.Object)
569    */
570   @Override
571   public boolean equals(Object obj) {
572     if (obj instanceof WorkflowOperationHandler) {
573       if (id != null)
574         return id.equals(((WorkflowOperationHandler) obj).getId());
575       else
576         return this == obj;
577     }
578     return false;
579   }
580 
581   /**
582    * {@inheritDoc}
583    *
584    * @see java.lang.Object#toString()
585    */
586   @Override
587   public String toString() {
588     return getId();
589   }
590 }