1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 package org.opencastproject.workflow.handler.composer;
23
24 import org.opencastproject.composer.api.ComposerService;
25 import org.opencastproject.composer.api.EncoderException;
26 import org.opencastproject.composer.api.EncodingProfile;
27 import org.opencastproject.job.api.Job;
28 import org.opencastproject.job.api.JobContext;
29 import org.opencastproject.mediapackage.AdaptivePlaylist;
30 import org.opencastproject.mediapackage.MediaPackage;
31 import org.opencastproject.mediapackage.MediaPackageElementFlavor;
32 import org.opencastproject.mediapackage.MediaPackageElementParser;
33 import org.opencastproject.mediapackage.MediaPackageException;
34 import org.opencastproject.mediapackage.Track;
35 import org.opencastproject.mediapackage.selector.AbstractMediaPackageElementSelector;
36 import org.opencastproject.mediapackage.selector.TrackSelector;
37 import org.opencastproject.serviceregistry.api.ServiceRegistry;
38 import org.opencastproject.util.NotFoundException;
39 import org.opencastproject.workflow.api.AbstractWorkflowOperationHandler;
40 import org.opencastproject.workflow.api.WorkflowInstance;
41 import org.opencastproject.workflow.api.WorkflowOperationException;
42 import org.opencastproject.workflow.api.WorkflowOperationHandler;
43 import org.opencastproject.workflow.api.WorkflowOperationInstance;
44 import org.opencastproject.workflow.api.WorkflowOperationResult;
45 import org.opencastproject.workflow.api.WorkflowOperationResult.Action;
46 import org.opencastproject.workspace.api.Workspace;
47
48 import org.apache.commons.io.FilenameUtils;
49 import org.apache.commons.lang3.BooleanUtils;
50 import org.apache.commons.lang3.StringUtils;
51 import org.osgi.service.component.ComponentContext;
52 import org.osgi.service.component.annotations.Activate;
53 import org.osgi.service.component.annotations.Component;
54 import org.osgi.service.component.annotations.Reference;
55 import org.slf4j.Logger;
56 import org.slf4j.LoggerFactory;
57
58 import java.io.IOException;
59 import java.util.ArrayList;
60 import java.util.Collection;
61 import java.util.HashMap;
62 import java.util.List;
63 import java.util.Map;
64 import java.util.function.Predicate;
65 import java.util.stream.Collectors;
66
67
68
69
70
71 @Component(
72 immediate = true,
73 service = WorkflowOperationHandler.class,
74 property = {
75 "service.description=MultiEncode Workflow Operation Handler",
76 "workflow.operation=multiencode"
77 }
78 )
79 public class MultiEncodeWorkflowOperationHandler extends AbstractWorkflowOperationHandler {
80
81
82 private static final Logger logger = LoggerFactory.getLogger(MultiEncodeWorkflowOperationHandler.class);
83
84
85 static final String SEPARATOR = ";";
86
87
88 private ComposerService composerService = null;
89
90
91 private Workspace workspace = null;
92
93 @Activate
94 public void activate(ComponentContext cc) {
95 super.activate(cc);
96 }
97
98
99
100
101
102
103
104 @Reference
105 protected void setComposerService(ComposerService composerService) {
106 this.composerService = composerService;
107 }
108
109
110
111
112
113
114
115
116 @Reference
117 public void setWorkspace(Workspace workspace) {
118 this.workspace = workspace;
119 }
120
121 private Predicate<EncodingProfile> isManifestEP = p -> p.getOutputType() == EncodingProfile.MediaType.Manifest;
122
123
124
125
126
127
128
129 @Override
130 public WorkflowOperationResult start(final WorkflowInstance workflowInstance, JobContext context)
131 throws WorkflowOperationException {
132 logger.debug("Running Multiencode workflow operation on workflow {}", workflowInstance.getId());
133
134 try {
135 return multiencode(workflowInstance.getMediaPackage(), workflowInstance.getCurrentOperation());
136 } catch (Exception e) {
137 throw new WorkflowOperationException(e);
138 }
139 }
140
141 protected class ElementProfileTagFlavor {
142 private AbstractMediaPackageElementSelector<Track> elementSelector = new TrackSelector();
143 private String targetFlavor = null;
144 private String targetTags = null;
145 private List<String> encodingProfiles = new ArrayList<>();
146 private List<EncodingProfile> encodingProfileList = new ArrayList<>();
147
148 ElementProfileTagFlavor(String profiles) {
149 List<String> profilelist = asList(profiles);
150 for (String profile : profilelist) {
151 EncodingProfile encodingprofile = composerService.getProfile(profile);
152 if (encodingprofile != null) {
153 encodingProfiles.add(encodingprofile.getIdentifier());
154 encodingProfileList.add(encodingprofile);
155 } else {
156 throw new IllegalArgumentException("Encoding profile " + profile + " not found.");
157 }
158 }
159 }
160
161 public AbstractMediaPackageElementSelector<Track> getSelector() {
162 return this.elementSelector;
163 }
164
165 public List<String> getProfiles() {
166 return this.encodingProfiles;
167 }
168
169 public List<EncodingProfile> getEncodingProfiles() {
170 return this.encodingProfileList;
171 }
172
173 void addSourceFlavor(String flavor) {
174 this.elementSelector.addFlavor(MediaPackageElementFlavor.parseFlavor(flavor));
175 }
176
177 void addSourceTag(String tag) {
178 this.elementSelector.addTag(tag);
179 }
180
181 void setTargetTags(String tags) {
182 this.targetTags = tags;
183 }
184
185 void setTargetFlavor(String flavor) {
186 this.targetFlavor = flavor;
187 }
188
189 String getTargetFlavor() {
190 return this.targetFlavor;
191 }
192
193 String getTargetTags() {
194 return this.targetTags;
195 }
196 }
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214 private List<ElementProfileTagFlavor> getSrcSelector(String[] sourceFlavors, String[] sourceTags,
215 String[] targetFlavors, String[] targetTags, String[] profiles) throws WorkflowOperationException {
216 int n = 0;
217 List<ElementProfileTagFlavor> elementSelectors = new ArrayList<>();
218 if (sourceTags == null && sourceFlavors == null)
219 throw new WorkflowOperationException("No source tags or Flavor");
220 if (profiles == null)
221 throw new WorkflowOperationException("Missing profiles");
222 if (sourceTags != null) {
223
224 if (targetTags != null && (targetTags.length != 1 && sourceTags.length != targetTags.length))
225 throw new WorkflowOperationException("number of source tags " + sourceTags.length
226 + " does not match number of target tags " + targetTags.length + " (must be the same or one target)");
227
228 if (profiles.length != 1 && sourceTags.length != profiles.length) {
229 throw new WorkflowOperationException(
230 "number of source tags segments " + sourceTags.length + " does not match number of profiles segments "
231 + profiles.length + " (must be the same or one profile)");
232 }
233
234 if (sourceFlavors != null && (sourceTags.length != 1 && sourceFlavors.length != 1)
235 && sourceFlavors.length != sourceTags.length) {
236 throw new WorkflowOperationException("number of source tags segments " + sourceTags.length
237 + " does not match number of source Flavor segments " + sourceFlavors.length
238 + " (must be the same or one)");
239 }
240 n = sourceTags.length;
241 }
242 if (sourceFlavors != null) {
243
244 if (targetFlavors != null && (targetFlavors.length != 1 && sourceFlavors.length != targetFlavors.length)) {
245 throw new WorkflowOperationException(
246 "number of source flavors " + sourceFlavors.length + " segment does not match number of target flavors"
247 + targetFlavors.length + " (must be the same or one target flavor)");
248 }
249
250
251 if (targetTags != null && targetTags.length != 1 && sourceFlavors.length != targetTags.length) {
252 throw new WorkflowOperationException(
253 "number of source flavors " + sourceFlavors.length + " segment does not match number of target Tags"
254 + targetTags.length + " (must be the same or one target)");
255 }
256
257 if ((profiles.length != 1 && sourceFlavors.length != profiles.length)) {
258 throw new WorkflowOperationException("number of source flavors segments " + sourceFlavors.length
259 + " does not match number of profiles segments " + profiles.length
260 + " (must be the same or one profile)");
261 }
262 if (sourceFlavors.length > n)
263 n = sourceFlavors.length;
264 }
265 int numProfiles = 0;
266
267 for (int i = 0; i < n; i++) {
268 elementSelectors.add(new ElementProfileTagFlavor(profiles[numProfiles]));
269 if (profiles.length > 1)
270 numProfiles++;
271 }
272
273 if (sourceTags != null && sourceFlavors != null) {
274 if (sourceTags.length != sourceFlavors.length && sourceFlavors.length != 1 && sourceTags.length != 1) {
275 throw new WorkflowOperationException(
276 "number of source flavors " + sourceTags.length + " does not match number of source tags "
277 + sourceFlavors.length + " (must be the same or one set of tags or flavors)");
278 }
279 }
280 populateFlavorsAndTags(elementSelectors, sourceFlavors, targetFlavors, sourceTags, targetTags);
281 return elementSelectors;
282 }
283
284 private List<ElementProfileTagFlavor> populateFlavorsAndTags(List<ElementProfileTagFlavor> elementSelectors,
285 String[] sourceFlavors, String[] targetFlavors, String[] sourceTags, String[] targetTags)
286 throws WorkflowOperationException {
287 int sf = 0;
288 int tf = 0;
289 int st = 0;
290 int tt = 0;
291 for (ElementProfileTagFlavor ep : elementSelectors) {
292 try {
293 if (sourceTags != null) {
294 for (String tag : asList(sourceTags[st])) {
295 ep.addSourceTag(tag);
296 }
297 if (sourceTags.length != 1)
298 st++;
299 }
300 if (targetTags != null) {
301 ep.setTargetTags(targetTags[tt]);
302 if (targetTags.length != 1)
303 tt++;
304 }
305 if (sourceFlavors != null) {
306 for (String flavor : asList(sourceFlavors[sf])) {
307 ep.addSourceFlavor(flavor);
308 }
309 if (sourceFlavors.length != 1)
310 sf++;
311 }
312 if (targetFlavors != null) {
313 for (String flavor : asList(targetFlavors[tf])) {
314 ep.setTargetFlavor(flavor);
315 }
316 if (targetFlavors.length != 1)
317 tf++;
318 }
319 } catch (IllegalArgumentException e) {
320 throw new WorkflowOperationException("Set Tags or Flavor " + e.getMessage());
321 }
322 }
323 return elementSelectors;
324 }
325
326 private String[] getConfigAsArray(WorkflowOperationInstance operation, String name) {
327 String sourceOption = StringUtils.trimToNull(operation.getConfiguration(name));
328 return StringUtils.split(sourceOption, SEPARATOR);
329 }
330
331 private List<Track> getManifest(Collection<Track> tracks) {
332 return tracks.stream().filter(AdaptivePlaylist.isHLSTrackPred).collect(Collectors.toList());
333 }
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354 private WorkflowOperationResult multiencode(MediaPackage src, WorkflowOperationInstance operation)
355 throws EncoderException, IOException, NotFoundException, MediaPackageException, WorkflowOperationException {
356 MediaPackage mediaPackage = (MediaPackage) src.clone();
357
358 String[] sourceTags = getConfigAsArray(operation, "source-tags");
359 String[] sourceFlavors = getConfigAsArray(operation, "source-flavors");
360 String[] targetTags = getConfigAsArray(operation, "target-tags");
361 String[] targetFlavors = getConfigAsArray(operation, "target-flavors");
362 String tagWithProfileConfig = StringUtils.trimToNull(operation.getConfiguration("tag-with-profile"));
363 boolean tagWithProfile = BooleanUtils.toBoolean(tagWithProfileConfig);
364
365
366 if (sourceFlavors == null && sourceTags == null) {
367 logger.info("No source tags or flavors have been specified, not matching anything");
368 return createResult(mediaPackage, Action.CONTINUE);
369 }
370 String[] profiles = getConfigAsArray(operation, "encoding-profiles");
371 if (profiles == null)
372 throw new WorkflowOperationException("Missing encoding profiles");
373
374
375 List<ElementProfileTagFlavor> selectors = getSrcSelector(sourceFlavors, sourceTags, targetFlavors, targetTags,
376 profiles);
377
378 long totalTimeInQueue = 0;
379 Map<Job, JobInformation> encodingJobs = new HashMap<>();
380
381 for (ElementProfileTagFlavor eptf : selectors) {
382
383 Collection<Track> elements = eptf.elementSelector.select(mediaPackage, true);
384 for (Track sourceTrack : elements) {
385 logger.info("Encoding track {} using encoding profile '{}'", sourceTrack, eptf.getProfiles().get(0).toString());
386
387 encodingJobs.put(composerService.multiEncode(sourceTrack, eptf.getProfiles()),
388 new JobInformation(sourceTrack, eptf, tagWithProfile));
389 }
390 }
391
392 if (encodingJobs.isEmpty()) {
393 logger.info("No matching tracks found");
394 return createResult(mediaPackage, Action.CONTINUE);
395 }
396
397
398 if (!waitForStatus(encodingJobs.keySet().toArray(new Job[encodingJobs.size()])).isSuccess()) {
399 throw new WorkflowOperationException("One of the encoding jobs did not complete successfully");
400 }
401
402
403 for (Map.Entry<Job, JobInformation> entry : encodingJobs.entrySet()) {
404 Job job = entry.getKey();
405 Track sourceTrack = entry.getValue().getTrack();
406 ElementProfileTagFlavor info = entry.getValue().getInfo();
407 List<EncodingProfile> eplist = entry.getValue().getProfileList();
408
409 totalTimeInQueue += job.getQueueTime();
410
411 if (job.getPayload().length() > 0) {
412 @SuppressWarnings("unchecked")
413 List<Track> composedTracks = (List<Track>) MediaPackageElementParser.getArrayFromXml(job.getPayload());
414
415 boolean isHLS = eplist.stream().anyMatch(isManifestEP);
416 if (isHLS) {
417 decipherHLSPlaylistResults(sourceTrack, entry.getValue(), mediaPackage, composedTracks);
418 } else if (composedTracks.size() != info.getProfiles().size()) {
419 logger.info("Encoded {} tracks, with {} profiles", composedTracks.size(), info.getProfiles().size());
420 throw new WorkflowOperationException("Number of output tracks does not match number of encoding profiles");
421 }
422 for (Track composedTrack : composedTracks) {
423 if (info.getTargetFlavor() != null) {
424
425 composedTrack.setFlavor(newFlavor(sourceTrack, info.getTargetFlavor()));
426 logger.debug("Composed track has flavor '{}'", composedTrack.getFlavor());
427 }
428 if (info.getTargetTags() != null) {
429 for (String tag : asList(info.getTargetTags())) {
430 logger.trace("Tagging composed track with '{}'", tag);
431 composedTrack.addTag(tag);
432 }
433 }
434
435 if (entry.getValue().getTagWithProfile()) {
436 tagByProfile(composedTrack, eplist);
437 }
438 String fileName;
439 if (!isHLS || composedTrack.isMaster()) {
440
441 fileName = getFileNameFromElements(sourceTrack, composedTrack);
442 } else {
443
444
445 fileName = FilenameUtils.getName(composedTrack.getURI().getPath());
446 }
447
448 composedTrack.setURI(workspace.moveTo(composedTrack.getURI(), mediaPackage.getIdentifier().toString(),
449 composedTrack.getIdentifier(), fileName));
450 mediaPackage.addDerived(composedTrack, sourceTrack);
451 }
452 } else {
453 logger.warn("No output from MultiEncode operation");
454 }
455 }
456 WorkflowOperationResult result = createResult(mediaPackage, Action.CONTINUE, totalTimeInQueue);
457 logger.debug("MultiEncode operation completed");
458 return result;
459 }
460
461
462
463
464
465
466
467
468
469 private void tagByProfile(Track track, List<EncodingProfile> profiles) {
470 String rawfileName = track.getURI().getRawPath();
471 for (EncodingProfile ep : profiles) {
472
473
474
475
476 String suffixToSanitize = "X" + ep.getSuffix();
477
478 String suffix = workspace.toSafeName(suffixToSanitize).substring(1);
479 if (suffix.length() > 0 && rawfileName.endsWith(suffix)) {
480 track.addTag(ep.getIdentifier());
481 return;
482 }
483 }
484 }
485
486 private void decipherHLSPlaylistResults(Track track, JobInformation jobInfo, MediaPackage mediaPackage,
487 List<Track> composedTracks)
488 throws WorkflowOperationException, IllegalArgumentException, NotFoundException, IOException {
489 int nprofiles = jobInfo.getInfo().getProfiles().size();
490 List<Track> manifests = getManifest(composedTracks);
491
492 if (manifests.size() != nprofiles) {
493 throw new WorkflowOperationException("Number of output playlists does not match number of encoding profiles");
494 }
495 if (composedTracks.size() != manifests.size() * 2 - 1) {
496 throw new WorkflowOperationException("Number of output media does not match number of encoding profiles");
497 }
498 }
499
500 private MediaPackageElementFlavor newFlavor(Track track, String flavor) throws WorkflowOperationException {
501 if (StringUtils.isNotBlank(flavor)) {
502 try {
503 MediaPackageElementFlavor targetFlavor = MediaPackageElementFlavor.parseFlavor(flavor);
504 String flavorType = targetFlavor.getType();
505 String flavorSubtype = targetFlavor.getSubtype();
506
507 if ("*".equals(flavorType))
508 flavorType = track.getFlavor().getType();
509 if ("*".equals(flavorSubtype))
510 flavorSubtype = track.getFlavor().getSubtype();
511 return (new MediaPackageElementFlavor(flavorType, flavorSubtype));
512 } catch (IllegalArgumentException e) {
513 throw new WorkflowOperationException("Target flavor '" + flavor + "' is malformed");
514 }
515 }
516 return null;
517 }
518
519
520
521
522 private static final class JobInformation {
523
524 private Track track = null;
525 private ElementProfileTagFlavor info = null;
526 private boolean tagWithProfile;
527
528 JobInformation(Track track, ElementProfileTagFlavor info, boolean tagWithProfile) {
529 this.track = track;
530 this.info = info;
531 this.tagWithProfile = tagWithProfile;
532 }
533
534 public List<EncodingProfile> getProfileList() {
535 return info.encodingProfileList;
536 }
537
538
539
540
541
542
543 public Track getTrack() {
544 return track;
545 }
546
547 public boolean getTagWithProfile() {
548 return this.tagWithProfile;
549 }
550
551 public ElementProfileTagFlavor getInfo() {
552 return info;
553 }
554 }
555
556 @Reference
557 @Override
558 public void setServiceRegistry(ServiceRegistry serviceRegistry) {
559 super.setServiceRegistry(serviceRegistry);
560 }
561
562 }