View Javadoc
1   /*
2    * Licensed to The Apereo Foundation under one or more contributor license
3    * agreements. See the NOTICE file distributed with this work for additional
4    * information regarding copyright ownership.
5    *
6    *
7    * The Apereo Foundation licenses this file to you under the Educational
8    * Community License, Version 2.0 (the "License"); you may not use this file
9    * except in compliance with the License. You may obtain a copy of the License
10   * at:
11   *
12   *   http://opensource.org/licenses/ecl2.txt
13   *
14   * Unless required by applicable law or agreed to in writing, software
15   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
16   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the
17   * License for the specific language governing permissions and limitations under
18   * the License.
19   *
20   */
21  package org.opencastproject.workflow.handler.distribution;
22  
23  import static com.entwinemedia.fn.Stream.$;
24  import static org.opencastproject.mediapackage.MediaPackageSupport.Filters.ofChannel;
25  import static org.opencastproject.util.data.Collections.list;
26  import static org.opencastproject.util.data.Option.option;
27  import static org.opencastproject.util.data.functions.Strings.toBool;
28  import static org.opencastproject.util.data.functions.Strings.trimToNone;
29  
30  import org.opencastproject.distribution.api.StreamingDistributionService;
31  import org.opencastproject.job.api.Job;
32  import org.opencastproject.job.api.JobContext;
33  import org.opencastproject.mediapackage.MediaPackage;
34  import org.opencastproject.mediapackage.MediaPackageElement;
35  import org.opencastproject.mediapackage.MediaPackageElementFlavor;
36  import org.opencastproject.mediapackage.MediaPackageElementParser;
37  import org.opencastproject.mediapackage.MediaPackageException;
38  import org.opencastproject.mediapackage.Publication;
39  import org.opencastproject.mediapackage.PublicationImpl;
40  import org.opencastproject.mediapackage.selector.SimpleElementSelector;
41  import org.opencastproject.publication.api.OaiPmhPublicationService;
42  import org.opencastproject.publication.api.PublicationException;
43  import org.opencastproject.serviceregistry.api.ServiceRegistry;
44  import org.opencastproject.util.MimeType;
45  import org.opencastproject.util.MimeTypes;
46  import org.opencastproject.workflow.api.AbstractWorkflowOperationHandler;
47  import org.opencastproject.workflow.api.WorkflowInstance;
48  import org.opencastproject.workflow.api.WorkflowOperationException;
49  import org.opencastproject.workflow.api.WorkflowOperationHandler;
50  import org.opencastproject.workflow.api.WorkflowOperationResult;
51  import org.opencastproject.workflow.api.WorkflowOperationResult.Action;
52  
53  import com.entwinemedia.fn.data.Opt;
54  
55  import org.apache.commons.lang3.StringUtils;
56  import org.osgi.service.component.ComponentContext;
57  import org.osgi.service.component.annotations.Activate;
58  import org.osgi.service.component.annotations.Component;
59  import org.osgi.service.component.annotations.Reference;
60  import org.slf4j.Logger;
61  import org.slf4j.LoggerFactory;
62  
63  import java.net.URI;
64  import java.util.Collection;
65  import java.util.HashSet;
66  import java.util.Set;
67  import java.util.UUID;
68  
69  /**
70   * The workflow definition for handling "publish" operations
71   */
72  @Component(
73      immediate = true,
74      service = WorkflowOperationHandler.class,
75      property = {
76          "service.description=OAI-PMH Publication Workflow Handler",
77          "workflow.operation=publish-oaipmh"
78      }
79  )
80  public class PublishOaiPmhWorkflowOperationHandler extends AbstractWorkflowOperationHandler {
81  
82    /** The logging facility */
83    private static final Logger logger = LoggerFactory.getLogger(PublishOaiPmhWorkflowOperationHandler.class);
84  
85    /** Workflow configuration option keys */
86    private static final String DOWNLOAD_FLAVORS = "download-flavors";
87    private static final String DOWNLOAD_TAGS = "download-tags";
88    private static final String STREAMING_TAGS = "streaming-tags";
89    private static final String STREAMING_FLAVORS = "streaming-flavors";
90    private static final String CHECK_AVAILABILITY = "check-availability";
91    private static final String REPOSITORY = "repository";
92    private static final String EXTERNAL_TEMPLATE = "external-template";
93    private static final String EXTERNAL_CHANNEL_NAME = "external-channel";
94    private static final String EXTERNAL_MIME_TYPE = "external-mime-type";
95  
96    /** The publication service */
97    private OaiPmhPublicationService publicationService = null;
98  
99    /** The streaming distribution service */
100   private StreamingDistributionService streamingDistributionService = null;
101 
102   /**
103    * Callback for the OSGi declarative services configuration.
104    *
105    * @param publicationService
106    *          the publication service
107    */
108   @Reference
109   public void setPublicationService(OaiPmhPublicationService publicationService) {
110     this.publicationService = publicationService;
111   }
112 
113   /**
114    * Callback for the OSGi declarative services configuration.
115    *
116    * @param streamingDistributionService
117    *          the streaming distribution service
118    */
119   @Reference(target = "(distribution.channel=streaming)")
120   protected void setStreamingDistributionService(StreamingDistributionService streamingDistributionService) {
121     this.streamingDistributionService = streamingDistributionService;
122   }
123 
124   @Reference
125   @Override
126   public void setServiceRegistry(ServiceRegistry serviceRegistry) {
127     super.setServiceRegistry(serviceRegistry);
128   }
129 
130   /** OSGi component activation. */
131   @Override
132   @Activate
133   public void activate(ComponentContext cc) {
134   }
135 
136   @Override
137   public WorkflowOperationResult start(final WorkflowInstance workflowInstance, JobContext context)
138           throws WorkflowOperationException {
139     logger.debug("Running distribution workflow operation");
140 
141     MediaPackage mediaPackage = workflowInstance.getMediaPackage();
142 
143     // Check which tags have been configured
144     String downloadTags = StringUtils
145             .trimToEmpty(workflowInstance.getCurrentOperation().getConfiguration(DOWNLOAD_TAGS));
146     String downloadFlavors = StringUtils
147             .trimToEmpty(workflowInstance.getCurrentOperation().getConfiguration(DOWNLOAD_FLAVORS));
148     String streamingTags = StringUtils
149             .trimToEmpty(workflowInstance.getCurrentOperation().getConfiguration(STREAMING_TAGS));
150     String streamingFlavors = StringUtils
151             .trimToEmpty(workflowInstance.getCurrentOperation().getConfiguration(STREAMING_FLAVORS));
152     boolean checkAvailability = option(workflowInstance.getCurrentOperation().getConfiguration(CHECK_AVAILABILITY))
153             .bind(trimToNone).map(toBool).getOrElse(true);
154     String repository = StringUtils.trimToNull(workflowInstance.getCurrentOperation().getConfiguration(REPOSITORY));
155 
156     Opt<String> externalChannel = getOptConfig(workflowInstance.getCurrentOperation(), EXTERNAL_CHANNEL_NAME);
157     Opt<String> externalTempalte = getOptConfig(workflowInstance.getCurrentOperation(), EXTERNAL_TEMPLATE);
158     Opt<MimeType> externalMimetype = getOptConfig(workflowInstance.getCurrentOperation(), EXTERNAL_MIME_TYPE)
159             .bind(MimeTypes.toMimeType);
160 
161     if (repository == null) {
162       throw new IllegalArgumentException("No repository has been specified");
163     }
164 
165     String[] sourceDownloadTags = StringUtils.split(downloadTags, ",");
166     String[] sourceDownloadFlavors = StringUtils.split(downloadFlavors, ",");
167     String[] sourceStreamingTags = StringUtils.split(streamingTags, ",");
168     String[] sourceStreamingFlavors = StringUtils.split(streamingFlavors, ",");
169 
170     if (sourceDownloadTags.length == 0 && sourceDownloadFlavors.length == 0 && sourceStreamingTags.length == 0
171             && sourceStreamingFlavors.length == 0) {
172       logger.warn("No tags or flavors have been specified, so nothing will be published to the engage");
173       return createResult(mediaPackage, Action.CONTINUE);
174     }
175 
176     final SimpleElementSelector downloadElementSelector = new SimpleElementSelector();
177     for (String flavor : sourceDownloadFlavors) {
178       downloadElementSelector.addFlavor(MediaPackageElementFlavor.parseFlavor(flavor));
179     }
180     for (String tag : sourceDownloadTags) {
181       downloadElementSelector.addTag(tag);
182     }
183     final Collection<MediaPackageElement> downloadElements = downloadElementSelector.select(mediaPackage, false);
184 
185     final Collection<MediaPackageElement> streamingElements;
186     if (streamingDistributionService != null && streamingDistributionService.publishToStreaming()) {
187       final SimpleElementSelector streamingElementSelector = new SimpleElementSelector();
188       for (String flavor : sourceStreamingFlavors) {
189         streamingElementSelector.addFlavor(MediaPackageElementFlavor.parseFlavor(flavor));
190       }
191       for (String tag : sourceStreamingTags) {
192         streamingElementSelector.addTag(tag);
193       }
194       streamingElements = streamingElementSelector.select(mediaPackage, false);
195     } else {
196       streamingElements = list();
197     }
198 
199     try {
200       Set<String> downloadElementIds = new HashSet<>();
201       Set<String> streamingElementIds = new HashSet<>();
202 
203       // Look for elements matching the tag
204       for (MediaPackageElement elem : downloadElements) {
205         downloadElementIds.add(elem.getIdentifier());
206       }
207       for (MediaPackageElement elem : streamingElements) {
208         streamingElementIds.add(elem.getIdentifier());
209       }
210 
211       Job publishJob = null;
212       try {
213         publishJob = publicationService.publish(mediaPackage, repository, downloadElementIds, streamingElementIds,
214                 checkAvailability);
215       } catch (MediaPackageException e) {
216         throw new WorkflowOperationException("Error parsing media package", e);
217       } catch (PublicationException e) {
218         throw new WorkflowOperationException("Error parsing media package", e);
219       }
220 
221       // Wait until the publication job has returned
222       if (!waitForStatus(publishJob).isSuccess()) {
223         throw new WorkflowOperationException("Mediapackage " + mediaPackage.getIdentifier()
224                 + " could not be published to OAI-PMH repository " + repository);
225       }
226 
227       // The job has passed
228       Job job = serviceRegistry.getJob(publishJob.getId());
229 
230       // If there is no payload, then the item has not been published.
231       if (job.getPayload() == null) {
232         logger.warn("Publish to OAI-PMH repository '{}' failed, no payload from publication job: {}", repository, job);
233         return createResult(mediaPackage, Action.CONTINUE);
234       }
235 
236       Publication newElement = null;
237       try {
238         newElement = (Publication) MediaPackageElementParser.getFromXml(job.getPayload());
239       } catch (MediaPackageException e) {
240         throw new WorkflowOperationException(e);
241       }
242 
243       if (newElement == null) {
244         logger.warn(
245             "Publication to OAI-PMH repository '{}' failed, unable to parse the payload '{}' from "
246                 + "job '{}' to a mediapackage element",
247             repository, job.getPayload(), job.toString());
248         return createResult(mediaPackage, Action.CONTINUE);
249       }
250 
251       for (Publication existingPublication : $(mediaPackage.getPublications())
252               .find(ofChannel(newElement.getChannel()).toFn())) {
253         mediaPackage.remove(existingPublication);
254       }
255       mediaPackage.add(newElement);
256 
257       if (externalChannel.isSome() && externalMimetype.isSome() && externalTempalte.isSome()) {
258         String template = externalTempalte.get().replace("{event}", mediaPackage.getIdentifier().toString());
259         if (StringUtils.isNotBlank(mediaPackage.getSeries())) {
260           template = template.replace("{series}", mediaPackage.getSeries());
261         }
262 
263         Publication externalElement = PublicationImpl.publication(UUID.randomUUID().toString(), externalChannel.get(),
264                 URI.create(template), externalMimetype.get());
265         for (Publication existingPublication : $(mediaPackage.getPublications())
266                 .find(ofChannel(externalChannel.get()).toFn())) {
267           mediaPackage.remove(existingPublication);
268         }
269         mediaPackage.add(externalElement);
270       }
271 
272       logger.debug("Publication to OAI-PMH repository '{}' operation completed", repository);
273     } catch (Exception e) {
274       if (e instanceof WorkflowOperationException) {
275         throw (WorkflowOperationException) e;
276       } else {
277         throw new WorkflowOperationException(e);
278       }
279     }
280     return createResult(mediaPackage, Action.CONTINUE);
281   }
282 
283 }