1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21 package org.opencastproject.workflow.handler.distribution;
22
23 import static com.entwinemedia.fn.Stream.$;
24 import static org.opencastproject.mediapackage.MediaPackageSupport.Filters.ofChannel;
25 import static org.opencastproject.util.data.Collections.list;
26 import static org.opencastproject.util.data.Option.option;
27 import static org.opencastproject.util.data.functions.Strings.toBool;
28 import static org.opencastproject.util.data.functions.Strings.trimToNone;
29
30 import org.opencastproject.distribution.api.StreamingDistributionService;
31 import org.opencastproject.job.api.Job;
32 import org.opencastproject.job.api.JobContext;
33 import org.opencastproject.mediapackage.MediaPackage;
34 import org.opencastproject.mediapackage.MediaPackageElement;
35 import org.opencastproject.mediapackage.MediaPackageElementFlavor;
36 import org.opencastproject.mediapackage.MediaPackageElementParser;
37 import org.opencastproject.mediapackage.MediaPackageException;
38 import org.opencastproject.mediapackage.Publication;
39 import org.opencastproject.mediapackage.PublicationImpl;
40 import org.opencastproject.mediapackage.selector.SimpleElementSelector;
41 import org.opencastproject.publication.api.OaiPmhPublicationService;
42 import org.opencastproject.publication.api.PublicationException;
43 import org.opencastproject.serviceregistry.api.ServiceRegistry;
44 import org.opencastproject.util.MimeType;
45 import org.opencastproject.util.MimeTypes;
46 import org.opencastproject.workflow.api.AbstractWorkflowOperationHandler;
47 import org.opencastproject.workflow.api.WorkflowInstance;
48 import org.opencastproject.workflow.api.WorkflowOperationException;
49 import org.opencastproject.workflow.api.WorkflowOperationHandler;
50 import org.opencastproject.workflow.api.WorkflowOperationResult;
51 import org.opencastproject.workflow.api.WorkflowOperationResult.Action;
52
53 import com.entwinemedia.fn.data.Opt;
54
55 import org.apache.commons.lang3.StringUtils;
56 import org.osgi.service.component.ComponentContext;
57 import org.osgi.service.component.annotations.Activate;
58 import org.osgi.service.component.annotations.Component;
59 import org.osgi.service.component.annotations.Reference;
60 import org.slf4j.Logger;
61 import org.slf4j.LoggerFactory;
62
63 import java.net.URI;
64 import java.util.Collection;
65 import java.util.HashSet;
66 import java.util.Set;
67 import java.util.UUID;
68
69
70
71
72 @Component(
73 immediate = true,
74 service = WorkflowOperationHandler.class,
75 property = {
76 "service.description=OAI-PMH Publication Workflow Handler",
77 "workflow.operation=publish-oaipmh"
78 }
79 )
80 public class PublishOaiPmhWorkflowOperationHandler extends AbstractWorkflowOperationHandler {
81
82
83 private static final Logger logger = LoggerFactory.getLogger(PublishOaiPmhWorkflowOperationHandler.class);
84
85
86 private static final String DOWNLOAD_FLAVORS = "download-flavors";
87 private static final String DOWNLOAD_TAGS = "download-tags";
88 private static final String STREAMING_TAGS = "streaming-tags";
89 private static final String STREAMING_FLAVORS = "streaming-flavors";
90 private static final String CHECK_AVAILABILITY = "check-availability";
91 private static final String REPOSITORY = "repository";
92 private static final String EXTERNAL_TEMPLATE = "external-template";
93 private static final String EXTERNAL_CHANNEL_NAME = "external-channel";
94 private static final String EXTERNAL_MIME_TYPE = "external-mime-type";
95
96
97 private OaiPmhPublicationService publicationService = null;
98
99
100 private StreamingDistributionService streamingDistributionService = null;
101
102
103
104
105
106
107
108 @Reference
109 public void setPublicationService(OaiPmhPublicationService publicationService) {
110 this.publicationService = publicationService;
111 }
112
113
114
115
116
117
118
119 @Reference(target = "(distribution.channel=streaming)")
120 protected void setStreamingDistributionService(StreamingDistributionService streamingDistributionService) {
121 this.streamingDistributionService = streamingDistributionService;
122 }
123
124 @Reference
125 @Override
126 public void setServiceRegistry(ServiceRegistry serviceRegistry) {
127 super.setServiceRegistry(serviceRegistry);
128 }
129
130
131 @Override
132 @Activate
133 public void activate(ComponentContext cc) {
134 }
135
136 @Override
137 public WorkflowOperationResult start(final WorkflowInstance workflowInstance, JobContext context)
138 throws WorkflowOperationException {
139 logger.debug("Running distribution workflow operation");
140
141 MediaPackage mediaPackage = workflowInstance.getMediaPackage();
142
143
144 String downloadTags = StringUtils
145 .trimToEmpty(workflowInstance.getCurrentOperation().getConfiguration(DOWNLOAD_TAGS));
146 String downloadFlavors = StringUtils
147 .trimToEmpty(workflowInstance.getCurrentOperation().getConfiguration(DOWNLOAD_FLAVORS));
148 String streamingTags = StringUtils
149 .trimToEmpty(workflowInstance.getCurrentOperation().getConfiguration(STREAMING_TAGS));
150 String streamingFlavors = StringUtils
151 .trimToEmpty(workflowInstance.getCurrentOperation().getConfiguration(STREAMING_FLAVORS));
152 boolean checkAvailability = option(workflowInstance.getCurrentOperation().getConfiguration(CHECK_AVAILABILITY))
153 .bind(trimToNone).map(toBool).getOrElse(true);
154 String repository = StringUtils.trimToNull(workflowInstance.getCurrentOperation().getConfiguration(REPOSITORY));
155
156 Opt<String> externalChannel = getOptConfig(workflowInstance.getCurrentOperation(), EXTERNAL_CHANNEL_NAME);
157 Opt<String> externalTempalte = getOptConfig(workflowInstance.getCurrentOperation(), EXTERNAL_TEMPLATE);
158 Opt<MimeType> externalMimetype = getOptConfig(workflowInstance.getCurrentOperation(), EXTERNAL_MIME_TYPE)
159 .bind(MimeTypes.toMimeType);
160
161 if (repository == null) {
162 throw new IllegalArgumentException("No repository has been specified");
163 }
164
165 String[] sourceDownloadTags = StringUtils.split(downloadTags, ",");
166 String[] sourceDownloadFlavors = StringUtils.split(downloadFlavors, ",");
167 String[] sourceStreamingTags = StringUtils.split(streamingTags, ",");
168 String[] sourceStreamingFlavors = StringUtils.split(streamingFlavors, ",");
169
170 if (sourceDownloadTags.length == 0 && sourceDownloadFlavors.length == 0 && sourceStreamingTags.length == 0
171 && sourceStreamingFlavors.length == 0) {
172 logger.warn("No tags or flavors have been specified, so nothing will be published to the engage");
173 return createResult(mediaPackage, Action.CONTINUE);
174 }
175
176 final SimpleElementSelector downloadElementSelector = new SimpleElementSelector();
177 for (String flavor : sourceDownloadFlavors) {
178 downloadElementSelector.addFlavor(MediaPackageElementFlavor.parseFlavor(flavor));
179 }
180 for (String tag : sourceDownloadTags) {
181 downloadElementSelector.addTag(tag);
182 }
183 final Collection<MediaPackageElement> downloadElements = downloadElementSelector.select(mediaPackage, false);
184
185 final Collection<MediaPackageElement> streamingElements;
186 if (streamingDistributionService != null && streamingDistributionService.publishToStreaming()) {
187 final SimpleElementSelector streamingElementSelector = new SimpleElementSelector();
188 for (String flavor : sourceStreamingFlavors) {
189 streamingElementSelector.addFlavor(MediaPackageElementFlavor.parseFlavor(flavor));
190 }
191 for (String tag : sourceStreamingTags) {
192 streamingElementSelector.addTag(tag);
193 }
194 streamingElements = streamingElementSelector.select(mediaPackage, false);
195 } else {
196 streamingElements = list();
197 }
198
199 try {
200 Set<String> downloadElementIds = new HashSet<>();
201 Set<String> streamingElementIds = new HashSet<>();
202
203
204 for (MediaPackageElement elem : downloadElements) {
205 downloadElementIds.add(elem.getIdentifier());
206 }
207 for (MediaPackageElement elem : streamingElements) {
208 streamingElementIds.add(elem.getIdentifier());
209 }
210
211 Job publishJob = null;
212 try {
213 publishJob = publicationService.publish(mediaPackage, repository, downloadElementIds, streamingElementIds,
214 checkAvailability);
215 } catch (MediaPackageException e) {
216 throw new WorkflowOperationException("Error parsing media package", e);
217 } catch (PublicationException e) {
218 throw new WorkflowOperationException("Error parsing media package", e);
219 }
220
221
222 if (!waitForStatus(publishJob).isSuccess()) {
223 throw new WorkflowOperationException("Mediapackage " + mediaPackage.getIdentifier()
224 + " could not be published to OAI-PMH repository " + repository);
225 }
226
227
228 Job job = serviceRegistry.getJob(publishJob.getId());
229
230
231 if (job.getPayload() == null) {
232 logger.warn("Publish to OAI-PMH repository '{}' failed, no payload from publication job: {}", repository, job);
233 return createResult(mediaPackage, Action.CONTINUE);
234 }
235
236 Publication newElement = null;
237 try {
238 newElement = (Publication) MediaPackageElementParser.getFromXml(job.getPayload());
239 } catch (MediaPackageException e) {
240 throw new WorkflowOperationException(e);
241 }
242
243 if (newElement == null) {
244 logger.warn(
245 "Publication to OAI-PMH repository '{}' failed, unable to parse the payload '{}' from "
246 + "job '{}' to a mediapackage element",
247 repository, job.getPayload(), job.toString());
248 return createResult(mediaPackage, Action.CONTINUE);
249 }
250
251 for (Publication existingPublication : $(mediaPackage.getPublications())
252 .find(ofChannel(newElement.getChannel()).toFn())) {
253 mediaPackage.remove(existingPublication);
254 }
255 mediaPackage.add(newElement);
256
257 if (externalChannel.isSome() && externalMimetype.isSome() && externalTempalte.isSome()) {
258 String template = externalTempalte.get().replace("{event}", mediaPackage.getIdentifier().toString());
259 if (StringUtils.isNotBlank(mediaPackage.getSeries())) {
260 template = template.replace("{series}", mediaPackage.getSeries());
261 }
262
263 Publication externalElement = PublicationImpl.publication(UUID.randomUUID().toString(), externalChannel.get(),
264 URI.create(template), externalMimetype.get());
265 for (Publication existingPublication : $(mediaPackage.getPublications())
266 .find(ofChannel(externalChannel.get()).toFn())) {
267 mediaPackage.remove(existingPublication);
268 }
269 mediaPackage.add(externalElement);
270 }
271
272 logger.debug("Publication to OAI-PMH repository '{}' operation completed", repository);
273 } catch (Exception e) {
274 if (e instanceof WorkflowOperationException) {
275 throw (WorkflowOperationException) e;
276 } else {
277 throw new WorkflowOperationException(e);
278 }
279 }
280 return createResult(mediaPackage, Action.CONTINUE);
281 }
282
283 }