1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 package org.opencastproject.workflow.handler.composer;
23
24 import static com.entwinemedia.fn.Equality.hash;
25 import static com.entwinemedia.fn.Prelude.chuck;
26 import static com.entwinemedia.fn.Prelude.unexhaustiveMatchError;
27 import static com.entwinemedia.fn.Stream.$;
28 import static com.entwinemedia.fn.parser.Parsers.character;
29 import static com.entwinemedia.fn.parser.Parsers.many;
30 import static com.entwinemedia.fn.parser.Parsers.opt;
31 import static com.entwinemedia.fn.parser.Parsers.space;
32 import static com.entwinemedia.fn.parser.Parsers.symbol;
33 import static com.entwinemedia.fn.parser.Parsers.token;
34 import static java.lang.String.format;
35 import static org.opencastproject.util.EqualsUtil.eq;
36
37 import org.opencastproject.composer.api.ComposerService;
38 import org.opencastproject.composer.api.EncodingProfile;
39 import org.opencastproject.job.api.Job;
40 import org.opencastproject.job.api.JobBarrier;
41 import org.opencastproject.job.api.JobContext;
42 import org.opencastproject.mediapackage.Attachment;
43 import org.opencastproject.mediapackage.MediaPackage;
44 import org.opencastproject.mediapackage.MediaPackageElement;
45 import org.opencastproject.mediapackage.MediaPackageElementFlavor;
46 import org.opencastproject.mediapackage.MediaPackageElementParser;
47 import org.opencastproject.mediapackage.MediaPackageException;
48 import org.opencastproject.mediapackage.MediaPackageSupport;
49 import org.opencastproject.mediapackage.Track;
50 import org.opencastproject.mediapackage.VideoStream;
51 import org.opencastproject.mediapackage.selector.AbstractMediaPackageElementSelector;
52 import org.opencastproject.mediapackage.selector.TrackSelector;
53 import org.opencastproject.serviceregistry.api.ServiceRegistry;
54 import org.opencastproject.util.JobUtil;
55 import org.opencastproject.util.MimeTypes;
56 import org.opencastproject.util.UnknownFileTypeException;
57 import org.opencastproject.util.data.Collections;
58 import org.opencastproject.workflow.api.AbstractWorkflowOperationHandler;
59 import org.opencastproject.workflow.api.ConfiguredTagsAndFlavors;
60 import org.opencastproject.workflow.api.WorkflowInstance;
61 import org.opencastproject.workflow.api.WorkflowOperationException;
62 import org.opencastproject.workflow.api.WorkflowOperationHandler;
63 import org.opencastproject.workflow.api.WorkflowOperationInstance;
64 import org.opencastproject.workflow.api.WorkflowOperationResult;
65 import org.opencastproject.workflow.api.WorkflowOperationResult.Action;
66 import org.opencastproject.workspace.api.Workspace;
67
68 import com.entwinemedia.fn.Fn;
69 import com.entwinemedia.fn.Fn2;
70 import com.entwinemedia.fn.Fx;
71 import com.entwinemedia.fn.P2;
72 import com.entwinemedia.fn.Prelude;
73 import com.entwinemedia.fn.StreamFold;
74 import com.entwinemedia.fn.data.Opt;
75 import com.entwinemedia.fn.fns.Strings;
76 import com.entwinemedia.fn.parser.Parser;
77 import com.entwinemedia.fn.parser.Parsers;
78 import com.entwinemedia.fn.parser.Result;
79
80 import org.apache.commons.io.FilenameUtils;
81 import org.osgi.service.component.annotations.Component;
82 import org.osgi.service.component.annotations.Reference;
83 import org.slf4j.Logger;
84 import org.slf4j.LoggerFactory;
85
86 import java.net.URI;
87 import java.util.Arrays;
88 import java.util.IllegalFormatException;
89 import java.util.List;
90 import java.util.Locale;
91 import java.util.UUID;
92 import java.util.stream.Collectors;
93
94
95
96
/**
 * Workflow operation handler registered for the {@code image} workflow operation.
 * <p>
 * It selects the video tracks of the media package, asks the composer service to extract still images
 * at the configured positions for each configured encoding profile, and adds the resulting attachments
 * to the media package.
 */
@Component(
    immediate = true,
    service = WorkflowOperationHandler.class,
    property = {
        "service.description=Image Workflow Operation Handler",
        "workflow.operation=image"
    }
)
public class ImageWorkflowOperationHandler extends AbstractWorkflowOperationHandler {

  /** Logger shared by the handler and its nested helper classes. */
  private static final Logger logger = LoggerFactory.getLogger(ImageWorkflowOperationHandler.class);

  /** Configuration key: encoding profile(s) used for the extraction. */
  public static final String OPT_PROFILES = "encoding-profile";
  /** Configuration key: the positions to extract images at; parsed by {@code parsePositions}. */
  public static final String OPT_POSITIONS = "time";
  /** Configuration key: optional file name format used for positions given in seconds. */
  public static final String OPT_TARGET_BASE_NAME_FORMAT_SECOND = "target-base-name-format-second";
  /** Configuration key: optional file name format used for positions given in percent. */
  public static final String OPT_TARGET_BASE_NAME_FORMAT_PERCENT = "target-base-name-format-percent";
  /** Configuration key: distance to keep from the end of the track; handed to {@code toSeconds} as milliseconds. */
  public static final String OPT_END_MARGIN = "end-margin";

  /** End margin used when {@link #OPT_END_MARGIN} is not configured (or cannot be read as a long). */
  private static final long END_MARGIN_DEFAULT = 100;
  /** NOTE(review): presumably the position used for tracks consisting of a single frame - confirm with callers. */
  public static final double SINGLE_FRAME_POS = 0.0;

  /** The composer service; used to look up encoding profiles and to start image extraction jobs. */
  private ComposerService composerService = null;

  /** The workspace; extracted images are moved into it. */
  private Workspace workspace = null;
124
125
126
127
128
129
130
/**
 * Sets the composer service this handler uses to look up encoding profiles and to start image
 * extraction jobs. Annotated with {@code @Reference}.
 *
 * @param composerService
 *          the composer service
 */
@Reference
protected void setComposerService(ComposerService composerService) {
  this.composerService = composerService;
}
135
136
137
138
139
140
141
142
/**
 * Sets the workspace into which the extracted images are moved. Annotated with {@code @Reference}.
 *
 * @param workspace
 *          the workspace
 */
@Reference
public void setWorkspace(Workspace workspace) {
  this.workspace = workspace;
}
147
148 @Override
149 public WorkflowOperationResult start(final WorkflowInstance wi, JobContext ctx)
150 throws WorkflowOperationException {
151 logger.debug("Running image workflow operation on {}", wi);
152 try {
153 MediaPackage mp = wi.getMediaPackage();
154 final Extractor e = new Extractor(this, configure(mp, wi));
155 return e.main(MediaPackageSupport.copy(mp));
156 } catch (Exception e) {
157 throw new WorkflowOperationException(e);
158 }
159 }
160
161
162
163
164 static final class Extractor {
165 private final ImageWorkflowOperationHandler handler;
166 private final Cfg cfg;
167
168 Extractor(ImageWorkflowOperationHandler handler, Cfg cfg) {
169 this.handler = handler;
170 this.cfg = cfg;
171 }
172
173
174 WorkflowOperationResult main(final MediaPackage mp) throws WorkflowOperationException {
175 if (cfg.sourceTracks.size() == 0) {
176 logger.info("No source tracks found in media package {}, skipping operation", mp.getIdentifier());
177 return handler.createResult(mp, Action.SKIP);
178 }
179
180 final List<Extraction> extractions = cfg.sourceTracks.stream().flatMap(track -> {
181 final List<MediaPosition> positions = limit(track, cfg.positions);
182 if (positions.size() != cfg.positions.size()) {
183 logger.warn("Could not apply all configured positions to track {}", track);
184 }
185 logger.info("Extracting images from {} at position {}", track, positions);
186
187 return cfg.profiles.stream()
188 .map(profile -> new Extraction(extractImages(track, profile, positions), track, profile, positions));
189 }).collect(Collectors.toList());
190 final List<Job> extractionJobs = concatJobs(extractions);
191 final JobBarrier.Result extractionResult = JobUtil.waitForJobs(handler.serviceRegistry, extractionJobs);
192 if (extractionResult.isSuccess()) {
193
194 for (final Extraction extraction : extractions) {
195 final List<Attachment> images = getImages(extraction.job);
196 final int expectedNrOfImages = extraction.positions.size();
197 if (images.size() == expectedNrOfImages) {
198
199 for (final P2<Attachment, MediaPosition> image : $(images).zip(extraction.positions)) {
200 adjustMetadata(extraction, image.get1());
201 if (image.get1().getIdentifier() == null) image.get1().setIdentifier(UUID.randomUUID().toString());
202 mp.addDerived(image.get1(), extraction.track);
203 final String fileName = createFileName(
204 extraction.profile.getSuffix(), extraction.track.getURI(), image.get2());
205 moveToWorkspace(mp, image.get1(), fileName);
206 }
207 } else {
208
209 throw new WorkflowOperationException(
210 format("Only %s of %s images have been extracted from track %s",
211 images.size(), expectedNrOfImages, extraction.track));
212 }
213 }
214 return handler.createResult(mp, Action.CONTINUE, JobUtil.sumQueueTime(extractionJobs));
215 } else {
216 throw new WorkflowOperationException("Image extraction failed");
217 }
218 }
219
220
221
222
223
224 void adjustMetadata(Extraction extraction, Attachment image) {
225
226 for (final MediaPackageElementFlavor flavor : cfg.targetImageFlavor) {
227 final String flavorType = eq("*", flavor.getType())
228 ? extraction.track.getFlavor().getType()
229 : flavor.getType();
230 final String flavorSubtype = eq("*", flavor.getSubtype())
231 ? extraction.track.getFlavor().getSubtype()
232 : flavor.getSubtype();
233 image.setFlavor(new MediaPackageElementFlavor(flavorType, flavorSubtype));
234 logger.debug("Resulting image has flavor '{}'", image.getFlavor());
235 }
236
237 try {
238 image.setMimeType(MimeTypes.fromURI(image.getURI()));
239 } catch (UnknownFileTypeException e) {
240 logger.warn("Mime type unknown for file {}. Setting none.", image.getURI(), e);
241 }
242
243 for (final String tag : cfg.targetImageTags) {
244 logger.trace("Tagging image with '{}'", tag);
245 image.addTag(tag);
246 }
247 }
248
249
250 String createFileName(final String suffix, final URI trackUri, final MediaPosition pos) {
251 final String trackBaseName = FilenameUtils.getBaseName(trackUri.getPath());
252 final String format;
253 switch (pos.type) {
254 case Seconds:
255 format = cfg.targetBaseNameFormatSecond.getOr(trackBaseName + "_%.3fs%s");
256 break;
257 case Percentage:
258 format = cfg.targetBaseNameFormatPercent.getOr(trackBaseName + "_%.1fp%s");
259 break;
260 default:
261 throw unexhaustiveMatchError();
262 }
263 return formatFileName(format, pos.position, suffix);
264 }
265
266
267 private void moveToWorkspace(final MediaPackage mp, final Attachment image, final String fileName) {
268 try {
269 image.setURI(handler.workspace.moveTo(
270 image.getURI(),
271 mp.getIdentifier().toString(),
272 image.getIdentifier(),
273 fileName));
274 } catch (Exception e) {
275 chuck(new WorkflowOperationException(e));
276 }
277 }
278
279
280 private Job extractImages(final Track track, final EncodingProfile profile, final List<MediaPosition> positions) {
281 final List<Double> p = $(positions).map(new Fn<MediaPosition, Double>() {
282 @Override public Double apply(MediaPosition mediaPosition) {
283 return toSeconds(track, mediaPosition, cfg.endMargin);
284 }
285 }).toList();
286 try {
287 return handler.composerService.image(track, profile.getIdentifier(), Collections.toDoubleArray(p));
288 } catch (Exception e) {
289 return chuck(new WorkflowOperationException("Error starting image extraction job", e));
290 }
291 }
292 }
293
294
295
296
297
298
299 static String formatFileName(String format, double position, String suffix) {
300 return format(Locale.ROOT, format, position, suffix);
301 }
302
303
304
305 private static List<Job> concatJobs(List<Extraction> extractions) {
306 return $(extractions).map(new Fn<Extraction, Job>() {
307 @Override public Job apply(Extraction extraction) {
308 return extraction.job;
309 }
310 }).toList();
311 }
312
313
314 @SuppressWarnings("unchecked")
315 private static List<Attachment> getImages(Job job) {
316 final List<Attachment> images;
317 try {
318 images = (List<Attachment>) MediaPackageElementParser.getArrayFromXml(job.getPayload());
319 } catch (MediaPackageException e) {
320 return chuck(e);
321 }
322 if (!images.isEmpty()) {
323 return images;
324 } else {
325 return chuck(new WorkflowOperationException("Job did not extract any images"));
326 }
327 }
328
329
330 static List<MediaPosition> limit(Track track, List<MediaPosition> positions) {
331 final Long duration = track.getDuration();
332
333
334 if (duration == null || (track.getStreams() != null && Arrays.stream(track.getStreams())
335 .filter(stream -> stream instanceof VideoStream)
336 .map(org.opencastproject.mediapackage.Stream::getFrameCount)
337 .allMatch(frameCount -> frameCount == null || frameCount == 1))) {
338 return java.util.Collections.singletonList(new MediaPosition(PositionType.Seconds, 0));
339 }
340
341 return positions.stream()
342 .filter(p -> (PositionType.Seconds.equals(p.type) && p.position >= 0 && p.position < duration)
343 || (PositionType.Percentage.equals(p.type) && p.position >= 0 && p.position <= 100))
344 .collect(Collectors.toList());
345 }
346
347
348
349
350
351
352 static double toSeconds(Track track, MediaPosition position, double endMarginMs) {
353 final long durationMs = track.getDuration() == null ? 0 : track.getDuration();
354 final double posMs;
355 switch (position.type) {
356 case Percentage:
357 posMs = durationMs * position.position / 100.0;
358 break;
359 case Seconds:
360 posMs = position.position * 1000.0;
361 break;
362 default:
363 throw unexhaustiveMatchError();
364 }
365
366 return Math.abs(durationMs - posMs) >= endMarginMs
367 ? posMs / 1000.0
368 : Math.max(0, ((double) durationMs - endMarginMs)) / 1000.0;
369 }
370
371
372
373
374 public static <E extends MediaPackageElement, S extends AbstractMediaPackageElementSelector<E>>
375 StreamFold<MediaPackageElementFlavor, S> flavorFold(S selector) {
376 return StreamFold.foldl(selector, new Fn2<S, MediaPackageElementFlavor, S>() {
377 @Override public S apply(S sum, MediaPackageElementFlavor flavor) {
378 sum.addFlavor(flavor);
379 return sum;
380 }
381 });
382 }
383
384
385 public static <E extends MediaPackageElement, S extends AbstractMediaPackageElementSelector<E>>
386 StreamFold<String, S> tagFold(S selector) {
387 return StreamFold.foldl(selector, new Fn2<S, String, S>() {
388 @Override public S apply(S sum, String tag) {
389 sum.addTag(tag);
390 return sum;
391 }
392 });
393 }
394
395
396
397
398
399 public static Fn<String, EncodingProfile> fetchProfile(final ComposerService composerService) {
400 return new Fn<String, EncodingProfile>() {
401 @Override public EncodingProfile apply(String profileName) {
402 final EncodingProfile profile = composerService.getProfile(profileName);
403 return profile != null
404 ? profile
405 : Prelude.<EncodingProfile>chuck(new WorkflowOperationException("Encoding profile '" + profileName + "' was not found"));
406 }
407 };
408 }
409
410
411
412
413
/**
 * Describes one started image extraction: the job together with the parameters it was started with.
 */
static final class Extraction {
  /** The extraction job. */
  private final Job job;
  /** The track to extract from. */
  private final Track track;
  /** The encoding profile to use for extraction. */
  private final EncodingProfile profile;
  /** Media positions the images are extracted at; one image per position is expected. */
  private final List<MediaPosition> positions;

  private Extraction(Job job, Track track, EncodingProfile profile, List<MediaPosition> positions) {
    this.job = job;
    this.track = track;
    this.profile = profile;
    this.positions = positions;
  }
}
431
432
433
434
435
436
/**
 * The configuration of one run of the operation, as assembled by {@code configure}.
 */
static final class Cfg {
  /** Tracks to extract images from. */
  private final List<Track> sourceTracks;
  /** Positions to extract images at, as configured (not yet limited to a track's length). */
  private final List<MediaPosition> positions;
  /** Encoding profiles; every track is extracted once per profile. */
  private final List<EncodingProfile> profiles;
  /** Flavor(s) to set on the images; {@code *} parts are taken from the source track's flavor. */
  private final List<MediaPackageElementFlavor> targetImageFlavor;
  /** Tags to add to the images. */
  private final List<String> targetImageTags;
  /** Optional file name format for positions in seconds. */
  private final Opt<String> targetBaseNameFormatSecond;
  /** Optional file name format for positions in percent. */
  private final Opt<String> targetBaseNameFormatPercent;
  /** Distance to keep from the end of a track; handed to {@code toSeconds} as milliseconds. */
  private final long endMargin;

  Cfg(List<Track> sourceTracks,
      List<MediaPosition> positions,
      List<EncodingProfile> profiles,
      List<MediaPackageElementFlavor> targetImageFlavor,
      List<String> targetImageTags,
      Opt<String> targetBaseNameFormatSecond,
      Opt<String> targetBaseNameFormatPercent,
      long endMargin) {
    this.sourceTracks = sourceTracks;
    this.positions = positions;
    this.profiles = profiles;
    this.targetImageFlavor = targetImageFlavor;
    this.targetImageTags = targetImageTags;
    this.endMargin = endMargin;
    this.targetBaseNameFormatSecond = targetBaseNameFormatSecond;
    this.targetBaseNameFormatPercent = targetBaseNameFormatPercent;
  }
}
466
467
468 private Cfg configure(MediaPackage mp, WorkflowInstance wi) throws WorkflowOperationException {
469 WorkflowOperationInstance woi = wi.getCurrentOperation();
470 ConfiguredTagsAndFlavors tagsAndFlavors = getTagsAndFlavors(wi,
471 Configuration.many, Configuration.many, Configuration.many, Configuration.one);
472 final List<EncodingProfile> profiles = getOptConfig(woi, OPT_PROFILES).toStream().bind(asList.toFn())
473 .map(fetchProfile(composerService)).toList();
474 final List<String> targetImageTags = tagsAndFlavors.getTargetTags();
475 final List<MediaPackageElementFlavor> targetImageFlavor = tagsAndFlavors.getTargetFlavors();
476 final List<Track> sourceTracks;
477 {
478
479 final List<String> sourceTags = tagsAndFlavors.getSrcTags();
480 final List<MediaPackageElementFlavor> sourceFlavors = tagsAndFlavors.getSrcFlavors();
481 TrackSelector trackSelector = new TrackSelector();
482
483
484 for (String tag : sourceTags) {
485 trackSelector.addTag(tag);
486 }
487 for (MediaPackageElementFlavor flavor : sourceFlavors) {
488 trackSelector.addFlavor(flavor);
489 }
490
491
492 sourceTracks = trackSelector.select(mp, true).stream()
493 .filter(Track::hasVideo)
494 .collect(Collectors.toList());
495 }
496 final List<MediaPosition> positions = parsePositions(getConfig(woi, OPT_POSITIONS));
497 final long endMargin = getOptConfig(woi, OPT_END_MARGIN).bind(Strings.toLong).getOr(END_MARGIN_DEFAULT);
498
499 return new Cfg(sourceTracks,
500 positions,
501 profiles,
502 targetImageFlavor,
503 targetImageTags,
504 getTargetBaseNameFormat(woi, OPT_TARGET_BASE_NAME_FORMAT_SECOND),
505 getTargetBaseNameFormat(woi, OPT_TARGET_BASE_NAME_FORMAT_PERCENT),
506 endMargin);
507 }
508
509
510 private Opt<String> getTargetBaseNameFormat(WorkflowOperationInstance woi, final String formatName) {
511 return getOptConfig(woi, formatName).each(validateTargetBaseNameFormat(formatName));
512 }
513
514 static Fx<String> validateTargetBaseNameFormat(final String formatName) {
515 return new Fx<String>() {
516 @Override public void apply(String format) {
517 boolean valid;
518 try {
519 final String name = formatFileName(format, 15.11, ".png");
520 valid = name.contains(".") && name.contains(".png");
521 } catch (IllegalFormatException e) {
522 valid = false;
523 }
524 if (!valid) {
525 chuck(new WorkflowOperationException(format(
526 "%s is not a valid format string for config option %s",
527 format, formatName)));
528 }
529 }
530 };
531 }
532
533
534
535
536
537
/**
 * Parser for the list of media positions of the {@code time} option, built from parser combinators.
 * A position is a number, optionally followed by {@code %}.
 */
static final class MediaPositionParser {
  private MediaPositionParser() {
  }

  /** A floating point number, wrapped in {@code token} (presumably to skip surrounding whitespace - confirm). */
  static final Parser<Double> number = token(Parsers.dbl);
  /** A plain number yields a position in seconds. */
  static final Parser<MediaPosition> seconds = number.bind(new Fn<Double, Parser<MediaPosition>>() {
    @Override public Parser<MediaPosition> apply(Double p) {
      return Parsers.yield(new MediaPosition(PositionType.Seconds, p));
    }
  });
  /** A number followed by the symbol {@code %} yields a position in percent. */
  static final Parser<MediaPosition> percentage =
      number.bind(Parsers.<Double, String>ignore(symbol("%"))).bind(new Fn<Double, Parser<MediaPosition>>() {
        @Override public Parser<MediaPosition> apply(Double p) {
          return Parsers.yield(new MediaPosition(PositionType.Percentage, p));
        }
      });
  static final Parser<Character> comma = token(character(','));
  /** NOTE(review): not referenced anywhere in this class. */
  static final Parser<Character> ws = token(space);
  /** Percentage is tried first; a plain number is the fallback. */
  static final Parser<MediaPosition> position = percentage.or(seconds);

  /**
   * Main parser: one position followed by any number of further positions.
   * The separating comma is wrapped in {@code opt}, i.e. it may be left out.
   */
  static final Parser<List<MediaPosition>> positions =
      position.bind(new Fn<MediaPosition, Parser<List<MediaPosition>>>() {
        // base case: the first position
        @Override public Parser<List<MediaPosition>> apply(final MediaPosition first) {
          // following: the remaining positions, each optionally preceded by a comma
          return many(opt(comma).bind(Parsers.ignorePrevious(position)))
              .bind(new Fn<List<MediaPosition>, Parser<List<MediaPosition>>>() {
                @Override public Parser<List<MediaPosition>> apply(List<MediaPosition> rest) {
                  return Parsers.yield($(first).append(rest).toList());
                }
              });
        }
      });
}
573
574 private List<MediaPosition> parsePositions(String time) throws WorkflowOperationException {
575 final Result<List<MediaPosition>> r = MediaPositionParser.positions.parse(time);
576 if (r.isDefined() && r.getRest().isEmpty()) {
577 return r.getResult();
578 } else {
579 throw new WorkflowOperationException(format("Cannot parse time string %s.", time));
580 }
581 }
582
/**
 * The unit of a {@link MediaPosition}: percent of the track duration, or seconds.
 */
enum PositionType {
  Percentage, Seconds
}
586
587
588
589
590 static final class MediaPosition {
591 private double position;
592 private final PositionType type;
593
594 MediaPosition(PositionType type, double position) {
595 this.position = position;
596 this.type = type;
597 }
598
599 public void setPosition(double position) {
600 this.position = position;
601 }
602
603 @Override public int hashCode() {
604 return hash(position, type);
605 }
606
607 @Override public boolean equals(Object that) {
608 return (this == that) || (that instanceof MediaPosition && eqFields((MediaPosition) that));
609 }
610
611 private boolean eqFields(MediaPosition that) {
612 return position == that.position && eq(type, that.type);
613 }
614
615 @Override public String toString() {
616 return format("MediaPosition(%s, %s)", type, position);
617 }
618 }
619
/**
 * Passes the service registry on to the superclass. Overridden so the setter can carry the
 * {@code @Reference} annotation in this class.
 *
 * @param serviceRegistry
 *          the service registry
 */
@Reference
@Override
public void setServiceRegistry(ServiceRegistry serviceRegistry) {
  super.setServiceRegistry(serviceRegistry);
}
625
626 }
627