1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 package org.opencastproject.workflow.handler.textanalyzer;
23
24 import org.opencastproject.composer.api.ComposerService;
25 import org.opencastproject.composer.api.EncoderException;
26 import org.opencastproject.job.api.Job;
27 import org.opencastproject.job.api.JobContext;
28 import org.opencastproject.mediapackage.Attachment;
29 import org.opencastproject.mediapackage.Catalog;
30 import org.opencastproject.mediapackage.MediaPackage;
31 import org.opencastproject.mediapackage.MediaPackageElement;
32 import org.opencastproject.mediapackage.MediaPackageElementBuilder;
33 import org.opencastproject.mediapackage.MediaPackageElementBuilderFactory;
34 import org.opencastproject.mediapackage.MediaPackageElementFlavor;
35 import org.opencastproject.mediapackage.MediaPackageElementParser;
36 import org.opencastproject.mediapackage.MediaPackageElements;
37 import org.opencastproject.mediapackage.MediaPackageException;
38 import org.opencastproject.mediapackage.MediaPackageReference;
39 import org.opencastproject.mediapackage.MediaPackageReferenceImpl;
40 import org.opencastproject.mediapackage.Track;
41 import org.opencastproject.metadata.mpeg7.MediaDuration;
42 import org.opencastproject.metadata.mpeg7.MediaRelTimePointImpl;
43 import org.opencastproject.metadata.mpeg7.MediaTime;
44 import org.opencastproject.metadata.mpeg7.MediaTimeImpl;
45 import org.opencastproject.metadata.mpeg7.MediaTimePoint;
46 import org.opencastproject.metadata.mpeg7.Mpeg7Catalog;
47 import org.opencastproject.metadata.mpeg7.Mpeg7CatalogService;
48 import org.opencastproject.metadata.mpeg7.Segment;
49 import org.opencastproject.metadata.mpeg7.SpatioTemporalDecomposition;
50 import org.opencastproject.metadata.mpeg7.SpatioTemporalLocator;
51 import org.opencastproject.metadata.mpeg7.SpatioTemporalLocatorImpl;
52 import org.opencastproject.metadata.mpeg7.TemporalDecomposition;
53 import org.opencastproject.metadata.mpeg7.Video;
54 import org.opencastproject.metadata.mpeg7.VideoSegment;
55 import org.opencastproject.metadata.mpeg7.VideoText;
56 import org.opencastproject.serviceregistry.api.ServiceRegistry;
57 import org.opencastproject.serviceregistry.api.ServiceRegistryException;
58 import org.opencastproject.textanalyzer.api.TextAnalyzerException;
59 import org.opencastproject.textanalyzer.api.TextAnalyzerService;
60 import org.opencastproject.util.NotFoundException;
61 import org.opencastproject.workflow.api.AbstractWorkflowOperationHandler;
62 import org.opencastproject.workflow.api.ConfiguredTagsAndFlavors;
63 import org.opencastproject.workflow.api.WorkflowInstance;
64 import org.opencastproject.workflow.api.WorkflowOperationException;
65 import org.opencastproject.workflow.api.WorkflowOperationHandler;
66 import org.opencastproject.workflow.api.WorkflowOperationInstance;
67 import org.opencastproject.workflow.api.WorkflowOperationResult;
68 import org.opencastproject.workflow.api.WorkflowOperationResult.Action;
69 import org.opencastproject.workspace.api.Workspace;
70
71 import org.apache.commons.io.IOUtils;
72 import org.apache.commons.lang3.StringUtils;
73 import org.osgi.service.cm.ConfigurationException;
74 import org.osgi.service.cm.ManagedService;
75 import org.osgi.service.component.annotations.Component;
76 import org.osgi.service.component.annotations.Reference;
77 import org.slf4j.Logger;
78 import org.slf4j.LoggerFactory;
79
80 import java.io.File;
81 import java.io.FileInputStream;
82 import java.io.IOException;
83 import java.io.InputStream;
84 import java.net.URI;
85 import java.util.Dictionary;
86 import java.util.HashMap;
87 import java.util.Iterator;
88 import java.util.LinkedList;
89 import java.util.List;
90 import java.util.Map;
91 import java.util.Map.Entry;
92 import java.util.concurrent.ExecutionException;
93
94
95
96
97
98
/**
 * Workflow operation handler registered for the <code>extract-text</code> operation.
 * <p>
 * For every segment of a media package's mpeg-7 segments catalog it asks the composer service for a still image,
 * hands that image to the text analyzer service and merges the returned video texts into a new catalog that is
 * added to the media package with the {@link MediaPackageElements#TEXTS} flavor.
 * <p>
 * The handler is also published as a {@link ManagedService} so that the stability threshold can be configured.
 */
@Component(
    immediate = true,
    service = { WorkflowOperationHandler.class, ManagedService.class },
    property = {
        "service.description=Text Analysis Workflow Operation Handler",
        "workflow.operation=extract-text"
    }
)
public class TextAnalysisWorkflowOperationHandler extends AbstractWorkflowOperationHandler implements ManagedService {

  /** The logging facility */
  private static final Logger logger = LoggerFactory.getLogger(TextAnalysisWorkflowOperationHandler.class);

  /** Name of the encoding profile passed to the composer service when extracting the still images */
  public static final String IMAGE_EXTRACTION_PROFILE = "text-analysis.http";

  /** Stability threshold that is used when none (or an invalid one) has been configured */
  private static final int DEFAULT_STABILITY_THRESHOLD = 5;

  /** Name of the configuration key that holds the stability threshold */
  public static final String OPT_STABILITY_THRESHOLD = "stabilitythreshold";

  /**
   * The stability threshold currently in effect. It is subtracted from the end of a segment (in seconds) to
   * determine the position at which the still image is taken.
   */
  private int stabilityThreshold = DEFAULT_STABILITY_THRESHOLD;

  /** The local workspace, used to read catalogs and to store or delete files */
  private Workspace workspace = null;

  /** The mpeg-7 catalog service, used to load and serialize mpeg-7 catalogs */
  private Mpeg7CatalogService mpeg7CatalogService = null;

  /** The text analysis service that extracts text from still images */
  private TextAnalyzerService analysisService = null;

  /** The composer service that extracts the still images from the source track */
  protected ComposerService composer = null;
135
136
137
138
139
140
141
142 @Reference
143 protected void setTextAnalyzer(TextAnalyzerService analysisService) {
144 this.analysisService = analysisService;
145 }
146
147
148
149
150
151
152
153
154 @Reference
155 public void setWorkspace(Workspace workspace) {
156 this.workspace = workspace;
157 }
158
159
160
161
162
163
164
165 @Reference(name = "Mpeg7Service")
166 protected void setMpeg7CatalogService(Mpeg7CatalogService catalogService) {
167 this.mpeg7CatalogService = catalogService;
168 }
169
170 @Override
171 public WorkflowOperationResult start(WorkflowInstance workflowInstance, JobContext context)
172 throws WorkflowOperationException {
173 logger.debug("Running segments preview workflow operation on {}", workflowInstance);
174
175 ConfiguredTagsAndFlavors tagsAndFlavors = getTagsAndFlavors(
176 workflowInstance, Configuration.many, Configuration.many, Configuration.many, Configuration.none);
177
178
179 MediaPackage src = (MediaPackage) workflowInstance.getMediaPackage().clone();
180 Catalog[] segmentCatalogs = src.getCatalogs(MediaPackageElements.SEGMENTS);
181 if (segmentCatalogs.length == 0) {
182 logger.info("Media package {} does not contain segment information", src);
183 return createResult(Action.CONTINUE);
184 }
185
186 try {
187 return extractVideoText(src, workflowInstance.getCurrentOperation(), tagsAndFlavors);
188 } catch (Exception e) {
189 throw new WorkflowOperationException(e);
190 }
191 }
192
193
194
195
196
197
198
199
200
201
202
203
204
205 protected WorkflowOperationResult extractVideoText(final MediaPackage mediaPackage,
206 WorkflowOperationInstance operation, ConfiguredTagsAndFlavors tagsAndFlavors) throws EncoderException,
207 InterruptedException, ExecutionException, IOException, NotFoundException, MediaPackageException,
208 TextAnalyzerException, WorkflowOperationException, ServiceRegistryException {
209 long totalTimeInQueue = 0;
210
211 List<String> sourceTagSet = tagsAndFlavors.getSrcTags();
212 List<String> targetTagSet = tagsAndFlavors.getTargetTags();
213
214
215 Map<Catalog, Mpeg7Catalog> catalogs = loadSegmentCatalogs(mediaPackage, operation, tagsAndFlavors);
216
217
218 if (catalogs.size() == 0) {
219 logger.debug("Mediapackage {} has no suitable mpeg-7 catalogs based on tags {} to to run text analysis",
220 mediaPackage, sourceTagSet);
221 return createResult(mediaPackage, Action.CONTINUE);
222 }
223
224
225 for (Entry<Catalog, Mpeg7Catalog> mapEntry : catalogs.entrySet()) {
226 Map<VideoSegment, Job> jobs = new HashMap<VideoSegment, Job>();
227 List<Attachment> images = new LinkedList<Attachment>();
228 Catalog segmentCatalog = mapEntry.getKey();
229 try {
230 MediaPackageReference catalogRef = segmentCatalog.getReference();
231
232
233 if (catalogRef == null) {
234 logger.info("Skipping catalog {} since we can't determine the source track", segmentCatalog);
235 } else if (mediaPackage.getElementByReference(catalogRef) == null) {
236 logger.info("Skipping catalog {} since we can't determine the source track", segmentCatalog);
237 } else if (!(mediaPackage.getElementByReference(catalogRef) instanceof Track)) {
238 logger.info("Skipping catalog {} since it's source was not a track", segmentCatalog);
239 }
240
241 logger.info("Analyzing mpeg-7 segments catalog {} for text", segmentCatalog);
242
243
244 Mpeg7Catalog textCatalog = mapEntry.getValue().clone();
245 Track sourceTrack = mediaPackage.getTrack(catalogRef.getIdentifier());
246
247
248 Video videoContent = textCatalog.videoContent().next();
249 TemporalDecomposition<? extends Segment> decomposition = videoContent.getTemporalDecomposition();
250 Iterator<? extends Segment> segmentIterator = decomposition.segments();
251
252
253 List<VideoSegment> videoSegments = new LinkedList<VideoSegment>();
254 while (segmentIterator.hasNext()) {
255 Segment segment = segmentIterator.next();
256 if ((segment instanceof VideoSegment)) {
257 videoSegments.add((VideoSegment) segment);
258 }
259 }
260
261
262 double[] times = new double[videoSegments.size()];
263
264 for (int i = 0; i < videoSegments.size(); i++) {
265 VideoSegment videoSegment = videoSegments.get(i);
266 MediaTimePoint segmentTimePoint = videoSegment.getMediaTime().getMediaTimePoint();
267 MediaDuration segmentDuration = videoSegment.getMediaTime().getMediaDuration();
268
269
270 MediaPackageReference reference = null;
271 if (catalogRef == null) {
272 reference = new MediaPackageReferenceImpl();
273 } else {
274 reference = new MediaPackageReferenceImpl(catalogRef.getType(), catalogRef.getIdentifier());
275 }
276 reference.setProperty("time", segmentTimePoint.toString());
277
278
279
280
281 long startTimeSeconds = segmentTimePoint.getTimeInMilliseconds() / 1000;
282 long durationSeconds = segmentDuration.getDurationInMilliseconds() / 1000;
283 times[i] = Math.max(startTimeSeconds + durationSeconds - stabilityThreshold + 1, 0);
284 }
285
286
287
288 Job imageJob = composer.image(sourceTrack, IMAGE_EXTRACTION_PROFILE, times);
289 if (!waitForStatus(imageJob).isSuccess()) {
290 throw new WorkflowOperationException("Extracting scene images from " + sourceTrack + " failed");
291 }
292 if (imageJob.getPayload() == null) {
293 throw new WorkflowOperationException(
294 "The payload of extracting images job from " + sourceTrack + " was null");
295 }
296
297 totalTimeInQueue += imageJob.getQueueTime();
298 for (MediaPackageElement imageMpe : MediaPackageElementParser.getArrayFromXml(imageJob.getPayload())) {
299 Attachment image = (Attachment) imageMpe;
300 images.add(image);
301 }
302 if (images.isEmpty() || images.size() != times.length) {
303 throw new WorkflowOperationException(
304 "There are no images produced for " + sourceTrack
305 + " or the images count isn't equal the count of the video segments.");
306 }
307
308
309 Iterator<VideoSegment> it = videoSegments.iterator();
310 for (MediaPackageElement element : images) {
311 Attachment image = (Attachment) element;
312 VideoSegment videoSegment = it.next();
313 jobs.put(videoSegment, analysisService.extract(image));
314 }
315
316
317 if (!waitForStatus(jobs.values().toArray(new Job[jobs.size()])).isSuccess()) {
318 throw new WorkflowOperationException("Text extraction failed on images from " + sourceTrack);
319 }
320
321
322 for (Map.Entry<VideoSegment, Job> entry : jobs.entrySet()) {
323 Job job = serviceRegistry.getJob(entry.getValue().getId());
324 totalTimeInQueue += job.getQueueTime();
325
326 VideoSegment videoSegment = entry.getKey();
327 MediaDuration segmentDuration = videoSegment.getMediaTime().getMediaDuration();
328 Catalog catalog = (Catalog) MediaPackageElementParser.getFromXml(job.getPayload());
329 if (catalog == null) {
330 logger.warn("Text analysis did not return a valid mpeg7 for segment {}", videoSegment);
331 continue;
332 }
333 Mpeg7Catalog videoTextCatalog = loadMpeg7Catalog(catalog);
334 if (videoTextCatalog == null) {
335 throw new IllegalStateException("Text analysis service did not return a valid mpeg7");
336 }
337
338
339 Iterator<Video> videoTextContents = videoTextCatalog.videoContent();
340 if (videoTextContents == null || !videoTextContents.hasNext()) {
341 logger.debug("Text analysis was not able to extract any text from {}", job.getArguments().get(0));
342 break;
343 }
344
345 try {
346 Video textVideoContent = videoTextContents.next();
347 VideoSegment textVideoSegment = (VideoSegment) textVideoContent.getTemporalDecomposition().segments()
348 .next();
349 VideoText[] videoTexts = textVideoSegment.getSpatioTemporalDecomposition().getVideoText();
350 SpatioTemporalDecomposition std = videoSegment.createSpatioTemporalDecomposition(true, false);
351 for (VideoText videoText : videoTexts) {
352 MediaTime mediaTime = new MediaTimeImpl(new MediaRelTimePointImpl(0), segmentDuration);
353 SpatioTemporalLocator locator = new SpatioTemporalLocatorImpl(mediaTime);
354 videoText.setSpatioTemporalLocator(locator);
355 std.addVideoText(videoText);
356 }
357 } catch (Exception e) {
358 logger.warn("The mpeg-7 structure returned by the text analyzer is not what is expected", e);
359 continue;
360 }
361 }
362
363
364 MediaPackageElementBuilder builder = MediaPackageElementBuilderFactory.newInstance().newElementBuilder();
365 Catalog catalog = (Catalog) builder.newElement(MediaPackageElement.Type.Catalog, MediaPackageElements.TEXTS);
366 catalog.setIdentifier(null);
367 catalog.setReference(segmentCatalog.getReference());
368 mediaPackage.add(catalog);
369 InputStream in = mpeg7CatalogService.serialize(textCatalog);
370 String filename = "slidetext.xml";
371 URI workspaceURI = workspace
372 .put(mediaPackage.getIdentifier().toString(), catalog.getIdentifier(), filename, in);
373 catalog.setURI(workspaceURI);
374
375
376 try {
377 mediaPackage.remove(segmentCatalog);
378 workspace.delete(segmentCatalog.getURI());
379 } catch (Exception e) {
380 logger.warn("Unable to delete segment catalog {}", segmentCatalog.getURI(), e);
381 }
382
383
384 catalog.setFlavor(MediaPackageElements.TEXTS);
385 for (String tag : targetTagSet) {
386 catalog.addTag(tag);
387 }
388 } finally {
389
390 logger.debug("Removing temporary images");
391 for (Attachment image : images) {
392 try {
393 workspace.delete(image.getURI());
394 } catch (Exception e) {
395 logger.warn("Unable to delete temporary image {}", image.getURI(), e);
396 }
397 }
398
399 for (Job j : jobs.values()) {
400 Catalog catalog = null;
401 try {
402 Job job = serviceRegistry.getJob(j.getId());
403 if (!Job.Status.FINISHED.equals(job.getStatus())) {
404 continue;
405 }
406 catalog = (Catalog) MediaPackageElementParser.getFromXml(job.getPayload());
407 if (catalog != null) {
408 workspace.delete(catalog.getURI());
409 }
410 } catch (Exception e) {
411 if (catalog != null) {
412 logger.warn("Unable to delete temporary text file {}", catalog.getURI(), e);
413 } else {
414 logger.warn("Unable to parse textextraction payload of job {}", j.getId());
415 }
416 }
417 }
418 }
419 }
420
421 logger.debug("Text analysis completed");
422 return createResult(mediaPackage, Action.CONTINUE, totalTimeInQueue);
423 }
424
425
426
427
428
429
430
431
432
433
434 protected Mpeg7Catalog loadMpeg7Catalog(Catalog catalog) throws IOException {
435 InputStream in = null;
436 try {
437 File f = workspace.get(catalog.getURI());
438 in = new FileInputStream(f);
439 return mpeg7CatalogService.load(in);
440 } catch (NotFoundException e) {
441 throw new IOException("Unable to open catalog " + catalog + ": " + e.getMessage());
442 } finally {
443 IOUtils.closeQuietly(in);
444 }
445 }
446
447
448
449
450
451
452
453
454
455
456
457
458
459 protected Map<Catalog, Mpeg7Catalog> loadSegmentCatalogs(MediaPackage mediaPackage,
460 WorkflowOperationInstance operation, ConfiguredTagsAndFlavors tagsAndFlavors) throws IOException {
461 HashMap<Catalog, Mpeg7Catalog> catalogs = new HashMap<Catalog, Mpeg7Catalog>();
462
463 List<MediaPackageElementFlavor> sourceFlavors = tagsAndFlavors.getSrcFlavors();
464 List<String> sourceTagSet = tagsAndFlavors.getSrcTags();
465
466 Catalog[] catalogsWithTags = mediaPackage.getCatalogsByTags(sourceTagSet);
467
468 for (Catalog mediaPackageCatalog : catalogsWithTags) {
469 if (!MediaPackageElements.SEGMENTS.equals(mediaPackageCatalog.getFlavor())) {
470 continue;
471 }
472 if (sourceFlavors != null) {
473 if (mediaPackageCatalog.getReference() == null) {
474 continue;
475 }
476 Track t = mediaPackage.getTrack(mediaPackageCatalog.getReference().getIdentifier());
477 if (t == null || sourceFlavors.stream().noneMatch(flavor -> t.getFlavor().matches(flavor))) {
478 continue;
479 }
480 }
481
482
483 if (!mediaPackageCatalog.containsTag(sourceTagSet)) {
484 continue;
485 }
486
487 Mpeg7Catalog mpeg7 = loadMpeg7Catalog(mediaPackageCatalog);
488
489
490 if (mpeg7.videoContent() == null || !mpeg7.videoContent().hasNext()) {
491 logger.debug("Mpeg-7 segments catalog {} does not contain any video content", mpeg7);
492 continue;
493 }
494
495
496 Video videoContent = mpeg7.videoContent().next();
497 TemporalDecomposition<? extends Segment> decomposition = videoContent.getTemporalDecomposition();
498 if (decomposition == null || !decomposition.hasSegments()) {
499 logger.debug("Mpeg-7 catalog {} does not contain a temporal decomposition", mpeg7);
500 continue;
501 }
502 catalogs.put(mediaPackageCatalog, mpeg7);
503 }
504
505 return catalogs;
506 }
507
508
509
510
511 @SuppressWarnings("rawtypes")
512 @Override
513 public void updated(Dictionary properties) throws ConfigurationException {
514 if (properties != null && properties.get(OPT_STABILITY_THRESHOLD) != null) {
515 String threshold = StringUtils.trimToNull((String)properties.get(OPT_STABILITY_THRESHOLD));
516 try {
517 stabilityThreshold = Integer.parseInt(threshold);
518 logger.info("The videosegmenter's stability threshold has been set to {} frames", stabilityThreshold);
519 } catch (Exception e) {
520 stabilityThreshold = DEFAULT_STABILITY_THRESHOLD;
521 logger.warn("Found illegal value '{}' for the videosegmenter stability threshold. "
522 + "Falling back to default value of {} frames", threshold, DEFAULT_STABILITY_THRESHOLD);
523 }
524 } else {
525 stabilityThreshold = DEFAULT_STABILITY_THRESHOLD;
526 logger.info("Using the default value of {} frames for the videosegmenter stability threshold",
527 DEFAULT_STABILITY_THRESHOLD);
528 }
529 }
530
531
532
533
534
535
536 @Reference
537 void setComposerService(ComposerService composerService) {
538 this.composer = composerService;
539 }
540
541 @Reference
542 @Override
543 public void setServiceRegistry(ServiceRegistry serviceRegistry) {
544 super.setServiceRegistry(serviceRegistry);
545 }
546
547 }