View Javadoc
1   /*
2    * Licensed to The Apereo Foundation under one or more contributor license
3    * agreements. See the NOTICE file distributed with this work for additional
4    * information regarding copyright ownership.
5    *
6    *
7    * The Apereo Foundation licenses this file to you under the Educational
8    * Community License, Version 2.0 (the "License"); you may not use this file
9    * except in compliance with the License. You may obtain a copy of the License
10   * at:
11   *
12   *   http://opensource.org/licenses/ecl2.txt
13   *
14   * Unless required by applicable law or agreed to in writing, software
15   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
16   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the
17   * License for the specific language governing permissions and limitations under
18   * the License.
19   *
20   */
21  
22  package org.opencastproject.workflow.api;
23  
24  import static java.lang.String.format;
25  import static org.opencastproject.util.data.Option.option;
26  import static org.opencastproject.util.data.functions.Misc.chuck;
27  
28  import org.opencastproject.job.api.Job;
29  import org.opencastproject.job.api.JobBarrier;
30  import org.opencastproject.job.api.JobContext;
31  import org.opencastproject.mediapackage.MediaPackage;
32  import org.opencastproject.mediapackage.MediaPackageElement;
33  import org.opencastproject.mediapackage.MediaPackageElementFlavor;
34  import org.opencastproject.serviceregistry.api.ServiceRegistry;
35  import org.opencastproject.util.data.Function;
36  import org.opencastproject.util.data.Function0;
37  import org.opencastproject.util.data.Option;
38  import org.opencastproject.workflow.api.WorkflowOperationResult.Action;
39  
40  import com.entwinemedia.fn.data.Opt;
41  import com.entwinemedia.fn.fns.Strings;
42  
43  import org.apache.commons.io.FilenameUtils;
44  import org.apache.commons.lang3.StringUtils;
45  import org.osgi.framework.Constants;
46  import org.osgi.service.component.ComponentContext;
47  import org.osgi.service.component.annotations.Activate;
48  import org.osgi.service.component.annotations.Reference;
49  import org.slf4j.Logger;
50  import org.slf4j.LoggerFactory;
51  
52  import java.util.ArrayList;
53  import java.util.List;
54  import java.util.Map;
55  
56  /**
57   * Abstract base implementation for an operation handler, which implements a simple start operation that returns a
58   * {@link WorkflowOperationResult} with the current mediapackage and {@link Action#CONTINUE}.
59   */
60  public abstract class AbstractWorkflowOperationHandler implements WorkflowOperationHandler {
61  
62    /** The logging facility */
63    private static final Logger logger = LoggerFactory.getLogger(AbstractWorkflowOperationHandler.class);
64  
65    /** The ID of this operation handler */
66    protected String id = null;
67  
68    /** The description of what this handler actually does */
69    protected String description = null;
70  
71    /** Optional service registry */
72    protected ServiceRegistry serviceRegistry = null;
73  
74    /** The JobBarrier polling interval */
75    private long jobBarrierPollingInterval = JobBarrier.DEFAULT_POLLING_INTERVAL;
76  
77    /** Config for Tag Parsing operation */
78    protected enum Configuration {
79      none, one, many
80    };
81  
82    public static final String TARGET_FLAVORS = "target-flavors";
83    public static final String TARGET_FLAVOR = "target-flavor";
84    public static final String TARGET_TAGS = "target-tags";
85    public static final String TARGET_TAG = "target-tag";
86    public static final String SOURCE_FLAVORS = "source-flavors";
87    public static final String SOURCE_FLAVOR = "source-flavor";
88    public static final String SOURCE_TAG = "source-tag";
89    public static final String SOURCE_TAGS = "source-tags";
90  
91    /**
92     * Activates this component with its properties once all of the collaborating services have been set
93     *
94     * @param cc
95     *          The component's context, containing the properties used for configuration
96     */
97    @Activate
98    protected void activate(ComponentContext cc) {
99      this.id = (String) cc.getProperties().get(WorkflowService.WORKFLOW_OPERATION_PROPERTY);
100     this.description = (String) cc.getProperties().get(Constants.SERVICE_DESCRIPTION);
101   }
102 
103   /**
104    * {@inheritDoc}
105    *
106    * @see org.opencastproject.workflow.api.WorkflowOperationHandler#start(
107    *      org.opencastproject.workflow.api.WorkflowInstance, JobContext)
108    */
109   @Override
110   public abstract WorkflowOperationResult start(WorkflowInstance workflowInstance, JobContext context)
111           throws WorkflowOperationException;
112 
113   /**
114    * {@inheritDoc}
115    *
116    * @see org.opencastproject.workflow.api.WorkflowOperationHandler#skip(
117    *      org.opencastproject.workflow.api.WorkflowInstance, JobContext)
118    */
119   @Override
120   public WorkflowOperationResult skip(WorkflowInstance workflowInstance, JobContext context)
121           throws WorkflowOperationException {
122     return createResult(Action.SKIP);
123   }
124 
125   /**
126    * {@inheritDoc}
127    *
128    * @see org.opencastproject.workflow.api.WorkflowOperationHandler#destroy(
129    *      org.opencastproject.workflow.api.WorkflowInstance, JobContext)
130    */
131   @Override
132   public void destroy(WorkflowInstance workflowInstance, JobContext context) throws WorkflowOperationException {
133   }
134 
135   /**
136    * Converts a comma separated string into a set of values. Useful for converting operation configuration strings into
137    * multi-valued sets.
138    *
139    * @param elements
140    *          The comma space separated string
141    * @return the set of values
142    */
143   protected List<String> asList(String elements) {
144     elements = StringUtils.trimToEmpty(elements);
145     List<String> list = new ArrayList<>();
146     for (String s : StringUtils.split(elements, ",")) {
147       if (StringUtils.trimToNull(s) != null) {
148         list.add(s.trim());
149       }
150     }
151     return list;
152   }
153 
154   /** {@link #asList(String)} as a function. */
155   protected Function<String, List<String>> asList = new Function<String, List<String>>() {
156     @Override public List<String> apply(String s) {
157       return asList(s);
158     }
159   };
160 
161   /**
162    * Generates a filename using the base name of a source element and the extension of a derived element.
163    *
164    * @param source
165    *          the source media package element
166    * @param derived
167    *          the derived media package element
168    * @return the filename
169    */
170   protected String getFileNameFromElements(MediaPackageElement source, MediaPackageElement derived) {
171     String fileName = FilenameUtils.getBaseName(source.getURI().getPath());
172     String fileExtension = FilenameUtils.getExtension(derived.getURI().getPath());
173     return fileName + "." + fileExtension;
174   }
175 
176   /**
177    * {@inheritDoc}
178    *
179    * @see org.opencastproject.workflow.api.WorkflowOperationHandler#getId()
180    */
181   @Override
182   public String getId() {
183     return id;
184   }
185 
186   /**
187    * {@inheritDoc}
188    *
189    * @see org.opencastproject.workflow.api.WorkflowOperationHandler#getDescription()
190    */
191   @Override
192   public String getDescription() {
193     return description;
194   }
195 
196   /**
197    * Creates a result for the execution of this workflow operation handler.
198    *
199    * @param action
200    *          the action to take
201    * @return the result
202    */
203   protected WorkflowOperationResult createResult(Action action) {
204     return createResult(null, null, action, 0);
205   }
206 
207   /**
208    * Creates a result for the execution of this workflow operation handler.
209    *
210    * @param mediaPackage
211    *          the modified mediapackage
212    * @param action
213    *          the action to take
214    * @return the result
215    */
216   protected WorkflowOperationResult createResult(MediaPackage mediaPackage, Action action) {
217     return createResult(mediaPackage, null, action, 0);
218   }
219 
220   /**
221    * Creates a result for the execution of this workflow operation handler.
222    * <p>
223    * Since there is no way for the workflow service to determine the queuing time (e. g. waiting on services), it needs
224    * to be provided by the handler.
225    *
226    * @param mediaPackage
227    *          the modified mediapackage
228    * @param action
229    *          the action to take
230    * @param timeInQueue
231    *          the amount of time this handle spent waiting for services
232    * @return the result
233    */
234   protected WorkflowOperationResult createResult(MediaPackage mediaPackage, Action action, long timeInQueue) {
235     return createResult(mediaPackage, null, action, timeInQueue);
236   }
237 
238   /**
239    * Creates a result for the execution of this workflow operation handler.
240    * <p>
241    * Since there is no way for the workflow service to determine the queuing time (e. g. waiting on services), it needs
242    * to be provided by the handler.
243    *
244    * @param mediaPackage
245    *          the modified mediapackage
246    * @param properties
247    *          the properties to add to the workflow instance
248    * @param action
249    *          the action to take
250    * @param timeInQueue
251    *          the amount of time this handle spent waiting for services
252    * @return the result
253    */
254   protected WorkflowOperationResult createResult(MediaPackage mediaPackage, Map<String, String> properties,
255           Action action, long timeInQueue) {
256     return new WorkflowOperationResultImpl(mediaPackage, properties, action, timeInQueue);
257   }
258 
259   /**
260    * Sets the service registry. This method is here as a convenience for developers that need the registry to do job
261    * waiting.
262    *
263    * @param serviceRegistry
264    *          the service registry
265    */
266   @Reference
267   public void setServiceRegistry(ServiceRegistry serviceRegistry) {
268     this.serviceRegistry = serviceRegistry;
269   }
270 
271   /**
272    * Waits until all of the jobs have reached either one of these statuses:
273    * <ul>
274    * <li>{@link Job.Status#FINISHED}</li>
275    * <li>{@link Job.Status#FAILED}</li>
276    * <li>{@link Job.Status#DELETED}</li>
277    * </ul>
278    * After that, the method returns with the actual outcomes of the jobs.
279    *
280    * @param jobs
281    *          the jobs
282    * @return the jobs and their outcomes
283    * @throws IllegalStateException
284    *           if the service registry has not been set
285    * @throws IllegalArgumentException
286    *           if the jobs collecion is either <code>null</code> or empty
287    */
288   protected JobBarrier.Result waitForStatus(Job... jobs) throws IllegalStateException, IllegalArgumentException {
289     return waitForStatus(0, jobs);
290   }
291 
292   /**
293    * Waits until all of the jobs have reached either one of these statuses:
294    * <ul>
295    * <li>{@link Job.Status#FINISHED}</li>
296    * <li>{@link Job.Status#FAILED}</li>
297    * <li>{@link Job.Status#DELETED}</li>
298    * </ul>
299    * After that, the method returns with the actual outcomes of the jobs.
300    *
301    * @param timeout
302    *          the maximum amount of time in milliseconds to wait
303    * @param jobs
304    *          the jobs
305    * @return the jobs and their outcomes
306    * @throws IllegalStateException
307    *           if the service registry has not been set
308    * @throws IllegalArgumentException
309    *           if the jobs collection is either <code>null</code> or empty
310    */
311   protected JobBarrier.Result waitForStatus(long timeout, Job... jobs) throws IllegalStateException,
312           IllegalArgumentException {
313     if (serviceRegistry == null) {
314       throw new IllegalStateException("Can't wait for job status without providing a service registry first");
315     }
316     JobBarrier barrier = new JobBarrier(null, serviceRegistry, jobBarrierPollingInterval, jobs);
317     return barrier.waitForJobs(timeout);
318   }
319 
320   /**
321    * Get a configuration option.
322    *
323    * @deprecated use {@link #getConfig(WorkflowInstance, String)} or
324    *             {@link #getOptConfig(org.opencastproject.workflow.api.WorkflowInstance, String)}
325    */
326   protected Option<String> getCfg(WorkflowInstance wi, String key) {
327     return option(wi.getCurrentOperation().getConfiguration(key));
328   }
329 
330   /**
331    * Get a mandatory configuration key. Values are returned trimmed.
332    *
333    * @throws WorkflowOperationException
334    *         if the configuration key is either missing or empty
335    */
336   protected String getConfig(WorkflowInstance wi, String key) throws WorkflowOperationException {
337     return getConfig(wi.getCurrentOperation(), key);
338   }
339 
340   /**
341    * Get a configuration key. Values are returned trimmed.
342    *
343    * @param w
344    *        WorkflowInstance with current operation
345    * @param key
346    *        Configuration key to check for
347    * @param defaultValue
348    *        Value to return if key does not exists
349    */
350   protected String getConfig(WorkflowInstance w, String key, String defaultValue) {
351     for (final String cfg : getOptConfig(w.getCurrentOperation(), key)) {
352       return cfg;
353     }
354     return defaultValue;
355   }
356 
357   /**
358    * Get a mandatory configuration key. Values are returned trimmed.
359    *
360    * @throws WorkflowOperationException
361    *         if the configuration key is either missing or empty
362    */
363   protected String getConfig(WorkflowOperationInstance woi, String key) throws WorkflowOperationException {
364     for (final String cfg : getOptConfig(woi, key)) {
365       return cfg;
366     }
367     throw new WorkflowOperationException(format("Configuration key '%s' is either missing or empty", key));
368   }
369 
370   /**
371    * Get an optional configuration key. Values are returned trimmed.
372    */
373   protected Opt<String> getOptConfig(WorkflowInstance wi, String key) {
374     return getOptConfig(wi.getCurrentOperation(), key);
375   }
376 
377   /**
378    * Get an optional configuration key. Values are returned trimmed.
379    */
380   protected Opt<String> getOptConfig(WorkflowOperationInstance woi, String key) {
381     return Opt.nul(woi.getConfiguration(key)).flatMap(Strings.trimToNone);
382   }
383 
384   /**
385    * Returns a ConfiguredTagsAndFlavors instance, which includes all specified source/target tags and flavors if they
386    * are valid. Lists can be empty, if no values were specified! This is to enable WOHs to individually check if a
387    * given tag/flavor was set. This also means that you should use Configuration.many as parameter, if a tag/flavor is
388    * optional.
389    * @param srcTags none, one or many
390    * @param srcFlavors none, one or many
391    * @param targetFlavors none, one or many
392    * @param targetTags none, one or many
393    * @return ConfiguredTagsAndFlavors object including lists for the configured tags/flavors
394    */
395   protected ConfiguredTagsAndFlavors getTagsAndFlavors(WorkflowInstance workflow, Configuration srcTags,
396       Configuration srcFlavors, Configuration targetTags, Configuration targetFlavors)
397           throws WorkflowOperationException {
398     WorkflowOperationInstance operation = workflow.getCurrentOperation();
399     ConfiguredTagsAndFlavors tagsAndFlavors = new ConfiguredTagsAndFlavors();
400     MediaPackageElementFlavor flavor;
401 
402     List<String> srcTagList = new ArrayList<>();
403     String srcTag;
404     switch(srcTags) {
405       case none:
406         break;
407       case one:
408         srcTag = StringUtils.trimToNull(operation.getConfiguration(SOURCE_TAG));
409         if (srcTag == null) {
410           throw new WorkflowOperationException("Configuration key '" + SOURCE_TAG + "' must be set");
411         }
412         srcTagList.add(srcTag);
413         break;
414       case many:
415         srcTagList = asList(StringUtils.trimToNull(operation.getConfiguration(SOURCE_TAGS)));
416         srcTag = StringUtils.trimToNull(operation.getConfiguration(SOURCE_TAG));
417         if (srcTagList.isEmpty() && srcTag != null) {
418           srcTagList.add(srcTag);
419         }
420         break;
421       default:
422         throw new WorkflowOperationException("Couldn't process srcTags configuration option!");
423     }
424     tagsAndFlavors.setSrcTags(srcTagList);
425 
426     List<MediaPackageElementFlavor> srcFlavorList = new ArrayList<>();
427     String singleSourceFlavor;
428     switch(srcFlavors) {
429       case none:
430         break;
431       case one:
432         singleSourceFlavor = StringUtils.trimToNull(operation.getConfiguration(SOURCE_FLAVOR));
433         if (singleSourceFlavor == null) {
434           throw new WorkflowOperationException("Configuration key '" + SOURCE_FLAVOR + "' must be set");
435         }
436         try {
437           flavor = MediaPackageElementFlavor.parseFlavor(singleSourceFlavor);
438         } catch (IllegalArgumentException e) {
439           throw new WorkflowOperationException(singleSourceFlavor + " is not a valid flavor!");
440         }
441         srcFlavorList.add(flavor);
442         break;
443       case many:
444         List<String> srcFlavorString = asList(StringUtils.trimToNull(operation.getConfiguration(SOURCE_FLAVORS)));
445         singleSourceFlavor = StringUtils.trimToNull(operation.getConfiguration(SOURCE_FLAVOR));
446         if (srcFlavorString.isEmpty() && singleSourceFlavor != null) {
447           srcFlavorString.add(singleSourceFlavor);
448         }
449         for (String elem : srcFlavorString) {
450           try {
451             flavor = MediaPackageElementFlavor.parseFlavor(elem);
452             srcFlavorList.add(flavor);
453           } catch (IllegalArgumentException e) {
454             throw new WorkflowOperationException(elem + " is not a valid flavor!");
455           }
456         }
457         break;
458       default:
459         throw new WorkflowOperationException("Couldn't process srcFlavors configuration option!");
460     }
461     tagsAndFlavors.setSrcFlavors(srcFlavorList);
462 
463     List<String> targetTagList = new ArrayList<>();
464     String targetTag;
465     switch(targetTags) {
466       case none:
467         break;
468       case one:
469         targetTag = StringUtils.trimToNull(operation.getConfiguration(TARGET_TAG));
470         if (targetTag == null) {
471           throw new WorkflowOperationException("Configuration key '" + TARGET_TAG + "' must be set");
472         }
473         targetTagList.add(targetTag);
474         break;
475       case many:
476         targetTagList = asList(StringUtils.trimToNull(operation.getConfiguration(TARGET_TAGS)));
477         targetTag = StringUtils.trimToNull(operation.getConfiguration(TARGET_TAG));
478         if (targetTagList.isEmpty() && targetTag != null) {
479           targetTagList.add(targetTag);
480         }
481         break;
482       default:
483         throw new WorkflowOperationException("Couldn't process target-tag configuration option!");
484     }
485     tagsAndFlavors.setTargetTags(targetTagList);
486 
487     List<MediaPackageElementFlavor> targetFlavorList = new ArrayList<>();
488     String singleTargetFlavor;
489     switch(targetFlavors) {
490       case none:
491         break;
492       case one:
493         singleTargetFlavor = StringUtils.trimToNull(operation.getConfiguration(TARGET_FLAVOR));
494         if (singleTargetFlavor == null) {
495           throw new WorkflowOperationException("Configuration key '" + TARGET_FLAVOR + "' must be set");
496         }
497         try {
498           flavor = MediaPackageElementFlavor.parseFlavor(singleTargetFlavor);
499         } catch (IllegalArgumentException e) {
500           throw new WorkflowOperationException(singleTargetFlavor + " is not a valid flavor!");
501         }
502         targetFlavorList.add(flavor);
503         break;
504       case many:
505         List<String> targetFlavorString = asList(StringUtils.trimToNull(operation.getConfiguration(TARGET_FLAVORS)));
506         singleTargetFlavor = StringUtils.trimToNull(operation.getConfiguration(TARGET_FLAVOR));
507         if (targetFlavorString.isEmpty() && singleTargetFlavor != null) {
508           targetFlavorString.add(singleTargetFlavor);
509         }
510         for (String elem : targetFlavorString) {
511           try {
512             flavor = MediaPackageElementFlavor.parseFlavor(elem);
513           } catch (IllegalArgumentException e) {
514             throw new WorkflowOperationException(elem + " is not a valid flavor!");
515           }
516           targetFlavorList.add(flavor);
517         }
518         break;
519       default:
520         throw new WorkflowOperationException("Couldn't process targetFlavors configuration option!");
521     }
522     tagsAndFlavors.setTargetFlavors(targetFlavorList);
523     return tagsAndFlavors;
524   }
525 
526   /**
527    * Create an error function.
528    * <p>
529    * Example usage: <code>getCfg(wi, "key").getOrElse(this.&lt;String&gt;cfgKeyMissing("key"))</code>
530    *
531    * @see #getCfg(WorkflowInstance, String)
532    * @deprecated see {@link #getCfg(WorkflowInstance, String)} for details
533    */
534   protected <A> Function0<A> cfgKeyMissing(final String key) {
535     return new Function0<A>() {
536       @Override public A apply() {
537         return chuck(new WorkflowOperationException(key + " is missing or malformed"));
538       }
539     };
540   }
541 
542   /**
543    * Set the @link org.opencastproject.job.api.JobBarrier polling interval.
544    * <p>
545    * While waiting for other jobs to finish, the barrier will poll the status of these jobs until they are finished. To
546    * reduce load on the system, the polling is done only every x milliseconds. This interval defines the sleep time
547    * between these polls.
548    * <p>
549    * If most cases you want to leave this at its default value. It will make sense, though, to adjust this time if you
550    * know that your job will be exceptionally short. An example of this might be the unit tests where other jobs are
551    * usually mocked. But this setting is not limited to tests and may be a sensible options for other jobs as well.
552    *
553    * @param interval the time in miliseconds between two polling operations
554    *
555    * @see org.opencastproject.job.api.JobBarrier#DEFAULT_POLLING_INTERVAL
556    */
557   public void setJobBarrierPollingInterval(long interval) {
558     this.jobBarrierPollingInterval = interval;
559   }
560 
561   /**
562    * {@inheritDoc}
563    *
564    * @see java.lang.Object#hashCode()
565    */
566   @Override
567   public int hashCode() {
568     return id != null ? id.hashCode() : super.hashCode();
569   }
570 
571   /**
572    * {@inheritDoc}
573    *
574    * @see java.lang.Object#equals(java.lang.Object)
575    */
576   @Override
577   public boolean equals(Object obj) {
578     if (obj instanceof WorkflowOperationHandler) {
579       if (id != null) {
580         return id.equals(((WorkflowOperationHandler) obj).getId());
581       } else {
582         return this == obj;
583       }
584     }
585     return false;
586   }
587 
588   /**
589    * {@inheritDoc}
590    *
591    * @see java.lang.Object#toString()
592    */
593   @Override
594   public String toString() {
595     return getId();
596   }
597 }