View Javadoc
1   /*
2    * Licensed to The Apereo Foundation under one or more contributor license
3    * agreements. See the NOTICE file distributed with this work for additional
4    * information regarding copyright ownership.
5    *
6    *
7    * The Apereo Foundation licenses this file to you under the Educational
8    * Community License, Version 2.0 (the "License"); you may not use this file
9    * except in compliance with the License. You may obtain a copy of the License
10   * at:
11   *
12   *   http://opensource.org/licenses/ecl2.txt
13   *
14   * Unless required by applicable law or agreed to in writing, software
15   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
16   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the
17   * License for the specific language governing permissions and limitations under
18   * the License.
19   *
20   */
21  package org.opencastproject.workflow.handler.distribution;
22  
23  import static org.opencastproject.mediapackage.MediaPackageSupport.Filters.ofChannel;
24  import static org.opencastproject.util.data.Collections.list;
25  import static org.opencastproject.util.data.Option.option;
26  import static org.opencastproject.util.data.functions.Strings.toBool;
27  import static org.opencastproject.util.data.functions.Strings.trimToNone;
28  
29  import org.opencastproject.distribution.api.StreamingDistributionService;
30  import org.opencastproject.job.api.Job;
31  import org.opencastproject.job.api.JobContext;
32  import org.opencastproject.mediapackage.MediaPackage;
33  import org.opencastproject.mediapackage.MediaPackageElement;
34  import org.opencastproject.mediapackage.MediaPackageElementFlavor;
35  import org.opencastproject.mediapackage.MediaPackageElementParser;
36  import org.opencastproject.mediapackage.MediaPackageException;
37  import org.opencastproject.mediapackage.Publication;
38  import org.opencastproject.mediapackage.PublicationImpl;
39  import org.opencastproject.mediapackage.selector.SimpleElementSelector;
40  import org.opencastproject.publication.api.OaiPmhPublicationService;
41  import org.opencastproject.publication.api.PublicationException;
42  import org.opencastproject.serviceregistry.api.ServiceRegistry;
43  import org.opencastproject.util.MimeType;
44  import org.opencastproject.util.MimeTypes;
45  import org.opencastproject.workflow.api.AbstractWorkflowOperationHandler;
46  import org.opencastproject.workflow.api.WorkflowInstance;
47  import org.opencastproject.workflow.api.WorkflowOperationException;
48  import org.opencastproject.workflow.api.WorkflowOperationHandler;
49  import org.opencastproject.workflow.api.WorkflowOperationResult;
50  import org.opencastproject.workflow.api.WorkflowOperationResult.Action;
51  
52  import org.apache.commons.lang3.StringUtils;
53  import org.osgi.service.component.ComponentContext;
54  import org.osgi.service.component.annotations.Activate;
55  import org.osgi.service.component.annotations.Component;
56  import org.osgi.service.component.annotations.Reference;
57  import org.slf4j.Logger;
58  import org.slf4j.LoggerFactory;
59  
60  import java.net.URI;
61  import java.util.Collection;
62  import java.util.HashSet;
63  import java.util.Optional;
64  import java.util.Set;
65  import java.util.UUID;
66  
67  /**
68   * The workflow definition for handling "publish" operations
69   */
70  @Component(
71      immediate = true,
72      service = WorkflowOperationHandler.class,
73      property = {
74          "service.description=OAI-PMH Publication Workflow Handler",
75          "workflow.operation=publish-oaipmh"
76      }
77  )
78  public class PublishOaiPmhWorkflowOperationHandler extends AbstractWorkflowOperationHandler {
79  
80    /** The logging facility */
81    private static final Logger logger = LoggerFactory.getLogger(PublishOaiPmhWorkflowOperationHandler.class);
82  
83    /** Workflow configuration option keys */
84    private static final String DOWNLOAD_FLAVORS = "download-flavors";
85    private static final String DOWNLOAD_TAGS = "download-tags";
86    private static final String STREAMING_TAGS = "streaming-tags";
87    private static final String STREAMING_FLAVORS = "streaming-flavors";
88    private static final String CHECK_AVAILABILITY = "check-availability";
89    private static final String REPOSITORY = "repository";
90    private static final String EXTERNAL_TEMPLATE = "external-template";
91    private static final String EXTERNAL_CHANNEL_NAME = "external-channel";
92    private static final String EXTERNAL_MIME_TYPE = "external-mime-type";
93  
94    /** The publication service */
95    private OaiPmhPublicationService publicationService = null;
96  
97    /** The streaming distribution service */
98    private StreamingDistributionService streamingDistributionService = null;
99  
100   /**
101    * Callback for the OSGi declarative services configuration.
102    *
103    * @param publicationService
104    *          the publication service
105    */
106   @Reference
107   public void setPublicationService(OaiPmhPublicationService publicationService) {
108     this.publicationService = publicationService;
109   }
110 
111   /**
112    * Callback for the OSGi declarative services configuration.
113    *
114    * @param streamingDistributionService
115    *          the streaming distribution service
116    */
117   @Reference(target = "(distribution.channel=streaming)")
118   protected void setStreamingDistributionService(StreamingDistributionService streamingDistributionService) {
119     this.streamingDistributionService = streamingDistributionService;
120   }
121 
122   @Reference
123   @Override
124   public void setServiceRegistry(ServiceRegistry serviceRegistry) {
125     super.setServiceRegistry(serviceRegistry);
126   }
127 
128   /** OSGi component activation. */
129   @Override
130   @Activate
131   public void activate(ComponentContext cc) {
132   }
133 
134   @Override
135   public WorkflowOperationResult start(final WorkflowInstance workflowInstance, JobContext context)
136           throws WorkflowOperationException {
137     logger.debug("Running distribution workflow operation");
138 
139     MediaPackage mediaPackage = workflowInstance.getMediaPackage();
140 
141     // Check which tags have been configured
142     String downloadTags = StringUtils
143             .trimToEmpty(workflowInstance.getCurrentOperation().getConfiguration(DOWNLOAD_TAGS));
144     String downloadFlavors = StringUtils
145             .trimToEmpty(workflowInstance.getCurrentOperation().getConfiguration(DOWNLOAD_FLAVORS));
146     String streamingTags = StringUtils
147             .trimToEmpty(workflowInstance.getCurrentOperation().getConfiguration(STREAMING_TAGS));
148     String streamingFlavors = StringUtils
149             .trimToEmpty(workflowInstance.getCurrentOperation().getConfiguration(STREAMING_FLAVORS));
150     boolean checkAvailability = option(workflowInstance.getCurrentOperation().getConfiguration(CHECK_AVAILABILITY))
151             .bind(trimToNone).map(toBool).getOrElse(true);
152     String repository = StringUtils.trimToNull(workflowInstance.getCurrentOperation().getConfiguration(REPOSITORY));
153 
154     Optional<String> externalChannel = Optional.ofNullable(
155         getOptConfig(workflowInstance.getCurrentOperation(), EXTERNAL_CHANNEL_NAME).orNull());
156     Optional<String> externalTemplate = Optional.ofNullable(
157         getOptConfig(workflowInstance.getCurrentOperation(), EXTERNAL_TEMPLATE).orNull());
158     Optional<MimeType> externalMimetype = Optional.ofNullable(
159         getOptConfig(workflowInstance.getCurrentOperation(), EXTERNAL_MIME_TYPE).orNull())
160         .flatMap(MimeTypes::toMimeType);
161 
162     if (repository == null) {
163       throw new IllegalArgumentException("No repository has been specified");
164     }
165 
166     String[] sourceDownloadTags = StringUtils.split(downloadTags, ",");
167     String[] sourceDownloadFlavors = StringUtils.split(downloadFlavors, ",");
168     String[] sourceStreamingTags = StringUtils.split(streamingTags, ",");
169     String[] sourceStreamingFlavors = StringUtils.split(streamingFlavors, ",");
170 
171     if (sourceDownloadTags.length == 0 && sourceDownloadFlavors.length == 0 && sourceStreamingTags.length == 0
172             && sourceStreamingFlavors.length == 0) {
173       logger.warn("No tags or flavors have been specified, so nothing will be published to the engage");
174       return createResult(mediaPackage, Action.CONTINUE);
175     }
176 
177     final SimpleElementSelector downloadElementSelector = new SimpleElementSelector();
178     for (String flavor : sourceDownloadFlavors) {
179       downloadElementSelector.addFlavor(MediaPackageElementFlavor.parseFlavor(flavor));
180     }
181     for (String tag : sourceDownloadTags) {
182       downloadElementSelector.addTag(tag);
183     }
184     final Collection<MediaPackageElement> downloadElements = downloadElementSelector.select(mediaPackage, false);
185 
186     final Collection<MediaPackageElement> streamingElements;
187     if (streamingDistributionService != null && streamingDistributionService.publishToStreaming()) {
188       final SimpleElementSelector streamingElementSelector = new SimpleElementSelector();
189       for (String flavor : sourceStreamingFlavors) {
190         streamingElementSelector.addFlavor(MediaPackageElementFlavor.parseFlavor(flavor));
191       }
192       for (String tag : sourceStreamingTags) {
193         streamingElementSelector.addTag(tag);
194       }
195       streamingElements = streamingElementSelector.select(mediaPackage, false);
196     } else {
197       streamingElements = list();
198     }
199 
200     try {
201       Set<String> downloadElementIds = new HashSet<>();
202       Set<String> streamingElementIds = new HashSet<>();
203 
204       // Look for elements matching the tag
205       for (MediaPackageElement elem : downloadElements) {
206         downloadElementIds.add(elem.getIdentifier());
207       }
208       for (MediaPackageElement elem : streamingElements) {
209         streamingElementIds.add(elem.getIdentifier());
210       }
211 
212       Job publishJob = null;
213       try {
214         publishJob = publicationService.publish(mediaPackage, repository, downloadElementIds, streamingElementIds,
215                 checkAvailability);
216       } catch (MediaPackageException e) {
217         throw new WorkflowOperationException("Error parsing media package", e);
218       } catch (PublicationException e) {
219         throw new WorkflowOperationException("Error parsing media package", e);
220       }
221 
222       // Wait until the publication job has returned
223       if (!waitForStatus(publishJob).isSuccess()) {
224         throw new WorkflowOperationException("Mediapackage " + mediaPackage.getIdentifier()
225                 + " could not be published to OAI-PMH repository " + repository);
226       }
227 
228       // The job has passed
229       Job job = serviceRegistry.getJob(publishJob.getId());
230 
231       // If there is no payload, then the item has not been published.
232       if (job.getPayload() == null) {
233         logger.warn("Publish to OAI-PMH repository '{}' failed, no payload from publication job: {}", repository, job);
234         return createResult(mediaPackage, Action.CONTINUE);
235       }
236 
237       Publication newElement = null;
238       try {
239         newElement = (Publication) MediaPackageElementParser.getFromXml(job.getPayload());
240       } catch (MediaPackageException e) {
241         throw new WorkflowOperationException(e);
242       }
243 
244       if (newElement == null) {
245         logger.warn(
246             "Publication to OAI-PMH repository '{}' failed, unable to parse the payload '{}' from "
247                 + "job '{}' to a mediapackage element",
248             repository, job.getPayload(), job.toString());
249         return createResult(mediaPackage, Action.CONTINUE);
250       }
251 
252       for (Publication existingPublication : mediaPackage.getPublications()) {
253         if (ofChannel(newElement.getChannel()).apply(existingPublication)) {
254           mediaPackage.remove(existingPublication);
255         }
256       }
257       mediaPackage.add(newElement);
258 
259       if (externalChannel.isPresent() && externalMimetype.isPresent() && externalTemplate.isPresent()) {
260         String template = externalTemplate.get().replace("{event}", mediaPackage.getIdentifier().toString());
261         if (StringUtils.isNotBlank(mediaPackage.getSeries())) {
262           template = template.replace("{series}", mediaPackage.getSeries());
263         }
264 
265         Publication externalElement = PublicationImpl.publication(UUID.randomUUID().toString(), externalChannel.get(),
266                 URI.create(template), externalMimetype.get());
267         for (Publication existingPublication : mediaPackage.getPublications()) {
268           if (ofChannel(externalChannel.get()).apply(existingPublication)) {
269             mediaPackage.remove(existingPublication);
270           }
271         }
272         mediaPackage.add(externalElement);
273       }
274 
275       logger.debug("Publication to OAI-PMH repository '{}' operation completed", repository);
276     } catch (Exception e) {
277       if (e instanceof WorkflowOperationException) {
278         throw (WorkflowOperationException) e;
279       } else {
280         throw new WorkflowOperationException(e);
281       }
282     }
283     return createResult(mediaPackage, Action.CONTINUE);
284   }
285 
286 }