1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21 package org.opencastproject.workflow.handler.distribution;
22
23 import org.opencastproject.distribution.api.DistributionException;
24 import org.opencastproject.distribution.api.DownloadDistributionService;
25 import org.opencastproject.distribution.api.StreamingDistributionService;
26 import org.opencastproject.job.api.Job;
27 import org.opencastproject.job.api.JobContext;
28 import org.opencastproject.mediapackage.MediaPackage;
29 import org.opencastproject.mediapackage.MediaPackageElement;
30 import org.opencastproject.mediapackage.MediaPackageElementFlavor;
31 import org.opencastproject.mediapackage.MediaPackageElementParser;
32 import org.opencastproject.mediapackage.MediaPackageException;
33 import org.opencastproject.mediapackage.Publication;
34 import org.opencastproject.mediapackage.PublicationImpl;
35 import org.opencastproject.mediapackage.selector.SimpleElementSelector;
36 import org.opencastproject.security.api.SecurityService;
37 import org.opencastproject.serviceregistry.api.ServiceRegistry;
38 import org.opencastproject.util.MimeType;
39 import org.opencastproject.util.MimeTypes;
40 import org.opencastproject.util.RequireUtil;
41 import org.opencastproject.util.doc.DocUtil;
42 import org.opencastproject.workflow.api.WorkflowInstance;
43 import org.opencastproject.workflow.api.WorkflowOperationException;
44 import org.opencastproject.workflow.api.WorkflowOperationHandler;
45 import org.opencastproject.workflow.api.WorkflowOperationInstance;
46 import org.opencastproject.workflow.api.WorkflowOperationResult;
47 import org.opencastproject.workflow.api.WorkflowOperationResult.Action;
48
49 import org.apache.commons.lang3.ArrayUtils;
50 import org.apache.commons.lang3.BooleanUtils;
51 import org.apache.commons.lang3.StringUtils;
52 import org.osgi.service.component.annotations.Component;
53 import org.osgi.service.component.annotations.Reference;
54 import org.slf4j.Logger;
55 import org.slf4j.LoggerFactory;
56
57 import java.net.URI;
58 import java.net.URISyntaxException;
59 import java.util.ArrayList;
60 import java.util.Arrays;
61 import java.util.Collection;
62 import java.util.HashMap;
63 import java.util.HashSet;
64 import java.util.List;
65 import java.util.Map;
66 import java.util.Set;
67 import java.util.UUID;
68
69
70
71
72
73
74 @Component(
75 immediate = true,
76 service = WorkflowOperationHandler.class,
77 property = {
78 "service.description=Configurable Publication Workflow Handler",
79 "workflow.operation=publish-configure"
80 }
81 )
82 public class ConfigurablePublishWorkflowOperationHandler extends ConfigurableWorkflowOperationHandlerBase {
83
84
85 private static final Logger logger = LoggerFactory.getLogger(ConfigurablePublishWorkflowOperationHandler.class);
86
87
88 protected static final String EVENT_ID_TEMPLATE_KEY = "event_id";
89
90 protected static final String PLAYER_PATH_TEMPLATE_KEY = "player_path";
91
92 protected static final String PUBLICATION_ID_TEMPLATE_KEY = "publication_id";
93
94 protected static final String SERIES_ID_TEMPLATE_KEY = "series_id";
95
96 protected static final String PLAYER_PROPERTY = "player";
97
98 protected static final String ORG_TEMPLATE_KEY_PREFIX = "org_";
99
100
101 private DownloadDistributionService downloadDistributionService;
102 private StreamingDistributionService streamingDistributionService;
103 private SecurityService securityService;
104
105
106 static final String DOWNLOAD_SOURCE_FLAVORS = "download-source-flavors";
107 static final String DOWNLOAD_SOURCE_TAGS = "download-source-tags";
108 static final String STREAMING_SOURCE_TAGS = "streaming-source-tags";
109 static final String STREAMING_SOURCE_FLAVORS = "streaming-source-flavors";
110 static final String CHANNEL_ID_KEY = "channel-id";
111 static final String MIME_TYPE = "mimetype";
112 static final String WITH_PUBLISHED_ELEMENTS = "with-published-elements";
113 static final String CHECK_AVAILABILITY = "check-availability";
114 static final String STRATEGY = "strategy";
115 static final String MODE = "mode";
116
117
118 static final String MODE_SINGLE = "single";
119 static final String MODE_MIXED = "mixed";
120 static final String MODE_BULK = "bulk";
121
122 static final String[] KNOWN_MODES = { MODE_SINGLE, MODE_MIXED, MODE_BULK };
123
124 static final String DEFAULT_MODE = MODE_BULK;
125
126
127 static final String URL_PATTERN = "url-pattern";
128
129 static final String RETRACT_STREAMING = "retract-streaming";
130 static final boolean RETRACT_STREAMING_DEFAULT = false;
131
132
133 @Reference(target = "(distribution.channel=download)")
134 void setDownloadDistributionService(DownloadDistributionService distributionService) {
135 this.downloadDistributionService = distributionService;
136 }
137
138 @Reference(target = "(distribution.channel=streaming)")
139 void setStreamingDistributionService(StreamingDistributionService streamingDistributionService) {
140 this.streamingDistributionService = streamingDistributionService;
141 }
142
143
144 @Reference
145 protected void setSecurityService(SecurityService securityService) {
146 this.securityService = securityService;
147 }
148
149 @Override
150 protected DownloadDistributionService getDownloadDistributionService() {
151 assert (downloadDistributionService != null);
152 return downloadDistributionService;
153 }
154
155 @Override
156 protected StreamingDistributionService getStreamingDistributionService() {
157 assert (streamingDistributionService != null);
158 return streamingDistributionService;
159 }
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174 public URI populateUrlWithVariables(String urlPattern, MediaPackage mp, String pubUUID)
175 throws WorkflowOperationException {
176 Map<String, Object> values = new HashMap<>();
177 values.put(EVENT_ID_TEMPLATE_KEY, mp.getIdentifier().toString());
178 values.put(PUBLICATION_ID_TEMPLATE_KEY, pubUUID);
179 String playerPath = securityService.getOrganization().getProperties().get(PLAYER_PROPERTY);
180 values.put(PLAYER_PATH_TEMPLATE_KEY, playerPath);
181 values.put(SERIES_ID_TEMPLATE_KEY, StringUtils.trimToEmpty(mp.getSeries()));
182 Map<String, String> orgProperties = securityService.getOrganization().getProperties();
183 orgProperties.put("id", securityService.getOrganization().getId());
184 orgProperties.put("name", securityService.getOrganization().getName());
185 orgProperties.put("admin_role", securityService.getOrganization().getAdminRole());
186 orgProperties.put("anonymous_role", securityService.getOrganization().getAnonymousRole());
187 for (Map.Entry<String, String> orgProperty : orgProperties.entrySet()) {
188 values.put(ORG_TEMPLATE_KEY_PREFIX + orgProperty.getKey().replace('.', '_').toLowerCase(),
189 orgProperty.getValue());
190 }
191 String uriWithVariables = DocUtil.processTextTemplate("Replacing Variables in Publish URL", urlPattern, values);
192 URI publicationURI;
193 try {
194 publicationURI = new URI(uriWithVariables);
195 } catch (URISyntaxException e) {
196 throw new WorkflowOperationException(String.format(
197 "Unable to create URI from template '%s', replacement was: '%s'", urlPattern, uriWithVariables), e);
198 }
199 return publicationURI;
200 }
201
202 @Override
203 public WorkflowOperationResult start(WorkflowInstance workflowInstance, JobContext context)
204 throws WorkflowOperationException {
205 RequireUtil.notNull(workflowInstance, "workflowInstance");
206
207 final MediaPackage mp = workflowInstance.getMediaPackage();
208 final WorkflowOperationInstance op = workflowInstance.getCurrentOperation();
209
210 final String channelId = StringUtils.trimToEmpty(op.getConfiguration(CHANNEL_ID_KEY));
211 if ("".equals(channelId)) {
212 throw new WorkflowOperationException("Unable to publish this mediapackage as the configuration key "
213 + CHANNEL_ID_KEY + " is missing. Unable to determine where to publish these elements.");
214 }
215
216 final String urlPattern = StringUtils.trimToEmpty(op.getConfiguration(URL_PATTERN));
217
218 MimeType mimetype = null;
219 String mimetypeString = StringUtils.trimToEmpty(op.getConfiguration(MIME_TYPE));
220 if (!"".equals(mimetypeString)) {
221 try {
222 mimetype = MimeTypes.parseMimeType(mimetypeString);
223 } catch (IllegalArgumentException e) {
224 throw new WorkflowOperationException("Unable to parse the provided configuration for " + MIME_TYPE, e);
225 }
226 }
227
228 final boolean withPublishedElements = BooleanUtils
229 .toBoolean(StringUtils.trimToEmpty(op.getConfiguration(WITH_PUBLISHED_ELEMENTS)));
230
231 boolean checkAvailability = BooleanUtils
232 .toBoolean(StringUtils.trimToEmpty(op.getConfiguration(CHECK_AVAILABILITY)));
233
234 boolean retractStreaming = RETRACT_STREAMING_DEFAULT;
235 String retractStreamingString = workflowInstance.getConfiguration(RETRACT_STREAMING);
236 if (retractStreamingString != null) {
237 retractStreaming = BooleanUtils.toBoolean(StringUtils.trimToEmpty(retractStreamingString));
238 }
239
240 if (getPublications(mp, channelId).size() > 0) {
241 final String rePublishStrategy = StringUtils.trimToEmpty(op.getConfiguration(STRATEGY));
242
243 switch (rePublishStrategy) {
244
245 case ("fail"):
246
247 fail(mp);
248 break;
249 case ("merge"):
250
251 break;
252 default:
253 retract(mp, channelId, retractStreaming);
254 }
255 }
256
257 String mode = StringUtils.trimToEmpty(op.getConfiguration(MODE));
258 if ("".equals(mode)) {
259 mode = DEFAULT_MODE;
260 } else if (!ArrayUtils.contains(KNOWN_MODES, mode)) {
261 logger.error("Unknown value for configuration key mode: '{}'", mode);
262 throw new IllegalArgumentException("Unknown value for configuration key mode");
263 }
264
265 final String[] downloadSourceFlavors
266 = StringUtils.split(StringUtils.trimToEmpty(op.getConfiguration(DOWNLOAD_SOURCE_FLAVORS)), ",");
267 final String[] downloadSourceTags
268 = StringUtils.split(StringUtils.trimToEmpty(op.getConfiguration(DOWNLOAD_SOURCE_TAGS)), ",");
269 final String[] streamingSourceFlavors
270 = StringUtils.split(StringUtils.trimToEmpty(op.getConfiguration(STREAMING_SOURCE_FLAVORS)), ",");
271 final String[] streamingSourceTags
272 = StringUtils.split(StringUtils.trimToEmpty(op.getConfiguration(STREAMING_SOURCE_TAGS)), ",");
273
274 String publicationUUID = UUID.randomUUID().toString();
275 Publication publication = PublicationImpl.publication(publicationUUID, channelId, null, null);
276
277
278 final SimpleElementSelector downloadSelector = new SimpleElementSelector();
279 for (String flavor : downloadSourceFlavors) {
280 downloadSelector.addFlavor(MediaPackageElementFlavor.parseFlavor(flavor));
281 }
282 for (String tag : downloadSourceTags) {
283 downloadSelector.addTag(tag);
284 }
285
286 final SimpleElementSelector streamingSelector = new SimpleElementSelector();
287 for (String flavor : streamingSourceFlavors) {
288 streamingSelector.addFlavor(MediaPackageElementFlavor.parseFlavor(flavor));
289 }
290 for (String tag : streamingSourceTags) {
291 streamingSelector.addTag(tag);
292 }
293
294 boolean streamingElementsDistributed = false;
295 boolean downloadElementsDistributed = false;
296
297 if (streamingDistributionService != null && streamingDistributionService.publishToStreaming()
298 && (streamingSourceFlavors.length > 0 || streamingSourceTags.length > 0)) {
299 streamingElementsDistributed = distributeElements(streamingSelector, mp, publication, channelId, mode,
300 withPublishedElements, checkAvailability, true);
301 }
302
303 if (downloadSourceFlavors.length > 0 || downloadSourceTags.length > 0) {
304 downloadElementsDistributed = distributeElements(downloadSelector, mp, publication, channelId, mode,
305 withPublishedElements, checkAvailability, false);
306 }
307
308 if (!downloadElementsDistributed && !streamingElementsDistributed
309 && (downloadSourceFlavors.length > 0 || downloadSourceTags.length > 0
310 || streamingSourceFlavors.length > 0 || streamingSourceTags.length > 0)) {
311
312 return createResult(mp, Action.SKIP);
313 }
314
315 if (!"".equals(urlPattern)) {
316 publication.setURI(populateUrlWithVariables(urlPattern, mp, publicationUUID));
317 }
318 if (mimetype != null) {
319 publication.setMimeType(mimetype);
320 }
321 mp.add(publication);
322 return createResult(mp, Action.CONTINUE);
323 }
324
325 private boolean distributeElements(SimpleElementSelector selector, MediaPackage mp, Publication publication,
326 String channelId, String mode, boolean withPublishedElements, boolean checkAvailability, boolean streaming)
327 throws WorkflowOperationException {
328
329 String target = (streaming ? "streaming" : "download");
330 if (!withPublishedElements) {
331 Set<MediaPackageElement> elements = distribute(selector.select(mp, false), mp, channelId, mode,
332 checkAvailability, streaming);
333 if (elements.size() > 0) {
334 for (MediaPackageElement element : elements) {
335
336 element.setIdentifier(null);
337 PublicationImpl.addElementToPublication(publication, element);
338 }
339 } else {
340 logger.info("No element found for distribution to " + target + " in media package '{}'", mp);
341 return false;
342 }
343 } else {
344 List<MediaPackageElement> publishedElements = new ArrayList<>();
345 for (Publication alreadyPublished : mp.getPublications()) {
346 publishedElements.addAll(Arrays.asList(alreadyPublished.getAttachments()));
347 publishedElements.addAll(Arrays.asList(alreadyPublished.getCatalogs()));
348 publishedElements.addAll(Arrays.asList(alreadyPublished.getTracks()));
349 }
350
351 Collection<MediaPackageElement> elements = selector.select(publishedElements, false);
352 if (elements.size() > 0) {
353 for (MediaPackageElement element : elements) {
354 PublicationImpl.addElementToPublication(publication, element);
355 }
356 } else {
357 logger.info("No elements found for publication to " + target + " in media package '{}'", mp);
358 return false;
359 }
360 }
361 return true;
362 }
363
364 private Set<MediaPackageElement> distribute(
365 Collection<MediaPackageElement> elements,
366 MediaPackage mediapackage,
367 String channelId,
368 String mode,
369 boolean checkAvailability,
370 boolean streaming
371 ) throws WorkflowOperationException {
372
373 Set<MediaPackageElement> result = new HashSet<>();
374
375 Set<String> bulkElementIds = new HashSet<>();
376 Set<String> singleElementIds = new HashSet<>();
377
378 for (MediaPackageElement element : elements) {
379 if (MODE_BULK.equals(mode)
380 || (MODE_MIXED.equals(mode) && (element.getElementType() != MediaPackageElement.Type.Track))) {
381 bulkElementIds.add(element.getIdentifier());
382 } else {
383 singleElementIds.add(element.getIdentifier());
384 }
385 }
386
387 Set<Job> jobs = new HashSet<>();
388 if (bulkElementIds.size() > 0) {
389 logger.info("Start bulk publishing of {} elements of media package '{}' to publication channel '{}'",
390 bulkElementIds.size(), mediapackage, channelId);
391 try {
392 Job job;
393 if (streaming) {
394 job = streamingDistributionService.distribute(channelId, mediapackage, bulkElementIds);
395 } else {
396 job = downloadDistributionService.distribute(channelId, mediapackage, bulkElementIds, checkAvailability);
397 }
398 jobs.add(job);
399 } catch (DistributionException | MediaPackageException e) {
400 logger.error("Creating the distribution job for {} elements of media package '{}' failed",
401 bulkElementIds.size(), mediapackage, e);
402 throw new WorkflowOperationException(e);
403 }
404 }
405 if (singleElementIds.size() > 0) {
406 logger.info("Start single publishing of {} elements of media package '{}' to publication channel '{}'",
407 singleElementIds.size(), mediapackage, channelId);
408 for (String elementId : singleElementIds) {
409 try {
410 Job job;
411 if (streaming) {
412 job = streamingDistributionService.distribute(channelId, mediapackage, elementId);
413 } else {
414 job = downloadDistributionService.distribute(channelId, mediapackage, elementId, checkAvailability);
415 }
416 jobs.add(job);
417 } catch (DistributionException | MediaPackageException e) {
418 logger.error("Creating the distribution job for element '{}' of media package '{}' failed", elementId,
419 mediapackage, e);
420 throw new WorkflowOperationException(e);
421 }
422 }
423 }
424
425 if (jobs.size() > 0) {
426 if (!waitForStatus(jobs.toArray(new Job[jobs.size()])).isSuccess()) {
427 throw new WorkflowOperationException("At least one of the distribution jobs did not complete successfully");
428 }
429 for (Job job : jobs) {
430 try {
431 List<? extends MediaPackageElement> elems = MediaPackageElementParser.getArrayFromXml(job.getPayload());
432 result.addAll(elems);
433 } catch (MediaPackageException e) {
434 logger.error("Job '{}' returned payload ({}) that could not be parsed to media package elements", job,
435 job.getPayload(), e);
436 throw new WorkflowOperationException(e);
437 }
438 }
439 logger.info("Published {} elements of media package {} to publication channel {}",
440 bulkElementIds.size() + singleElementIds.size(), mediapackage, channelId);
441 }
442 return result;
443 }
444
445
446
447
448
449
450
451 private void fail(MediaPackage mp) throws WorkflowOperationException {
452 logger.error("There is already a Published Media, fail Stragy for Mediapackage {}", mp.getIdentifier());
453 throw new WorkflowOperationException("There is already a Published Media, fail Stragy for Mediapackage ");
454 }
455
456 @Reference
457 @Override
458 public void setServiceRegistry(ServiceRegistry serviceRegistry) {
459 super.setServiceRegistry(serviceRegistry);
460 }
461
462 }