1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21 package org.opencastproject.workflow.handler.distribution;
22
23 import static org.opencastproject.mediapackage.MediaPackageSupport.Filters.ofChannel;
24 import static org.opencastproject.util.data.Collections.list;
25 import static org.opencastproject.util.data.Option.option;
26 import static org.opencastproject.util.data.functions.Strings.toBool;
27 import static org.opencastproject.util.data.functions.Strings.trimToNone;
28
29 import org.opencastproject.distribution.api.StreamingDistributionService;
30 import org.opencastproject.job.api.Job;
31 import org.opencastproject.job.api.JobContext;
32 import org.opencastproject.mediapackage.MediaPackage;
33 import org.opencastproject.mediapackage.MediaPackageElement;
34 import org.opencastproject.mediapackage.MediaPackageElementFlavor;
35 import org.opencastproject.mediapackage.MediaPackageElementParser;
36 import org.opencastproject.mediapackage.MediaPackageException;
37 import org.opencastproject.mediapackage.Publication;
38 import org.opencastproject.mediapackage.PublicationImpl;
39 import org.opencastproject.mediapackage.selector.SimpleElementSelector;
40 import org.opencastproject.publication.api.OaiPmhPublicationService;
41 import org.opencastproject.publication.api.PublicationException;
42 import org.opencastproject.serviceregistry.api.ServiceRegistry;
43 import org.opencastproject.util.MimeType;
44 import org.opencastproject.util.MimeTypes;
45 import org.opencastproject.workflow.api.AbstractWorkflowOperationHandler;
46 import org.opencastproject.workflow.api.WorkflowInstance;
47 import org.opencastproject.workflow.api.WorkflowOperationException;
48 import org.opencastproject.workflow.api.WorkflowOperationHandler;
49 import org.opencastproject.workflow.api.WorkflowOperationResult;
50 import org.opencastproject.workflow.api.WorkflowOperationResult.Action;
51
52 import org.apache.commons.lang3.StringUtils;
53 import org.osgi.service.component.ComponentContext;
54 import org.osgi.service.component.annotations.Activate;
55 import org.osgi.service.component.annotations.Component;
56 import org.osgi.service.component.annotations.Reference;
57 import org.slf4j.Logger;
58 import org.slf4j.LoggerFactory;
59
60 import java.net.URI;
61 import java.util.Collection;
62 import java.util.HashSet;
63 import java.util.Optional;
64 import java.util.Set;
65 import java.util.UUID;
66
67
68
69
70 @Component(
71 immediate = true,
72 service = WorkflowOperationHandler.class,
73 property = {
74 "service.description=OAI-PMH Publication Workflow Handler",
75 "workflow.operation=publish-oaipmh"
76 }
77 )
78 public class PublishOaiPmhWorkflowOperationHandler extends AbstractWorkflowOperationHandler {
79
80
81 private static final Logger logger = LoggerFactory.getLogger(PublishOaiPmhWorkflowOperationHandler.class);
82
83
84 private static final String DOWNLOAD_FLAVORS = "download-flavors";
85 private static final String DOWNLOAD_TAGS = "download-tags";
86 private static final String STREAMING_TAGS = "streaming-tags";
87 private static final String STREAMING_FLAVORS = "streaming-flavors";
88 private static final String CHECK_AVAILABILITY = "check-availability";
89 private static final String REPOSITORY = "repository";
90 private static final String EXTERNAL_TEMPLATE = "external-template";
91 private static final String EXTERNAL_CHANNEL_NAME = "external-channel";
92 private static final String EXTERNAL_MIME_TYPE = "external-mime-type";
93
94
95 private OaiPmhPublicationService publicationService = null;
96
97
98 private StreamingDistributionService streamingDistributionService = null;
99
100
101
102
103
104
105
106 @Reference
107 public void setPublicationService(OaiPmhPublicationService publicationService) {
108 this.publicationService = publicationService;
109 }
110
111
112
113
114
115
116
117 @Reference(target = "(distribution.channel=streaming)")
118 protected void setStreamingDistributionService(StreamingDistributionService streamingDistributionService) {
119 this.streamingDistributionService = streamingDistributionService;
120 }
121
122 @Reference
123 @Override
124 public void setServiceRegistry(ServiceRegistry serviceRegistry) {
125 super.setServiceRegistry(serviceRegistry);
126 }
127
128
129 @Override
130 @Activate
131 public void activate(ComponentContext cc) {
132 }
133
134 @Override
135 public WorkflowOperationResult start(final WorkflowInstance workflowInstance, JobContext context)
136 throws WorkflowOperationException {
137 logger.debug("Running distribution workflow operation");
138
139 MediaPackage mediaPackage = workflowInstance.getMediaPackage();
140
141
142 String downloadTags = StringUtils
143 .trimToEmpty(workflowInstance.getCurrentOperation().getConfiguration(DOWNLOAD_TAGS));
144 String downloadFlavors = StringUtils
145 .trimToEmpty(workflowInstance.getCurrentOperation().getConfiguration(DOWNLOAD_FLAVORS));
146 String streamingTags = StringUtils
147 .trimToEmpty(workflowInstance.getCurrentOperation().getConfiguration(STREAMING_TAGS));
148 String streamingFlavors = StringUtils
149 .trimToEmpty(workflowInstance.getCurrentOperation().getConfiguration(STREAMING_FLAVORS));
150 boolean checkAvailability = option(workflowInstance.getCurrentOperation().getConfiguration(CHECK_AVAILABILITY))
151 .bind(trimToNone).map(toBool).getOrElse(true);
152 String repository = StringUtils.trimToNull(workflowInstance.getCurrentOperation().getConfiguration(REPOSITORY));
153
154 Optional<String> externalChannel = Optional.ofNullable(
155 getOptConfig(workflowInstance.getCurrentOperation(), EXTERNAL_CHANNEL_NAME).orNull());
156 Optional<String> externalTemplate = Optional.ofNullable(
157 getOptConfig(workflowInstance.getCurrentOperation(), EXTERNAL_TEMPLATE).orNull());
158 Optional<MimeType> externalMimetype = Optional.ofNullable(
159 getOptConfig(workflowInstance.getCurrentOperation(), EXTERNAL_MIME_TYPE).orNull())
160 .flatMap(MimeTypes::toMimeType);
161
162 if (repository == null) {
163 throw new IllegalArgumentException("No repository has been specified");
164 }
165
166 String[] sourceDownloadTags = StringUtils.split(downloadTags, ",");
167 String[] sourceDownloadFlavors = StringUtils.split(downloadFlavors, ",");
168 String[] sourceStreamingTags = StringUtils.split(streamingTags, ",");
169 String[] sourceStreamingFlavors = StringUtils.split(streamingFlavors, ",");
170
171 if (sourceDownloadTags.length == 0 && sourceDownloadFlavors.length == 0 && sourceStreamingTags.length == 0
172 && sourceStreamingFlavors.length == 0) {
173 logger.warn("No tags or flavors have been specified, so nothing will be published to the engage");
174 return createResult(mediaPackage, Action.CONTINUE);
175 }
176
177 final SimpleElementSelector downloadElementSelector = new SimpleElementSelector();
178 for (String flavor : sourceDownloadFlavors) {
179 downloadElementSelector.addFlavor(MediaPackageElementFlavor.parseFlavor(flavor));
180 }
181 for (String tag : sourceDownloadTags) {
182 downloadElementSelector.addTag(tag);
183 }
184 final Collection<MediaPackageElement> downloadElements = downloadElementSelector.select(mediaPackage, false);
185
186 final Collection<MediaPackageElement> streamingElements;
187 if (streamingDistributionService != null && streamingDistributionService.publishToStreaming()) {
188 final SimpleElementSelector streamingElementSelector = new SimpleElementSelector();
189 for (String flavor : sourceStreamingFlavors) {
190 streamingElementSelector.addFlavor(MediaPackageElementFlavor.parseFlavor(flavor));
191 }
192 for (String tag : sourceStreamingTags) {
193 streamingElementSelector.addTag(tag);
194 }
195 streamingElements = streamingElementSelector.select(mediaPackage, false);
196 } else {
197 streamingElements = list();
198 }
199
200 try {
201 Set<String> downloadElementIds = new HashSet<>();
202 Set<String> streamingElementIds = new HashSet<>();
203
204
205 for (MediaPackageElement elem : downloadElements) {
206 downloadElementIds.add(elem.getIdentifier());
207 }
208 for (MediaPackageElement elem : streamingElements) {
209 streamingElementIds.add(elem.getIdentifier());
210 }
211
212 Job publishJob = null;
213 try {
214 publishJob = publicationService.publish(mediaPackage, repository, downloadElementIds, streamingElementIds,
215 checkAvailability);
216 } catch (MediaPackageException e) {
217 throw new WorkflowOperationException("Error parsing media package", e);
218 } catch (PublicationException e) {
219 throw new WorkflowOperationException("Error parsing media package", e);
220 }
221
222
223 if (!waitForStatus(publishJob).isSuccess()) {
224 throw new WorkflowOperationException("Mediapackage " + mediaPackage.getIdentifier()
225 + " could not be published to OAI-PMH repository " + repository);
226 }
227
228
229 Job job = serviceRegistry.getJob(publishJob.getId());
230
231
232 if (job.getPayload() == null) {
233 logger.warn("Publish to OAI-PMH repository '{}' failed, no payload from publication job: {}", repository, job);
234 return createResult(mediaPackage, Action.CONTINUE);
235 }
236
237 Publication newElement = null;
238 try {
239 newElement = (Publication) MediaPackageElementParser.getFromXml(job.getPayload());
240 } catch (MediaPackageException e) {
241 throw new WorkflowOperationException(e);
242 }
243
244 if (newElement == null) {
245 logger.warn(
246 "Publication to OAI-PMH repository '{}' failed, unable to parse the payload '{}' from "
247 + "job '{}' to a mediapackage element",
248 repository, job.getPayload(), job.toString());
249 return createResult(mediaPackage, Action.CONTINUE);
250 }
251
252 for (Publication existingPublication : mediaPackage.getPublications()) {
253 if (ofChannel(newElement.getChannel()).apply(existingPublication)) {
254 mediaPackage.remove(existingPublication);
255 }
256 }
257 mediaPackage.add(newElement);
258
259 if (externalChannel.isPresent() && externalMimetype.isPresent() && externalTemplate.isPresent()) {
260 String template = externalTemplate.get().replace("{event}", mediaPackage.getIdentifier().toString());
261 if (StringUtils.isNotBlank(mediaPackage.getSeries())) {
262 template = template.replace("{series}", mediaPackage.getSeries());
263 }
264
265 Publication externalElement = PublicationImpl.publication(UUID.randomUUID().toString(), externalChannel.get(),
266 URI.create(template), externalMimetype.get());
267 for (Publication existingPublication : mediaPackage.getPublications()) {
268 if (ofChannel(externalChannel.get()).apply(existingPublication)) {
269 mediaPackage.remove(existingPublication);
270 }
271 }
272 mediaPackage.add(externalElement);
273 }
274
275 logger.debug("Publication to OAI-PMH repository '{}' operation completed", repository);
276 } catch (Exception e) {
277 if (e instanceof WorkflowOperationException) {
278 throw (WorkflowOperationException) e;
279 } else {
280 throw new WorkflowOperationException(e);
281 }
282 }
283 return createResult(mediaPackage, Action.CONTINUE);
284 }
285
286 }