1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 package org.opencastproject.workflow.handler.composer;
23
24 import org.opencastproject.composer.api.ComposerService;
25 import org.opencastproject.composer.api.EncoderException;
26 import org.opencastproject.composer.api.EncodingProfile;
27 import org.opencastproject.job.api.Job;
28 import org.opencastproject.job.api.JobContext;
29 import org.opencastproject.mediapackage.AdaptivePlaylist;
30 import org.opencastproject.mediapackage.MediaPackage;
31 import org.opencastproject.mediapackage.MediaPackageElementFlavor;
32 import org.opencastproject.mediapackage.MediaPackageElementParser;
33 import org.opencastproject.mediapackage.MediaPackageException;
34 import org.opencastproject.mediapackage.Track;
35 import org.opencastproject.mediapackage.selector.AbstractMediaPackageElementSelector;
36 import org.opencastproject.mediapackage.selector.TrackSelector;
37 import org.opencastproject.serviceregistry.api.ServiceRegistry;
38 import org.opencastproject.util.NotFoundException;
39 import org.opencastproject.workflow.api.AbstractWorkflowOperationHandler;
40 import org.opencastproject.workflow.api.WorkflowInstance;
41 import org.opencastproject.workflow.api.WorkflowOperationException;
42 import org.opencastproject.workflow.api.WorkflowOperationHandler;
43 import org.opencastproject.workflow.api.WorkflowOperationInstance;
44 import org.opencastproject.workflow.api.WorkflowOperationResult;
45 import org.opencastproject.workflow.api.WorkflowOperationResult.Action;
46 import org.opencastproject.workspace.api.Workspace;
47
48 import org.apache.commons.io.FilenameUtils;
49 import org.apache.commons.lang3.BooleanUtils;
50 import org.apache.commons.lang3.StringUtils;
51 import org.osgi.service.component.ComponentContext;
52 import org.osgi.service.component.annotations.Activate;
53 import org.osgi.service.component.annotations.Component;
54 import org.osgi.service.component.annotations.Reference;
55 import org.slf4j.Logger;
56 import org.slf4j.LoggerFactory;
57
58 import java.io.IOException;
59 import java.util.ArrayList;
60 import java.util.Collection;
61 import java.util.HashMap;
62 import java.util.List;
63 import java.util.Map;
64 import java.util.function.Predicate;
65 import java.util.stream.Collectors;
66
67
68
69
70
71 @Component(
72 immediate = true,
73 service = WorkflowOperationHandler.class,
74 property = {
75 "service.description=MultiEncode Workflow Operation Handler",
76 "workflow.operation=multiencode"
77 }
78 )
79 public class MultiEncodeWorkflowOperationHandler extends AbstractWorkflowOperationHandler {
80
81
82 private static final Logger logger = LoggerFactory.getLogger(MultiEncodeWorkflowOperationHandler.class);
83
84
85 static final String SEPARATOR = ";";
86
87
88 private ComposerService composerService = null;
89
90
91 private Workspace workspace = null;
92
93 @Activate
94 public void activate(ComponentContext cc) {
95 super.activate(cc);
96 }
97
98
99
100
101
102
103
104 @Reference
105 protected void setComposerService(ComposerService composerService) {
106 this.composerService = composerService;
107 }
108
109
110
111
112
113
114
115
116 @Reference
117 public void setWorkspace(Workspace workspace) {
118 this.workspace = workspace;
119 }
120
121 private Predicate<EncodingProfile> isManifestEP = p -> p.getOutputType() == EncodingProfile.MediaType.Manifest;
122
123
124
125
126
127
128
129 @Override
130 public WorkflowOperationResult start(final WorkflowInstance workflowInstance, JobContext context)
131 throws WorkflowOperationException {
132 logger.debug("Running Multiencode workflow operation on workflow {}", workflowInstance.getId());
133
134 try {
135 return multiencode(workflowInstance.getMediaPackage(), workflowInstance.getCurrentOperation());
136 } catch (Exception e) {
137 throw new WorkflowOperationException(e);
138 }
139 }
140
141 protected class ElementProfileTagFlavor {
142 private AbstractMediaPackageElementSelector<Track> elementSelector = new TrackSelector();
143 private String targetFlavor = null;
144 private String targetTags = null;
145 private List<String> encodingProfiles = new ArrayList<>();
146 private List<EncodingProfile> encodingProfileList = new ArrayList<>();
147
148 ElementProfileTagFlavor(String profiles) {
149 List<String> profilelist = asList(profiles);
150 for (String profile : profilelist) {
151 EncodingProfile encodingprofile = composerService.getProfile(profile);
152 if (encodingprofile != null) {
153 encodingProfiles.add(encodingprofile.getIdentifier());
154 encodingProfileList.add(encodingprofile);
155 } else {
156 throw new IllegalArgumentException("Encoding profile " + profile + " not found.");
157 }
158 }
159 }
160
161 public AbstractMediaPackageElementSelector<Track> getSelector() {
162 return this.elementSelector;
163 }
164
165 public List<String> getProfiles() {
166 return this.encodingProfiles;
167 }
168
169 public List<EncodingProfile> getEncodingProfiles() {
170 return this.encodingProfileList;
171 }
172
173 void addSourceFlavor(String flavor) {
174 this.elementSelector.addFlavor(MediaPackageElementFlavor.parseFlavor(flavor));
175 }
176
177 void addSourceTag(String tag) {
178 this.elementSelector.addTag(tag);
179 }
180
181 void setTargetTags(String tags) {
182 this.targetTags = tags;
183 }
184
185 void setTargetFlavor(String flavor) {
186 this.targetFlavor = flavor;
187 }
188
189 String getTargetFlavor() {
190 return this.targetFlavor;
191 }
192
193 String getTargetTags() {
194 return this.targetTags;
195 }
196 }
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214 private List<ElementProfileTagFlavor> getSrcSelector(String[] sourceFlavors, String[] sourceTags,
215 String[] targetFlavors, String[] targetTags, String[] profiles) throws WorkflowOperationException {
216 int n = 0;
217 List<ElementProfileTagFlavor> elementSelectors = new ArrayList<>();
218 if (sourceTags == null && sourceFlavors == null) {
219 throw new WorkflowOperationException("No source tags or Flavor");
220 }
221 if (profiles == null) {
222 throw new WorkflowOperationException("Missing profiles");
223 }
224 if (sourceTags != null) {
225
226 if (targetTags != null && (targetTags.length != 1 && sourceTags.length != targetTags.length)) {
227 throw new WorkflowOperationException(
228 "number of source tags " + sourceTags.length + " does not match number of target tags " + targetTags.length
229 + " (must be the same or one target)");
230 }
231
232 if (profiles.length != 1 && sourceTags.length != profiles.length) {
233 throw new WorkflowOperationException(
234 "number of source tags segments " + sourceTags.length + " does not match number of profiles segments "
235 + profiles.length + " (must be the same or one profile)");
236 }
237
238 if (sourceFlavors != null && (sourceTags.length != 1 && sourceFlavors.length != 1)
239 && sourceFlavors.length != sourceTags.length) {
240 throw new WorkflowOperationException("number of source tags segments " + sourceTags.length
241 + " does not match number of source Flavor segments " + sourceFlavors.length
242 + " (must be the same or one)");
243 }
244 n = sourceTags.length;
245 }
246 if (sourceFlavors != null) {
247
248 if (targetFlavors != null && (targetFlavors.length != 1 && sourceFlavors.length != targetFlavors.length)) {
249 throw new WorkflowOperationException(
250 "number of source flavors " + sourceFlavors.length + " segment does not match number of target flavors"
251 + targetFlavors.length + " (must be the same or one target flavor)");
252 }
253
254
255 if (targetTags != null && targetTags.length != 1 && sourceFlavors.length != targetTags.length) {
256 throw new WorkflowOperationException(
257 "number of source flavors " + sourceFlavors.length + " segment does not match number of target Tags"
258 + targetTags.length + " (must be the same or one target)");
259 }
260
261 if ((profiles.length != 1 && sourceFlavors.length != profiles.length)) {
262 throw new WorkflowOperationException("number of source flavors segments " + sourceFlavors.length
263 + " does not match number of profiles segments " + profiles.length
264 + " (must be the same or one profile)");
265 }
266 if (sourceFlavors.length > n) {
267 n = sourceFlavors.length;
268 }
269 }
270 int numProfiles = 0;
271
272 for (int i = 0; i < n; i++) {
273 elementSelectors.add(new ElementProfileTagFlavor(profiles[numProfiles]));
274 if (profiles.length > 1) {
275 numProfiles++;
276 }
277 }
278
279 if (sourceTags != null && sourceFlavors != null) {
280 if (sourceTags.length != sourceFlavors.length && sourceFlavors.length != 1 && sourceTags.length != 1) {
281 throw new WorkflowOperationException(
282 "number of source flavors " + sourceTags.length + " does not match number of source tags "
283 + sourceFlavors.length + " (must be the same or one set of tags or flavors)");
284 }
285 }
286 populateFlavorsAndTags(elementSelectors, sourceFlavors, targetFlavors, sourceTags, targetTags);
287 return elementSelectors;
288 }
289
290 private List<ElementProfileTagFlavor> populateFlavorsAndTags(List<ElementProfileTagFlavor> elementSelectors,
291 String[] sourceFlavors, String[] targetFlavors, String[] sourceTags, String[] targetTags)
292 throws WorkflowOperationException {
293 int sf = 0;
294 int tf = 0;
295 int st = 0;
296 int tt = 0;
297 for (ElementProfileTagFlavor ep : elementSelectors) {
298 try {
299 if (sourceTags != null) {
300 for (String tag : asList(sourceTags[st])) {
301 ep.addSourceTag(tag);
302 }
303 if (sourceTags.length != 1) {
304 st++;
305 }
306 }
307 if (targetTags != null) {
308 ep.setTargetTags(targetTags[tt]);
309 if (targetTags.length != 1) {
310 tt++;
311 }
312 }
313 if (sourceFlavors != null) {
314 for (String flavor : asList(sourceFlavors[sf])) {
315 ep.addSourceFlavor(flavor);
316 }
317 if (sourceFlavors.length != 1) {
318 sf++;
319 }
320 }
321 if (targetFlavors != null) {
322 for (String flavor : asList(targetFlavors[tf])) {
323 ep.setTargetFlavor(flavor);
324 }
325 if (targetFlavors.length != 1) {
326 tf++;
327 }
328 }
329 } catch (IllegalArgumentException e) {
330 throw new WorkflowOperationException("Set Tags or Flavor " + e.getMessage());
331 }
332 }
333 return elementSelectors;
334 }
335
336 private String[] getConfigAsArray(WorkflowOperationInstance operation, String name) {
337 String sourceOption = StringUtils.trimToNull(operation.getConfiguration(name));
338 return StringUtils.split(sourceOption, SEPARATOR);
339 }
340
341 private List<Track> getManifest(Collection<Track> tracks) {
342 return tracks.stream().filter(AdaptivePlaylist.isHLSTrackPred).collect(Collectors.toList());
343 }
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364 private WorkflowOperationResult multiencode(MediaPackage src, WorkflowOperationInstance operation)
365 throws EncoderException, IOException, NotFoundException, MediaPackageException, WorkflowOperationException {
366 MediaPackage mediaPackage = (MediaPackage) src.clone();
367
368 String[] sourceTags = getConfigAsArray(operation, "source-tags");
369 String[] sourceFlavors = getConfigAsArray(operation, "source-flavors");
370 String[] targetTags = getConfigAsArray(operation, "target-tags");
371 String[] targetFlavors = getConfigAsArray(operation, "target-flavors");
372 String tagWithProfileConfig = StringUtils.trimToNull(operation.getConfiguration("tag-with-profile"));
373 boolean tagWithProfile = BooleanUtils.toBoolean(tagWithProfileConfig);
374
375
376 if (sourceFlavors == null && sourceTags == null) {
377 logger.info("No source tags or flavors have been specified, not matching anything");
378 return createResult(mediaPackage, Action.CONTINUE);
379 }
380 String[] profiles = getConfigAsArray(operation, "encoding-profiles");
381 if (profiles == null) {
382 throw new WorkflowOperationException("Missing encoding profiles");
383 }
384
385
386 List<ElementProfileTagFlavor> selectors = getSrcSelector(sourceFlavors, sourceTags, targetFlavors, targetTags,
387 profiles);
388
389 long totalTimeInQueue = 0;
390 Map<Job, JobInformation> encodingJobs = new HashMap<>();
391
392 for (ElementProfileTagFlavor eptf : selectors) {
393
394 Collection<Track> elements = eptf.elementSelector.select(mediaPackage, true);
395 for (Track sourceTrack : elements) {
396 logger.info("Encoding track {} using encoding profile '{}'", sourceTrack, eptf.getProfiles().get(0).toString());
397
398 encodingJobs.put(composerService.multiEncode(sourceTrack, eptf.getProfiles()),
399 new JobInformation(sourceTrack, eptf, tagWithProfile));
400 }
401 }
402
403 if (encodingJobs.isEmpty()) {
404 logger.info("No matching tracks found");
405 return createResult(mediaPackage, Action.CONTINUE);
406 }
407
408
409 if (!waitForStatus(encodingJobs.keySet().toArray(new Job[encodingJobs.size()])).isSuccess()) {
410 throw new WorkflowOperationException("One of the encoding jobs did not complete successfully");
411 }
412
413
414 for (Map.Entry<Job, JobInformation> entry : encodingJobs.entrySet()) {
415 Job job = entry.getKey();
416 Track sourceTrack = entry.getValue().getTrack();
417 ElementProfileTagFlavor info = entry.getValue().getInfo();
418 List<EncodingProfile> eplist = entry.getValue().getProfileList();
419
420 totalTimeInQueue += job.getQueueTime();
421
422 if (job.getPayload().length() > 0) {
423 @SuppressWarnings("unchecked")
424 List<Track> composedTracks = (List<Track>) MediaPackageElementParser.getArrayFromXml(job.getPayload());
425
426 boolean isHLS = eplist.stream().anyMatch(isManifestEP);
427 if (isHLS) {
428 decipherHLSPlaylistResults(sourceTrack, entry.getValue(), mediaPackage, composedTracks);
429 } else if (composedTracks.size() != info.getProfiles().size()) {
430 logger.info("Encoded {} tracks, with {} profiles", composedTracks.size(), info.getProfiles().size());
431 throw new WorkflowOperationException("Number of output tracks does not match number of encoding profiles");
432 }
433 for (Track composedTrack : composedTracks) {
434 if (info.getTargetFlavor() != null) {
435
436 composedTrack.setFlavor(newFlavor(sourceTrack, info.getTargetFlavor()));
437 logger.debug("Composed track has flavor '{}'", composedTrack.getFlavor());
438 }
439 if (info.getTargetTags() != null) {
440 for (String tag : asList(info.getTargetTags())) {
441 logger.trace("Tagging composed track with '{}'", tag);
442 composedTrack.addTag(tag);
443 }
444 }
445
446 if (entry.getValue().getTagWithProfile()) {
447 tagByProfile(composedTrack, eplist);
448 }
449 String fileName;
450 if (!isHLS || composedTrack.isMaster()) {
451
452 fileName = getFileNameFromElements(sourceTrack, composedTrack);
453 } else {
454
455
456 fileName = FilenameUtils.getName(composedTrack.getURI().getPath());
457 }
458
459 composedTrack.setURI(workspace.moveTo(composedTrack.getURI(), mediaPackage.getIdentifier().toString(),
460 composedTrack.getIdentifier(), fileName));
461 mediaPackage.addDerived(composedTrack, sourceTrack);
462 }
463 } else {
464 logger.warn("No output from MultiEncode operation");
465 }
466 }
467 WorkflowOperationResult result = createResult(mediaPackage, Action.CONTINUE, totalTimeInQueue);
468 logger.debug("MultiEncode operation completed");
469 return result;
470 }
471
472
473
474
475
476
477
478
479
480 private void tagByProfile(Track track, List<EncodingProfile> profiles) {
481 String rawfileName = track.getURI().getRawPath();
482 for (EncodingProfile ep : profiles) {
483
484
485
486
487 String suffixToSanitize = "X" + ep.getSuffix();
488
489 String suffix = workspace.toSafeName(suffixToSanitize).substring(1);
490 if (suffix.length() > 0 && rawfileName.endsWith(suffix)) {
491 track.addTag(ep.getIdentifier());
492 return;
493 }
494 }
495 }
496
497 private void decipherHLSPlaylistResults(Track track, JobInformation jobInfo, MediaPackage mediaPackage,
498 List<Track> composedTracks)
499 throws WorkflowOperationException, IllegalArgumentException, NotFoundException, IOException {
500 int nprofiles = jobInfo.getInfo().getProfiles().size();
501 List<Track> manifests = getManifest(composedTracks);
502
503 if (manifests.size() != nprofiles) {
504 throw new WorkflowOperationException("Number of output playlists does not match number of encoding profiles");
505 }
506 if (composedTracks.size() != manifests.size() * 2 - 1) {
507 throw new WorkflowOperationException("Number of output media does not match number of encoding profiles");
508 }
509 }
510
511 private MediaPackageElementFlavor newFlavor(Track track, String flavor) throws WorkflowOperationException {
512 if (StringUtils.isNotBlank(flavor)) {
513 try {
514 MediaPackageElementFlavor targetFlavor = MediaPackageElementFlavor.parseFlavor(flavor);
515 String flavorType = targetFlavor.getType();
516 String flavorSubtype = targetFlavor.getSubtype();
517
518 if ("*".equals(flavorType)) {
519 flavorType = track.getFlavor().getType();
520 }
521 if ("*".equals(flavorSubtype)) {
522 flavorSubtype = track.getFlavor().getSubtype();
523 }
524 return (new MediaPackageElementFlavor(flavorType, flavorSubtype));
525 } catch (IllegalArgumentException e) {
526 throw new WorkflowOperationException("Target flavor '" + flavor + "' is malformed");
527 }
528 }
529 return null;
530 }
531
532
533
534
535 private static final class JobInformation {
536
537 private Track track = null;
538 private ElementProfileTagFlavor info = null;
539 private boolean tagWithProfile;
540
541 JobInformation(Track track, ElementProfileTagFlavor info, boolean tagWithProfile) {
542 this.track = track;
543 this.info = info;
544 this.tagWithProfile = tagWithProfile;
545 }
546
547 public List<EncodingProfile> getProfileList() {
548 return info.encodingProfileList;
549 }
550
551
552
553
554
555
556 public Track getTrack() {
557 return track;
558 }
559
560 public boolean getTagWithProfile() {
561 return this.tagWithProfile;
562 }
563
564 public ElementProfileTagFlavor getInfo() {
565 return info;
566 }
567 }
568
569 @Reference
570 @Override
571 public void setServiceRegistry(ServiceRegistry serviceRegistry) {
572 super.setServiceRegistry(serviceRegistry);
573 }
574
575 }