/*
 * Licensed to The Apereo Foundation under one or more contributor license
 * agreements. See the NOTICE file distributed with this work for additional
 * information regarding copyright ownership.
 *
 *
 * The Apereo Foundation licenses this file to you under the Educational
 * Community License, Version 2.0 (the "License"); you may not use this file
 * except in compliance with the License. You may obtain a copy of the License
 * at:
 *
 *   http://opensource.org/licenses/ecl2.txt
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the
 * License for the specific language governing permissions and limitations under
 * the License.
 *
 */
package org.opencastproject.workflow.handler.distribution;

import org.opencastproject.distribution.api.DistributionException;
import org.opencastproject.distribution.api.DownloadDistributionService;
import org.opencastproject.distribution.api.StreamingDistributionService;
import org.opencastproject.job.api.Job;
import org.opencastproject.job.api.JobContext;
import org.opencastproject.mediapackage.MediaPackage;
import org.opencastproject.mediapackage.MediaPackageElement;
import org.opencastproject.mediapackage.MediaPackageElementFlavor;
import org.opencastproject.mediapackage.MediaPackageElementParser;
import org.opencastproject.mediapackage.MediaPackageException;
import org.opencastproject.mediapackage.Publication;
import org.opencastproject.mediapackage.PublicationImpl;
import org.opencastproject.mediapackage.selector.SimpleElementSelector;
import org.opencastproject.security.api.Organization;
import org.opencastproject.security.api.SecurityService;
import org.opencastproject.serviceregistry.api.ServiceRegistry;
import org.opencastproject.util.MimeType;
import org.opencastproject.util.MimeTypes;
import org.opencastproject.util.RequireUtil;
import org.opencastproject.util.doc.DocUtil;
import org.opencastproject.workflow.api.WorkflowInstance;
import org.opencastproject.workflow.api.WorkflowOperationException;
import org.opencastproject.workflow.api.WorkflowOperationHandler;
import org.opencastproject.workflow.api.WorkflowOperationInstance;
import org.opencastproject.workflow.api.WorkflowOperationResult;
import org.opencastproject.workflow.api.WorkflowOperationResult.Action;

import org.apache.commons.lang3.ArrayUtils;
import org.apache.commons.lang3.BooleanUtils;
import org.apache.commons.lang3.StringUtils;
import org.osgi.service.component.annotations.Component;
import org.osgi.service.component.annotations.Reference;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Set;
import java.util.UUID;

68  
69  /**
70   * WOH that distributes selected elements to an internal distribution channel and adds reflective publication elements
71   * to the media package.
72   */
73  
74  @Component(
75      immediate = true,
76      service = WorkflowOperationHandler.class,
77      property = {
78          "service.description=Configurable Publication Workflow Handler",
79          "workflow.operation=publish-configure"
80      }
81  )
82  public class ConfigurablePublishWorkflowOperationHandler extends ConfigurableWorkflowOperationHandlerBase {
83  
84    /** The logging facility */
85    private static final Logger logger = LoggerFactory.getLogger(ConfigurablePublishWorkflowOperationHandler.class);
86  
87    /** The template key for adding the mediapackage / event id to the publication path. */
88    protected static final String EVENT_ID_TEMPLATE_KEY = "event_id";
89    /** The template key for adding the player location path to the publication path. */
90    protected static final String PLAYER_PATH_TEMPLATE_KEY = "player_path";
91    /** The template key for adding the publication id to the publication path. */
92    protected static final String PUBLICATION_ID_TEMPLATE_KEY = "publication_id";
93    /** The template key for adding the series id to the publication path. */
94    protected static final String SERIES_ID_TEMPLATE_KEY = "series_id";
95    /** The configuration property value for the player location. */
96    protected static final String PLAYER_PROPERTY = "player";
97    /** The template key name prefix for organization keys */
98    protected static final String ORG_TEMPLATE_KEY_PREFIX = "org_";
99  
100   // service references
101   private DownloadDistributionService downloadDistributionService;
102   private StreamingDistributionService streamingDistributionService;
103   private SecurityService securityService;
104 
105   /** Workflow configuration options */
106   static final String DOWNLOAD_SOURCE_FLAVORS = "download-source-flavors";
107   static final String DOWNLOAD_SOURCE_TAGS = "download-source-tags";
108   static final String STREAMING_SOURCE_TAGS = "streaming-source-tags";
109   static final String STREAMING_SOURCE_FLAVORS = "streaming-source-flavors";
110   static final String CHANNEL_ID_KEY = "channel-id";
111   static final String MIME_TYPE = "mimetype";
112   static final String WITH_PUBLISHED_ELEMENTS = "with-published-elements";
113   static final String CHECK_AVAILABILITY = "check-availability";
114   static final String STRATEGY = "strategy";
115   static final String MODE = "mode";
116 
117   /** Known values for mode **/
118   static final String MODE_SINGLE = "single";
119   static final String MODE_MIXED = "mixed";
120   static final String MODE_BULK = "bulk";
121 
122   static final String[] KNOWN_MODES = { MODE_SINGLE, MODE_MIXED, MODE_BULK };
123 
124   static final String DEFAULT_MODE = MODE_BULK;
125 
126   /** The workflow configuration key for defining the url pattern. */
127   static final String URL_PATTERN = "url-pattern";
128 
129   static final String RETRACT_STREAMING = "retract-streaming";
130   static final boolean RETRACT_STREAMING_DEFAULT = false;
131 
132   /** OSGi DI */
133   @Reference(target = "(distribution.channel=download)")
134   void setDownloadDistributionService(DownloadDistributionService distributionService) {
135     this.downloadDistributionService = distributionService;
136   }
137 
138   @Reference(target = "(distribution.channel=streaming)")
139   void setStreamingDistributionService(StreamingDistributionService streamingDistributionService) {
140     this.streamingDistributionService = streamingDistributionService;
141   }
142 
143   /** OSGi DI */
144   @Reference
145   protected void setSecurityService(SecurityService securityService) {
146     this.securityService = securityService;
147   }
148 
149   @Override
150   protected DownloadDistributionService getDownloadDistributionService() {
151     assert (downloadDistributionService != null);
152     return downloadDistributionService;
153   }
154 
155   @Override
156   protected StreamingDistributionService getStreamingDistributionService() {
157     assert (streamingDistributionService != null);
158     return streamingDistributionService;
159   }
160 
161   /**
162    * Replace possible variables in the url-pattern configuration for this workflow operation handler.
163    *
164    * @param urlPattern
165    *          The operation's template for replacing the variables.
166    * @param mp
167    *          The {@link MediaPackage} used to get the event / mediapackage id.
168    * @param pubUUID
169    *          The UUID for the published element.
170    * @return The URI of the published element with the variables replaced.
171    * @throws WorkflowOperationException
172    *           Thrown if the URI is malformed after replacing the variables.
173    */
174   public URI populateUrlWithVariables(String urlPattern, MediaPackage mp, String pubUUID)
175           throws WorkflowOperationException {
176     Map<String, Object> values = new HashMap<>();
177     values.put(EVENT_ID_TEMPLATE_KEY, mp.getIdentifier().toString());
178     values.put(PUBLICATION_ID_TEMPLATE_KEY, pubUUID);
179     String playerPath = securityService.getOrganization().getProperties().get(PLAYER_PROPERTY);
180     values.put(PLAYER_PATH_TEMPLATE_KEY, playerPath);
181     values.put(SERIES_ID_TEMPLATE_KEY, StringUtils.trimToEmpty(mp.getSeries()));
182     Map<String, String> orgProperties = securityService.getOrganization().getProperties();
183     orgProperties.put("id", securityService.getOrganization().getId());
184     orgProperties.put("name", securityService.getOrganization().getName());
185     orgProperties.put("admin_role", securityService.getOrganization().getAdminRole());
186     orgProperties.put("anonymous_role", securityService.getOrganization().getAnonymousRole());
187     for (Map.Entry<String, String> orgProperty : orgProperties.entrySet()) {
188       values.put(ORG_TEMPLATE_KEY_PREFIX + orgProperty.getKey().replace('.', '_').toLowerCase(),
189               orgProperty.getValue());
190     }
191     String uriWithVariables = DocUtil.processTextTemplate("Replacing Variables in Publish URL", urlPattern, values);
192     URI publicationURI;
193     try {
194       publicationURI = new URI(uriWithVariables);
195     } catch (URISyntaxException e) {
196       throw new WorkflowOperationException(String.format(
197               "Unable to create URI from template '%s', replacement was: '%s'", urlPattern, uriWithVariables), e);
198     }
199     return publicationURI;
200   }
201 
202   @Override
203   public WorkflowOperationResult start(WorkflowInstance workflowInstance, JobContext context)
204           throws WorkflowOperationException {
205     RequireUtil.notNull(workflowInstance, "workflowInstance");
206 
207     final MediaPackage mp = workflowInstance.getMediaPackage();
208     final WorkflowOperationInstance op = workflowInstance.getCurrentOperation();
209 
210     final String channelId = StringUtils.trimToEmpty(op.getConfiguration(CHANNEL_ID_KEY));
211     if ("".equals(channelId)) {
212       throw new WorkflowOperationException("Unable to publish this mediapackage as the configuration key "
213               + CHANNEL_ID_KEY + " is missing. Unable to determine where to publish these elements.");
214     }
215 
216     final String urlPattern = StringUtils.trimToEmpty(op.getConfiguration(URL_PATTERN));
217 
218     MimeType mimetype = null;
219     String mimetypeString = StringUtils.trimToEmpty(op.getConfiguration(MIME_TYPE));
220     if (!"".equals(mimetypeString)) {
221       try {
222         mimetype = MimeTypes.parseMimeType(mimetypeString);
223       } catch (IllegalArgumentException e) {
224         throw new WorkflowOperationException("Unable to parse the provided configuration for " + MIME_TYPE, e);
225       }
226     }
227 
228     final boolean withPublishedElements = BooleanUtils
229             .toBoolean(StringUtils.trimToEmpty(op.getConfiguration(WITH_PUBLISHED_ELEMENTS)));
230 
231     boolean checkAvailability = BooleanUtils
232             .toBoolean(StringUtils.trimToEmpty(op.getConfiguration(CHECK_AVAILABILITY)));
233 
234     boolean retractStreaming = RETRACT_STREAMING_DEFAULT;
235     String retractStreamingString = workflowInstance.getConfiguration(RETRACT_STREAMING);
236     if (retractStreamingString != null) {
237       retractStreaming = BooleanUtils.toBoolean(StringUtils.trimToEmpty(retractStreamingString));
238     }
239 
240     if (getPublications(mp, channelId).size() > 0) {
241       final String rePublishStrategy = StringUtils.trimToEmpty(op.getConfiguration(STRATEGY));
242 
243       switch (rePublishStrategy) {
244 
245         case ("fail"):
246           // fail is a dummy function for further distribution strategies
247           fail(mp);
248           break;
249         case ("merge"):
250           // nothing to do here. other publication strategies can be added to this list later on
251           break;
252         default:
253           retract(mp, channelId, retractStreaming);
254       }
255     }
256 
257     String mode = StringUtils.trimToEmpty(op.getConfiguration(MODE));
258     if ("".equals(mode)) {
259       mode = DEFAULT_MODE;
260     } else if (!ArrayUtils.contains(KNOWN_MODES, mode)) {
261       logger.error("Unknown value for configuration key mode: '{}'", mode);
262       throw new IllegalArgumentException("Unknown value for configuration key mode");
263     }
264 
265     final String[] downloadSourceFlavors
266         = StringUtils.split(StringUtils.trimToEmpty(op.getConfiguration(DOWNLOAD_SOURCE_FLAVORS)), ",");
267     final String[] downloadSourceTags
268         = StringUtils.split(StringUtils.trimToEmpty(op.getConfiguration(DOWNLOAD_SOURCE_TAGS)), ",");
269     final String[] streamingSourceFlavors
270         = StringUtils.split(StringUtils.trimToEmpty(op.getConfiguration(STREAMING_SOURCE_FLAVORS)), ",");
271     final String[] streamingSourceTags
272         = StringUtils.split(StringUtils.trimToEmpty(op.getConfiguration(STREAMING_SOURCE_TAGS)), ",");
273 
274     String publicationUUID = UUID.randomUUID().toString();
275     Publication publication = PublicationImpl.publication(publicationUUID, channelId, null, null);
276 
277     // Configure the element selectors
278     final SimpleElementSelector downloadSelector = new SimpleElementSelector();
279     for (String flavor : downloadSourceFlavors) {
280       downloadSelector.addFlavor(MediaPackageElementFlavor.parseFlavor(flavor));
281     }
282     for (String tag : downloadSourceTags) {
283       downloadSelector.addTag(tag);
284     }
285 
286     final SimpleElementSelector streamingSelector = new SimpleElementSelector();
287     for (String flavor : streamingSourceFlavors) {
288       streamingSelector.addFlavor(MediaPackageElementFlavor.parseFlavor(flavor));
289     }
290     for (String tag : streamingSourceTags) {
291       streamingSelector.addTag(tag);
292     }
293 
294     boolean streamingElementsDistributed = false;
295     boolean downloadElementsDistributed = false;
296 
297     if (streamingDistributionService != null && streamingDistributionService.publishToStreaming()
298             && (streamingSourceFlavors.length > 0 || streamingSourceTags.length > 0)) {
299       streamingElementsDistributed = distributeElements(streamingSelector, mp, publication, channelId, mode,
300               withPublishedElements, checkAvailability, true);
301     }
302 
303     if (downloadSourceFlavors.length > 0 || downloadSourceTags.length > 0) {
304       downloadElementsDistributed = distributeElements(downloadSelector, mp, publication, channelId, mode,
305               withPublishedElements, checkAvailability, false);
306     }
307 
308     if (!downloadElementsDistributed && !streamingElementsDistributed
309         && (downloadSourceFlavors.length > 0 || downloadSourceTags.length > 0
310         || streamingSourceFlavors.length > 0 || streamingSourceTags.length > 0)) {
311       // skip publication if no elements was distributed but should be
312       return createResult(mp, Action.SKIP);
313     }
314 
315     if (!"".equals(urlPattern)) {
316       publication.setURI(populateUrlWithVariables(urlPattern, mp, publicationUUID));
317     }
318     if (mimetype != null) {
319       publication.setMimeType(mimetype);
320     }
321     mp.add(publication);
322     return createResult(mp, Action.CONTINUE);
323   }
324 
325   private boolean distributeElements(SimpleElementSelector selector, MediaPackage mp, Publication publication,
326           String channelId, String mode, boolean withPublishedElements, boolean checkAvailability, boolean streaming)
327           throws WorkflowOperationException {
328 
329     String target = (streaming ? "streaming" : "download");
330     if (!withPublishedElements) {
331       Set<MediaPackageElement> elements = distribute(selector.select(mp, false), mp, channelId, mode,
332               checkAvailability, streaming);
333       if (elements.size() > 0) {
334         for (MediaPackageElement element : elements) {
335           // Make sure the mediapackage is prompted to create a new identifier for this element
336           element.setIdentifier(null);
337           PublicationImpl.addElementToPublication(publication, element);
338         }
339       } else {
340         logger.info("No element found for distribution to " + target + " in media package '{}'", mp);
341         return false;
342       }
343     } else {
344       List<MediaPackageElement> publishedElements = new ArrayList<>();
345       for (Publication alreadyPublished : mp.getPublications()) {
346         publishedElements.addAll(Arrays.asList(alreadyPublished.getAttachments()));
347         publishedElements.addAll(Arrays.asList(alreadyPublished.getCatalogs()));
348         publishedElements.addAll(Arrays.asList(alreadyPublished.getTracks()));
349       }
350 
351       Collection<MediaPackageElement> elements = selector.select(publishedElements, false);
352       if (elements.size() > 0) {
353         for (MediaPackageElement element : elements) {
354           PublicationImpl.addElementToPublication(publication, element);
355         }
356       } else {
357         logger.info("No elements found for publication to " + target + " in media package '{}'", mp);
358         return false;
359       }
360     }
361     return true;
362   }
363 
364   private Set<MediaPackageElement> distribute(
365       Collection<MediaPackageElement> elements,
366       MediaPackage mediapackage,
367       String channelId,
368       String mode,
369       boolean checkAvailability,
370       boolean streaming
371   ) throws WorkflowOperationException {
372 
373     Set<MediaPackageElement> result = new HashSet<>();
374 
375     Set<String> bulkElementIds = new HashSet<>();
376     Set<String> singleElementIds = new HashSet<>();
377 
378     for (MediaPackageElement element : elements) {
379       if (MODE_BULK.equals(mode)
380               || (MODE_MIXED.equals(mode) && (element.getElementType() != MediaPackageElement.Type.Track))) {
381         bulkElementIds.add(element.getIdentifier());
382       } else {
383         singleElementIds.add(element.getIdentifier());
384       }
385     }
386 
387     Set<Job> jobs = new HashSet<>();
388     if (bulkElementIds.size() > 0) {
389       logger.info("Start bulk publishing of {} elements of media package '{}' to publication channel '{}'",
390               bulkElementIds.size(), mediapackage, channelId);
391       try {
392         Job job;
393         if (streaming) {
394           job = streamingDistributionService.distribute(channelId, mediapackage, bulkElementIds);
395         } else {
396           job = downloadDistributionService.distribute(channelId, mediapackage, bulkElementIds, checkAvailability);
397         }
398         jobs.add(job);
399       } catch (DistributionException | MediaPackageException e) {
400         logger.error("Creating the distribution job for {} elements of media package '{}' failed",
401                 bulkElementIds.size(), mediapackage, e);
402         throw new WorkflowOperationException(e);
403       }
404     }
405     if (singleElementIds.size() > 0) {
406       logger.info("Start single publishing of {} elements of media package '{}' to publication channel '{}'",
407               singleElementIds.size(), mediapackage, channelId);
408       for (String elementId : singleElementIds) {
409         try {
410           Job job;
411           if (streaming) {
412             job = streamingDistributionService.distribute(channelId, mediapackage, elementId);
413           } else {
414             job = downloadDistributionService.distribute(channelId, mediapackage, elementId, checkAvailability);
415           }
416           jobs.add(job);
417         } catch (DistributionException | MediaPackageException e) {
418           logger.error("Creating the distribution job for element '{}' of media package '{}' failed", elementId,
419                   mediapackage, e);
420           throw new WorkflowOperationException(e);
421         }
422       }
423     }
424 
425     if (jobs.size() > 0) {
426       if (!waitForStatus(jobs.toArray(new Job[jobs.size()])).isSuccess()) {
427         throw new WorkflowOperationException("At least one of the distribution jobs did not complete successfully");
428       }
429       for (Job job : jobs) {
430         try {
431           List<? extends MediaPackageElement> elems = MediaPackageElementParser.getArrayFromXml(job.getPayload());
432           result.addAll(elems);
433         } catch (MediaPackageException e) {
434           logger.error("Job '{}' returned payload ({}) that could not be parsed to media package elements", job,
435                   job.getPayload(), e);
436           throw new WorkflowOperationException(e);
437         }
438       }
439       logger.info("Published {} elements of media package {} to publication channel {}",
440               bulkElementIds.size() + singleElementIds.size(), mediapackage, channelId);
441     }
442     return result;
443   }
444 
445   /**
446    * Dummy function for further publication strategies
447    *
448    * @param mp
449    * @throws WorkflowOperationException
450    */
451   private void fail(MediaPackage mp) throws WorkflowOperationException {
452     logger.error("There is already a Published Media, fail Stragy for Mediapackage {}", mp.getIdentifier());
453     throw new WorkflowOperationException("There is already a Published Media, fail Stragy for Mediapackage ");
454   }
455 
456   @Reference
457   @Override
458   public void setServiceRegistry(ServiceRegistry serviceRegistry) {
459     super.setServiceRegistry(serviceRegistry);
460   }
461 
462 }