1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 package org.opencastproject.workflow.handler.textanalyzer;
23
24 import org.opencastproject.composer.api.ComposerService;
25 import org.opencastproject.composer.api.EncoderException;
26 import org.opencastproject.job.api.Job;
27 import org.opencastproject.job.api.JobContext;
28 import org.opencastproject.mediapackage.Attachment;
29 import org.opencastproject.mediapackage.Catalog;
30 import org.opencastproject.mediapackage.MediaPackage;
31 import org.opencastproject.mediapackage.MediaPackageElement;
32 import org.opencastproject.mediapackage.MediaPackageElementBuilder;
33 import org.opencastproject.mediapackage.MediaPackageElementBuilderFactory;
34 import org.opencastproject.mediapackage.MediaPackageElementFlavor;
35 import org.opencastproject.mediapackage.MediaPackageElementParser;
36 import org.opencastproject.mediapackage.MediaPackageElements;
37 import org.opencastproject.mediapackage.MediaPackageException;
38 import org.opencastproject.mediapackage.MediaPackageReference;
39 import org.opencastproject.mediapackage.MediaPackageReferenceImpl;
40 import org.opencastproject.mediapackage.Track;
41 import org.opencastproject.metadata.mpeg7.MediaDuration;
42 import org.opencastproject.metadata.mpeg7.MediaRelTimePointImpl;
43 import org.opencastproject.metadata.mpeg7.MediaTime;
44 import org.opencastproject.metadata.mpeg7.MediaTimeImpl;
45 import org.opencastproject.metadata.mpeg7.MediaTimePoint;
46 import org.opencastproject.metadata.mpeg7.Mpeg7Catalog;
47 import org.opencastproject.metadata.mpeg7.Mpeg7CatalogService;
48 import org.opencastproject.metadata.mpeg7.Segment;
49 import org.opencastproject.metadata.mpeg7.SpatioTemporalDecomposition;
50 import org.opencastproject.metadata.mpeg7.SpatioTemporalLocator;
51 import org.opencastproject.metadata.mpeg7.SpatioTemporalLocatorImpl;
52 import org.opencastproject.metadata.mpeg7.TemporalDecomposition;
53 import org.opencastproject.metadata.mpeg7.Video;
54 import org.opencastproject.metadata.mpeg7.VideoSegment;
55 import org.opencastproject.metadata.mpeg7.VideoText;
56 import org.opencastproject.serviceregistry.api.ServiceRegistry;
57 import org.opencastproject.serviceregistry.api.ServiceRegistryException;
58 import org.opencastproject.textanalyzer.api.TextAnalyzerException;
59 import org.opencastproject.textanalyzer.api.TextAnalyzerService;
60 import org.opencastproject.util.NotFoundException;
61 import org.opencastproject.workflow.api.AbstractWorkflowOperationHandler;
62 import org.opencastproject.workflow.api.ConfiguredTagsAndFlavors;
63 import org.opencastproject.workflow.api.WorkflowInstance;
64 import org.opencastproject.workflow.api.WorkflowOperationException;
65 import org.opencastproject.workflow.api.WorkflowOperationHandler;
66 import org.opencastproject.workflow.api.WorkflowOperationInstance;
67 import org.opencastproject.workflow.api.WorkflowOperationResult;
68 import org.opencastproject.workflow.api.WorkflowOperationResult.Action;
69 import org.opencastproject.workspace.api.Workspace;
70
71 import org.apache.commons.io.IOUtils;
72 import org.apache.commons.lang3.StringUtils;
73 import org.osgi.service.cm.ConfigurationException;
74 import org.osgi.service.cm.ManagedService;
75 import org.osgi.service.component.annotations.Component;
76 import org.osgi.service.component.annotations.Reference;
77 import org.slf4j.Logger;
78 import org.slf4j.LoggerFactory;
79
80 import java.io.File;
81 import java.io.FileInputStream;
82 import java.io.IOException;
83 import java.io.InputStream;
84 import java.net.URI;
85 import java.util.Dictionary;
86 import java.util.HashMap;
87 import java.util.Iterator;
88 import java.util.LinkedList;
89 import java.util.List;
90 import java.util.Map;
91 import java.util.Map.Entry;
92 import java.util.concurrent.ExecutionException;
93
94
95
96
97
98
99 @Component(
100 immediate = true,
101 service = { WorkflowOperationHandler.class, ManagedService.class },
102 property = {
103 "service.description=Text Analysis Workflow Operation Handler",
104 "workflow.operation=extract-text"
105 }
106 )
107 public class TextAnalysisWorkflowOperationHandler extends AbstractWorkflowOperationHandler implements ManagedService {
108
109
110 private static final Logger logger = LoggerFactory.getLogger(TextAnalysisWorkflowOperationHandler.class);
111
112
113 public static final String IMAGE_EXTRACTION_PROFILE = "text-analysis.http";
114
115
116 private static final int DEFAULT_STABILITY_THRESHOLD = 5;
117
118
119 public static final String OPT_STABILITY_THRESHOLD = "stabilitythreshold";
120
121
122 private int stabilityThreshold = DEFAULT_STABILITY_THRESHOLD;
123
124
125 private Workspace workspace = null;
126
127
128 private Mpeg7CatalogService mpeg7CatalogService = null;
129
130
131 private TextAnalyzerService analysisService = null;
132
133
134 protected ComposerService composer = null;
135
136
137
138
139
140
141
142 @Reference
143 protected void setTextAnalyzer(TextAnalyzerService analysisService) {
144 this.analysisService = analysisService;
145 }
146
147
148
149
150
151
152
153
154 @Reference
155 public void setWorkspace(Workspace workspace) {
156 this.workspace = workspace;
157 }
158
159
160
161
162
163
164
165 @Reference(name = "Mpeg7Service")
166 protected void setMpeg7CatalogService(Mpeg7CatalogService catalogService) {
167 this.mpeg7CatalogService = catalogService;
168 }
169
170 @Override
171 public WorkflowOperationResult start(WorkflowInstance workflowInstance, JobContext context)
172 throws WorkflowOperationException {
173 logger.debug("Running segments preview workflow operation on {}", workflowInstance);
174
175 ConfiguredTagsAndFlavors tagsAndFlavors = getTagsAndFlavors(
176 workflowInstance, Configuration.many, Configuration.many, Configuration.many, Configuration.none);
177
178
179 MediaPackage src = (MediaPackage) workflowInstance.getMediaPackage().clone();
180 Catalog[] segmentCatalogs = src.getCatalogs(MediaPackageElements.SEGMENTS);
181 if (segmentCatalogs.length == 0) {
182 logger.info("Media package {} does not contain segment information", src);
183 return createResult(Action.CONTINUE);
184 }
185
186 try {
187 return extractVideoText(src, workflowInstance.getCurrentOperation(), tagsAndFlavors);
188 } catch (Exception e) {
189 throw new WorkflowOperationException(e);
190 }
191 }
192
193
194
195
196
197
198
199
200
201
202
203
204
205 protected WorkflowOperationResult extractVideoText(final MediaPackage mediaPackage,
206 WorkflowOperationInstance operation, ConfiguredTagsAndFlavors tagsAndFlavors) throws EncoderException,
207 InterruptedException, ExecutionException, IOException, NotFoundException, MediaPackageException,
208 TextAnalyzerException, WorkflowOperationException, ServiceRegistryException {
209 long totalTimeInQueue = 0;
210
211 List<String> sourceTagSet = tagsAndFlavors.getSrcTags();
212 ConfiguredTagsAndFlavors.TargetTags targetTagSet = tagsAndFlavors.getTargetTags();
213
214
215 Map<Catalog, Mpeg7Catalog> catalogs = loadSegmentCatalogs(mediaPackage, operation, tagsAndFlavors);
216
217
218 if (catalogs.size() == 0) {
219 logger.debug("Mediapackage {} has no suitable mpeg-7 catalogs based on tags {} to to run text analysis",
220 mediaPackage, sourceTagSet);
221 return createResult(mediaPackage, Action.CONTINUE);
222 }
223
224
225 for (Entry<Catalog, Mpeg7Catalog> mapEntry : catalogs.entrySet()) {
226 Map<VideoSegment, Job> jobs = new HashMap<VideoSegment, Job>();
227 List<Attachment> images = new LinkedList<Attachment>();
228 Catalog segmentCatalog = mapEntry.getKey();
229 try {
230 MediaPackageReference catalogRef = segmentCatalog.getReference();
231
232
233 if (catalogRef == null) {
234 logger.info("Skipping catalog {} since we can't determine the source track", segmentCatalog);
235 } else if (mediaPackage.getElementByReference(catalogRef) == null) {
236 logger.info("Skipping catalog {} since we can't determine the source track", segmentCatalog);
237 } else if (!(mediaPackage.getElementByReference(catalogRef) instanceof Track)) {
238 logger.info("Skipping catalog {} since it's source was not a track", segmentCatalog);
239 }
240
241 logger.info("Analyzing mpeg-7 segments catalog {} for text", segmentCatalog);
242
243
244 Mpeg7Catalog textCatalog = mapEntry.getValue().clone();
245 Track sourceTrack = mediaPackage.getTrack(catalogRef.getIdentifier());
246
247
248 Video videoContent = textCatalog.videoContent().next();
249 TemporalDecomposition<? extends Segment> decomposition = videoContent.getTemporalDecomposition();
250 Iterator<? extends Segment> segmentIterator = decomposition.segments();
251
252
253 List<VideoSegment> videoSegments = new LinkedList<VideoSegment>();
254 while (segmentIterator.hasNext()) {
255 Segment segment = segmentIterator.next();
256 if ((segment instanceof VideoSegment)) {
257 videoSegments.add((VideoSegment) segment);
258 }
259 }
260
261
262 double[] times = new double[videoSegments.size()];
263
264 for (int i = 0; i < videoSegments.size(); i++) {
265 VideoSegment videoSegment = videoSegments.get(i);
266 MediaTimePoint segmentTimePoint = videoSegment.getMediaTime().getMediaTimePoint();
267 MediaDuration segmentDuration = videoSegment.getMediaTime().getMediaDuration();
268
269
270 MediaPackageReference reference = null;
271 if (catalogRef == null) {
272 reference = new MediaPackageReferenceImpl();
273 } else {
274 reference = new MediaPackageReferenceImpl(catalogRef.getType(), catalogRef.getIdentifier());
275 }
276 reference.setProperty("time", segmentTimePoint.toString());
277
278
279
280
281 long startTimeSeconds = segmentTimePoint.getTimeInMilliseconds() / 1000;
282 long durationSeconds = segmentDuration.getDurationInMilliseconds() / 1000;
283 times[i] = Math.max(startTimeSeconds + durationSeconds - stabilityThreshold + 1, 0);
284 }
285
286
287
288 Job imageJob = composer.image(sourceTrack, IMAGE_EXTRACTION_PROFILE, times);
289 if (!waitForStatus(imageJob).isSuccess()) {
290 throw new WorkflowOperationException("Extracting scene images from " + sourceTrack + " failed");
291 }
292 if (imageJob.getPayload() == null) {
293 throw new WorkflowOperationException(
294 "The payload of extracting images job from " + sourceTrack + " was null");
295 }
296
297 totalTimeInQueue += imageJob.getQueueTime();
298 for (MediaPackageElement imageMpe : MediaPackageElementParser.getArrayFromXml(imageJob.getPayload())) {
299 Attachment image = (Attachment) imageMpe;
300 images.add(image);
301 }
302 if (images.isEmpty() || images.size() != times.length) {
303 throw new WorkflowOperationException(
304 "There are no images produced for " + sourceTrack
305 + " or the images count isn't equal the count of the video segments.");
306 }
307
308
309 Iterator<VideoSegment> it = videoSegments.iterator();
310 for (MediaPackageElement element : images) {
311 Attachment image = (Attachment) element;
312 VideoSegment videoSegment = it.next();
313 jobs.put(videoSegment, analysisService.extract(image));
314 }
315
316
317 if (!waitForStatus(jobs.values().toArray(new Job[jobs.size()])).isSuccess()) {
318 throw new WorkflowOperationException("Text extraction failed on images from " + sourceTrack);
319 }
320
321
322 for (Map.Entry<VideoSegment, Job> entry : jobs.entrySet()) {
323 Job job = serviceRegistry.getJob(entry.getValue().getId());
324 totalTimeInQueue += job.getQueueTime();
325
326 VideoSegment videoSegment = entry.getKey();
327 MediaDuration segmentDuration = videoSegment.getMediaTime().getMediaDuration();
328 Catalog catalog = (Catalog) MediaPackageElementParser.getFromXml(job.getPayload());
329 if (catalog == null) {
330 logger.warn("Text analysis did not return a valid mpeg7 for segment {}", videoSegment);
331 continue;
332 }
333 Mpeg7Catalog videoTextCatalog = loadMpeg7Catalog(catalog);
334 if (videoTextCatalog == null) {
335 throw new IllegalStateException("Text analysis service did not return a valid mpeg7");
336 }
337
338
339 Iterator<Video> videoTextContents = videoTextCatalog.videoContent();
340 if (videoTextContents == null || !videoTextContents.hasNext()) {
341 logger.debug("Text analysis was not able to extract any text from {}", job.getArguments().get(0));
342 break;
343 }
344
345 try {
346 Video textVideoContent = videoTextContents.next();
347 VideoSegment textVideoSegment = (VideoSegment) textVideoContent.getTemporalDecomposition().segments()
348 .next();
349 VideoText[] videoTexts = textVideoSegment.getSpatioTemporalDecomposition().getVideoText();
350 SpatioTemporalDecomposition std = videoSegment.createSpatioTemporalDecomposition(true, false);
351 for (VideoText videoText : videoTexts) {
352 MediaTime mediaTime = new MediaTimeImpl(new MediaRelTimePointImpl(0), segmentDuration);
353 SpatioTemporalLocator locator = new SpatioTemporalLocatorImpl(mediaTime);
354 videoText.setSpatioTemporalLocator(locator);
355 std.addVideoText(videoText);
356 }
357 } catch (Exception e) {
358 logger.warn("The mpeg-7 structure returned by the text analyzer is not what is expected", e);
359 continue;
360 }
361 }
362
363
364 MediaPackageElementBuilder builder = MediaPackageElementBuilderFactory.newInstance().newElementBuilder();
365 Catalog catalog = (Catalog) builder.newElement(MediaPackageElement.Type.Catalog, MediaPackageElements.TEXTS);
366 catalog.setIdentifier(null);
367 catalog.setReference(segmentCatalog.getReference());
368 mediaPackage.add(catalog);
369 InputStream in = mpeg7CatalogService.serialize(textCatalog);
370 String filename = "slidetext.xml";
371 URI workspaceURI = workspace
372 .put(mediaPackage.getIdentifier().toString(), catalog.getIdentifier(), filename, in);
373 catalog.setURI(workspaceURI);
374
375
376 try {
377 mediaPackage.remove(segmentCatalog);
378 workspace.delete(segmentCatalog.getURI());
379 } catch (Exception e) {
380 logger.warn("Unable to delete segment catalog {}", segmentCatalog.getURI(), e);
381 }
382
383
384 catalog.setFlavor(MediaPackageElements.TEXTS);
385 applyTargetTagsToElement(targetTagSet, catalog);
386 } finally {
387
388 logger.debug("Removing temporary images");
389 for (Attachment image : images) {
390 try {
391 workspace.delete(image.getURI());
392 } catch (Exception e) {
393 logger.warn("Unable to delete temporary image {}", image.getURI(), e);
394 }
395 }
396
397 for (Job j : jobs.values()) {
398 Catalog catalog = null;
399 try {
400 Job job = serviceRegistry.getJob(j.getId());
401 if (!Job.Status.FINISHED.equals(job.getStatus())) {
402 continue;
403 }
404 catalog = (Catalog) MediaPackageElementParser.getFromXml(job.getPayload());
405 if (catalog != null) {
406 workspace.delete(catalog.getURI());
407 }
408 } catch (Exception e) {
409 if (catalog != null) {
410 logger.warn("Unable to delete temporary text file {}", catalog.getURI(), e);
411 } else {
412 logger.warn("Unable to parse textextraction payload of job {}", j.getId());
413 }
414 }
415 }
416 }
417 }
418
419 logger.debug("Text analysis completed");
420 return createResult(mediaPackage, Action.CONTINUE, totalTimeInQueue);
421 }
422
423
424
425
426
427
428
429
430
431
432 protected Mpeg7Catalog loadMpeg7Catalog(Catalog catalog) throws IOException {
433 InputStream in = null;
434 try {
435 File f = workspace.get(catalog.getURI());
436 in = new FileInputStream(f);
437 return mpeg7CatalogService.load(in);
438 } catch (NotFoundException e) {
439 throw new IOException("Unable to open catalog " + catalog + ": " + e.getMessage());
440 } finally {
441 IOUtils.closeQuietly(in);
442 }
443 }
444
445
446
447
448
449
450
451
452
453
454
455
456
457 protected Map<Catalog, Mpeg7Catalog> loadSegmentCatalogs(MediaPackage mediaPackage,
458 WorkflowOperationInstance operation, ConfiguredTagsAndFlavors tagsAndFlavors) throws IOException {
459 HashMap<Catalog, Mpeg7Catalog> catalogs = new HashMap<Catalog, Mpeg7Catalog>();
460
461 List<MediaPackageElementFlavor> sourceFlavors = tagsAndFlavors.getSrcFlavors();
462 List<String> sourceTagSet = tagsAndFlavors.getSrcTags();
463
464 Catalog[] catalogsWithTags = mediaPackage.getCatalogsByTags(sourceTagSet);
465
466 for (Catalog mediaPackageCatalog : catalogsWithTags) {
467 if (!MediaPackageElements.SEGMENTS.equals(mediaPackageCatalog.getFlavor())) {
468 continue;
469 }
470 if (sourceFlavors != null) {
471 if (mediaPackageCatalog.getReference() == null) {
472 continue;
473 }
474 Track t = mediaPackage.getTrack(mediaPackageCatalog.getReference().getIdentifier());
475 if (t == null || sourceFlavors.stream().noneMatch(flavor -> t.getFlavor().matches(flavor))) {
476 continue;
477 }
478 }
479
480
481 if (!mediaPackageCatalog.containsTag(sourceTagSet)) {
482 continue;
483 }
484
485 Mpeg7Catalog mpeg7 = loadMpeg7Catalog(mediaPackageCatalog);
486
487
488 if (mpeg7.videoContent() == null || !mpeg7.videoContent().hasNext()) {
489 logger.debug("Mpeg-7 segments catalog {} does not contain any video content", mpeg7);
490 continue;
491 }
492
493
494 Video videoContent = mpeg7.videoContent().next();
495 TemporalDecomposition<? extends Segment> decomposition = videoContent.getTemporalDecomposition();
496 if (decomposition == null || !decomposition.hasSegments()) {
497 logger.debug("Mpeg-7 catalog {} does not contain a temporal decomposition", mpeg7);
498 continue;
499 }
500 catalogs.put(mediaPackageCatalog, mpeg7);
501 }
502
503 return catalogs;
504 }
505
506
507
508
509 @SuppressWarnings("rawtypes")
510 @Override
511 public void updated(Dictionary properties) throws ConfigurationException {
512 if (properties != null && properties.get(OPT_STABILITY_THRESHOLD) != null) {
513 String threshold = StringUtils.trimToNull((String)properties.get(OPT_STABILITY_THRESHOLD));
514 try {
515 stabilityThreshold = Integer.parseInt(threshold);
516 logger.info("The videosegmenter's stability threshold has been set to {} frames", stabilityThreshold);
517 } catch (Exception e) {
518 stabilityThreshold = DEFAULT_STABILITY_THRESHOLD;
519 logger.warn("Found illegal value '{}' for the videosegmenter stability threshold. "
520 + "Falling back to default value of {} frames", threshold, DEFAULT_STABILITY_THRESHOLD);
521 }
522 } else {
523 stabilityThreshold = DEFAULT_STABILITY_THRESHOLD;
524 logger.info("Using the default value of {} frames for the videosegmenter stability threshold",
525 DEFAULT_STABILITY_THRESHOLD);
526 }
527 }
528
529
530
531
532
533
534 @Reference
535 void setComposerService(ComposerService composerService) {
536 this.composer = composerService;
537 }
538
539 @Reference
540 @Override
541 public void setServiceRegistry(ServiceRegistry serviceRegistry) {
542 super.setServiceRegistry(serviceRegistry);
543 }
544
545 }