View Javadoc
1   /*
2    * Licensed to The Apereo Foundation under one or more contributor license
3    * agreements. See the NOTICE file distributed with this work for additional
4    * information regarding copyright ownership.
5    *
6    *
7    * The Apereo Foundation licenses this file to you under the Educational
8    * Community License, Version 2.0 (the "License"); you may not use this file
9    * except in compliance with the License. You may obtain a copy of the License
10   * at:
11   *
12   *   http://opensource.org/licenses/ecl2.txt
13   *
14   * Unless required by applicable law or agreed to in writing, software
15   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
16   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the
17   * License for the specific language governing permissions and limitations under
18   * the License.
19   *
20   */
21  
22  package org.opencastproject.workflow.handler.textanalyzer;
23  
24  import org.opencastproject.composer.api.ComposerService;
25  import org.opencastproject.composer.api.EncoderException;
26  import org.opencastproject.job.api.Job;
27  import org.opencastproject.job.api.JobContext;
28  import org.opencastproject.mediapackage.Attachment;
29  import org.opencastproject.mediapackage.Catalog;
30  import org.opencastproject.mediapackage.MediaPackage;
31  import org.opencastproject.mediapackage.MediaPackageElement;
32  import org.opencastproject.mediapackage.MediaPackageElementBuilder;
33  import org.opencastproject.mediapackage.MediaPackageElementBuilderFactory;
34  import org.opencastproject.mediapackage.MediaPackageElementFlavor;
35  import org.opencastproject.mediapackage.MediaPackageElementParser;
36  import org.opencastproject.mediapackage.MediaPackageElements;
37  import org.opencastproject.mediapackage.MediaPackageException;
38  import org.opencastproject.mediapackage.MediaPackageReference;
39  import org.opencastproject.mediapackage.MediaPackageReferenceImpl;
40  import org.opencastproject.mediapackage.Track;
41  import org.opencastproject.metadata.mpeg7.MediaDuration;
42  import org.opencastproject.metadata.mpeg7.MediaRelTimePointImpl;
43  import org.opencastproject.metadata.mpeg7.MediaTime;
44  import org.opencastproject.metadata.mpeg7.MediaTimeImpl;
45  import org.opencastproject.metadata.mpeg7.MediaTimePoint;
46  import org.opencastproject.metadata.mpeg7.Mpeg7Catalog;
47  import org.opencastproject.metadata.mpeg7.Mpeg7CatalogService;
48  import org.opencastproject.metadata.mpeg7.Segment;
49  import org.opencastproject.metadata.mpeg7.SpatioTemporalDecomposition;
50  import org.opencastproject.metadata.mpeg7.SpatioTemporalLocator;
51  import org.opencastproject.metadata.mpeg7.SpatioTemporalLocatorImpl;
52  import org.opencastproject.metadata.mpeg7.TemporalDecomposition;
53  import org.opencastproject.metadata.mpeg7.Video;
54  import org.opencastproject.metadata.mpeg7.VideoSegment;
55  import org.opencastproject.metadata.mpeg7.VideoText;
56  import org.opencastproject.serviceregistry.api.ServiceRegistry;
57  import org.opencastproject.serviceregistry.api.ServiceRegistryException;
58  import org.opencastproject.textanalyzer.api.TextAnalyzerException;
59  import org.opencastproject.textanalyzer.api.TextAnalyzerService;
60  import org.opencastproject.util.NotFoundException;
61  import org.opencastproject.workflow.api.AbstractWorkflowOperationHandler;
62  import org.opencastproject.workflow.api.ConfiguredTagsAndFlavors;
63  import org.opencastproject.workflow.api.WorkflowInstance;
64  import org.opencastproject.workflow.api.WorkflowOperationException;
65  import org.opencastproject.workflow.api.WorkflowOperationHandler;
66  import org.opencastproject.workflow.api.WorkflowOperationInstance;
67  import org.opencastproject.workflow.api.WorkflowOperationResult;
68  import org.opencastproject.workflow.api.WorkflowOperationResult.Action;
69  import org.opencastproject.workspace.api.Workspace;
70  
71  import org.apache.commons.io.IOUtils;
72  import org.apache.commons.lang3.StringUtils;
73  import org.osgi.service.cm.ConfigurationException;
74  import org.osgi.service.cm.ManagedService;
75  import org.osgi.service.component.annotations.Component;
76  import org.osgi.service.component.annotations.Reference;
77  import org.slf4j.Logger;
78  import org.slf4j.LoggerFactory;
79  
80  import java.io.File;
81  import java.io.FileInputStream;
82  import java.io.IOException;
83  import java.io.InputStream;
84  import java.net.URI;
85  import java.util.Dictionary;
86  import java.util.HashMap;
87  import java.util.Iterator;
88  import java.util.LinkedList;
89  import java.util.List;
90  import java.util.Map;
91  import java.util.Map.Entry;
92  import java.util.concurrent.ExecutionException;
93  
94  /**
95   * The <code>TextAnalysisOperationHandler</code> will take an <code>MPEG-7</code> catalog, look for video segments and
96   * run a text analysis on the associated still images. The resulting <code>VideoText</code> elements will then be added
97   * to the segments.
98   */
99  @Component(
100     immediate = true,
101     service = { WorkflowOperationHandler.class, ManagedService.class },
102     property = {
103         "service.description=Text Analysis Workflow Operation Handler",
104         "workflow.operation=extract-text"
105     }
106 )
107 public class TextAnalysisWorkflowOperationHandler extends AbstractWorkflowOperationHandler implements ManagedService {
108 
109   /** The logging facility */
110   private static final Logger logger = LoggerFactory.getLogger(TextAnalysisWorkflowOperationHandler.class);
111 
112   /** Name of the encoding profile that extracts a still image from a movie */
113   public static final String IMAGE_EXTRACTION_PROFILE = "text-analysis.http";
114 
115   /** The threshold for scene stability, in seconds */
116   private static final int DEFAULT_STABILITY_THRESHOLD = 5;
117 
118   /** Name of the constant used to retreive the stability threshold */
119   public static final String OPT_STABILITY_THRESHOLD = "stabilitythreshold";
120 
121   /** The stability threshold */
122   private int stabilityThreshold = DEFAULT_STABILITY_THRESHOLD;
123 
124   /** The local workspace */
125   private Workspace workspace = null;
126 
127   /** The mpeg7 catalog service */
128   private Mpeg7CatalogService mpeg7CatalogService = null;
129 
130   /** The text analysis service */
131   private TextAnalyzerService analysisService = null;
132 
133   /** The composer service */
134   protected ComposerService composer = null;
135 
136   /**
137    * Callback for the OSGi declarative services configuration that will set the text analysis service.
138    *
139    * @param analysisService
140    *          the text analysis service
141    */
142   @Reference
143   protected void setTextAnalyzer(TextAnalyzerService analysisService) {
144     this.analysisService = analysisService;
145   }
146 
147   /**
148    * Callback for declarative services configuration that will introduce us to the local workspace service.
149    * Implementation assumes that the reference is configured as being static.
150    *
151    * @param workspace
152    *          an instance of the workspace
153    */
154   @Reference
155   public void setWorkspace(Workspace workspace) {
156     this.workspace = workspace;
157   }
158 
159   /**
160    * Callback for the OSGi declarative services configuration.
161    *
162    * @param catalogService
163    *          the catalog service
164    */
165   @Reference(name = "Mpeg7Service")
166   protected void setMpeg7CatalogService(Mpeg7CatalogService catalogService) {
167     this.mpeg7CatalogService = catalogService;
168   }
169 
170   @Override
171   public WorkflowOperationResult start(WorkflowInstance workflowInstance, JobContext context)
172           throws WorkflowOperationException {
173     logger.debug("Running segments preview workflow operation on {}", workflowInstance);
174 
175     ConfiguredTagsAndFlavors tagsAndFlavors = getTagsAndFlavors(
176         workflowInstance, Configuration.many, Configuration.many, Configuration.many, Configuration.none);
177 
178     // Check if there is an mpeg-7 catalog containing video segments
179     MediaPackage src = (MediaPackage) workflowInstance.getMediaPackage().clone();
180     Catalog[] segmentCatalogs = src.getCatalogs(MediaPackageElements.SEGMENTS);
181     if (segmentCatalogs.length == 0) {
182       logger.info("Media package {} does not contain segment information", src);
183       return createResult(Action.CONTINUE);
184     }
185 
186     try {
187       return extractVideoText(src, workflowInstance.getCurrentOperation(), tagsAndFlavors);
188     } catch (Exception e) {
189       throw new WorkflowOperationException(e);
190     }
191   }
192 
193   /**
194    * Runs the text analysis service on each of the video segments found.
195    *
196    * @param mediaPackage
197    *          the original mediapackage
198    * @param operation
199    *          the workflow operation
200    * @throws ExecutionException
201    * @throws InterruptedException
202    * @throws NotFoundException
203    * @throws WorkflowOperationException
204    */
205   protected WorkflowOperationResult extractVideoText(final MediaPackage mediaPackage,
206           WorkflowOperationInstance operation, ConfiguredTagsAndFlavors tagsAndFlavors) throws EncoderException,
207           InterruptedException, ExecutionException, IOException, NotFoundException, MediaPackageException,
208           TextAnalyzerException, WorkflowOperationException, ServiceRegistryException {
209     long totalTimeInQueue = 0;
210 
211     List<String> sourceTagSet = tagsAndFlavors.getSrcTags();
212     List<String> targetTagSet = tagsAndFlavors.getTargetTags();
213 
214     // Select the catalogs according to the tags
215     Map<Catalog, Mpeg7Catalog> catalogs = loadSegmentCatalogs(mediaPackage, operation, tagsAndFlavors);
216 
217     // Was there at least one matching catalog
218     if (catalogs.size() == 0) {
219       logger.debug("Mediapackage {} has no suitable mpeg-7 catalogs based on tags {} to to run text analysis",
220               mediaPackage, sourceTagSet);
221       return createResult(mediaPackage, Action.CONTINUE);
222     }
223 
224     // Loop over all existing segment catalogs
225     for (Entry<Catalog, Mpeg7Catalog> mapEntry : catalogs.entrySet()) {
226       Map<VideoSegment, Job> jobs = new HashMap<VideoSegment, Job>();
227       List<Attachment> images = new LinkedList<Attachment>();
228       Catalog segmentCatalog = mapEntry.getKey();
229       try {
230         MediaPackageReference catalogRef = segmentCatalog.getReference();
231 
232         // Make sure we can figure out the source track
233         if (catalogRef == null) {
234           logger.info("Skipping catalog {} since we can't determine the source track", segmentCatalog);
235         } else if (mediaPackage.getElementByReference(catalogRef) == null) {
236           logger.info("Skipping catalog {} since we can't determine the source track", segmentCatalog);
237         } else if (!(mediaPackage.getElementByReference(catalogRef) instanceof Track)) {
238           logger.info("Skipping catalog {} since it's source was not a track", segmentCatalog);
239         }
240 
241         logger.info("Analyzing mpeg-7 segments catalog {} for text", segmentCatalog);
242 
243         // Create a copy that will contain the segments enriched with the video text elements
244         Mpeg7Catalog textCatalog = mapEntry.getValue().clone();
245         Track sourceTrack = mediaPackage.getTrack(catalogRef.getIdentifier());
246 
247         // Load the temporal decomposition (segments)
248         Video videoContent = textCatalog.videoContent().next();
249         TemporalDecomposition<? extends Segment> decomposition = videoContent.getTemporalDecomposition();
250         Iterator<? extends Segment> segmentIterator = decomposition.segments();
251 
252         // For every segment, try to find the still image and run text analysis on it
253         List<VideoSegment> videoSegments = new LinkedList<VideoSegment>();
254         while (segmentIterator.hasNext()) {
255           Segment segment = segmentIterator.next();
256           if ((segment instanceof VideoSegment)) {
257             videoSegments.add((VideoSegment) segment);
258           }
259         }
260 
261         // argument array for image extraction
262         double[] times = new double[videoSegments.size()];
263 
264         for (int i = 0; i < videoSegments.size(); i++) {
265           VideoSegment videoSegment = videoSegments.get(i);
266           MediaTimePoint segmentTimePoint = videoSegment.getMediaTime().getMediaTimePoint();
267           MediaDuration segmentDuration = videoSegment.getMediaTime().getMediaDuration();
268 
269           // Choose a time
270           MediaPackageReference reference = null;
271           if (catalogRef == null) {
272             reference = new MediaPackageReferenceImpl();
273           } else {
274             reference = new MediaPackageReferenceImpl(catalogRef.getType(), catalogRef.getIdentifier());
275           }
276           reference.setProperty("time", segmentTimePoint.toString());
277 
278           // Have the time for ocr image created. To circumvent problems with slowly building slides, we take the image
279           // that is
280           // almost at the end of the segment, it should contain the most content and is stable as well.
281           long startTimeSeconds = segmentTimePoint.getTimeInMilliseconds() / 1000;
282           long durationSeconds = segmentDuration.getDurationInMilliseconds() / 1000;
283           times[i] = Math.max(startTimeSeconds + durationSeconds - stabilityThreshold + 1, 0);
284         }
285 
286         // Have the ocr image(s) created.
287 
288         Job imageJob = composer.image(sourceTrack, IMAGE_EXTRACTION_PROFILE, times);
289         if (!waitForStatus(imageJob).isSuccess()) {
290           throw new WorkflowOperationException("Extracting scene images from " + sourceTrack + " failed");
291         }
292         if (imageJob.getPayload() == null) {
293           throw new WorkflowOperationException(
294                   "The payload of extracting images job from " + sourceTrack + " was null");
295         }
296 
297         totalTimeInQueue += imageJob.getQueueTime();
298         for (MediaPackageElement imageMpe : MediaPackageElementParser.getArrayFromXml(imageJob.getPayload())) {
299           Attachment image = (Attachment) imageMpe;
300           images.add(image);
301         }
302         if (images.isEmpty() || images.size() != times.length) {
303           throw new WorkflowOperationException(
304                   "There are no images produced for " + sourceTrack
305                           + " or the images count isn't equal the count of the video segments.");
306         }
307 
308         // Run text extraction on each of the images
309         Iterator<VideoSegment> it = videoSegments.iterator();
310         for (MediaPackageElement element : images) {
311           Attachment image = (Attachment) element;
312           VideoSegment videoSegment = it.next();
313           jobs.put(videoSegment, analysisService.extract(image));
314         }
315 
316         // Wait for all jobs to be finished
317         if (!waitForStatus(jobs.values().toArray(new Job[jobs.size()])).isSuccess()) {
318           throw new WorkflowOperationException("Text extraction failed on images from " + sourceTrack);
319         }
320 
321         // Process the text extraction results
322         for (Map.Entry<VideoSegment, Job> entry : jobs.entrySet()) {
323           Job job = serviceRegistry.getJob(entry.getValue().getId());
324           totalTimeInQueue += job.getQueueTime();
325 
326           VideoSegment videoSegment = entry.getKey();
327           MediaDuration segmentDuration = videoSegment.getMediaTime().getMediaDuration();
328           Catalog catalog = (Catalog) MediaPackageElementParser.getFromXml(job.getPayload());
329           if (catalog == null) {
330             logger.warn("Text analysis did not return a valid mpeg7 for segment {}", videoSegment);
331             continue;
332           }
333           Mpeg7Catalog videoTextCatalog = loadMpeg7Catalog(catalog);
334           if (videoTextCatalog == null) {
335             throw new IllegalStateException("Text analysis service did not return a valid mpeg7");
336           }
337 
338           // Add the spatiotemporal decompositions from the new catalog to the existing video segments
339           Iterator<Video> videoTextContents = videoTextCatalog.videoContent();
340           if (videoTextContents == null || !videoTextContents.hasNext()) {
341             logger.debug("Text analysis was not able to extract any text from {}", job.getArguments().get(0));
342             break;
343           }
344 
345           try {
346             Video textVideoContent = videoTextContents.next();
347             VideoSegment textVideoSegment = (VideoSegment) textVideoContent.getTemporalDecomposition().segments()
348                     .next();
349             VideoText[] videoTexts = textVideoSegment.getSpatioTemporalDecomposition().getVideoText();
350             SpatioTemporalDecomposition std = videoSegment.createSpatioTemporalDecomposition(true, false);
351             for (VideoText videoText : videoTexts) {
352               MediaTime mediaTime = new MediaTimeImpl(new MediaRelTimePointImpl(0), segmentDuration);
353               SpatioTemporalLocator locator = new SpatioTemporalLocatorImpl(mediaTime);
354               videoText.setSpatioTemporalLocator(locator);
355               std.addVideoText(videoText);
356             }
357           } catch (Exception e) {
358             logger.warn("The mpeg-7 structure returned by the text analyzer is not what is expected", e);
359             continue;
360           }
361         }
362 
363         // Put the catalog into the workspace and add it to the media package
364         MediaPackageElementBuilder builder = MediaPackageElementBuilderFactory.newInstance().newElementBuilder();
365         Catalog catalog = (Catalog) builder.newElement(MediaPackageElement.Type.Catalog, MediaPackageElements.TEXTS);
366         catalog.setIdentifier(null);
367         catalog.setReference(segmentCatalog.getReference());
368         mediaPackage.add(catalog); // the catalog now has an ID, so we can store the file properly
369         InputStream in = mpeg7CatalogService.serialize(textCatalog);
370         String filename = "slidetext.xml";
371         URI workspaceURI = workspace
372                 .put(mediaPackage.getIdentifier().toString(), catalog.getIdentifier(), filename, in);
373         catalog.setURI(workspaceURI);
374 
375         // Since we've enriched and stored the mpeg7 catalog, remove the original
376         try {
377           mediaPackage.remove(segmentCatalog);
378           workspace.delete(segmentCatalog.getURI());
379         } catch (Exception e) {
380           logger.warn("Unable to delete segment catalog {}", segmentCatalog.getURI(), e);
381         }
382 
383         // Add flavor and target tags
384         catalog.setFlavor(MediaPackageElements.TEXTS);
385         for (String tag : targetTagSet) {
386           catalog.addTag(tag);
387         }
388       } finally {
389         // Remove images that were created for text extraction
390         logger.debug("Removing temporary images");
391         for (Attachment image : images) {
392           try {
393             workspace.delete(image.getURI());
394           } catch (Exception e) {
395             logger.warn("Unable to delete temporary image {}", image.getURI(), e);
396           }
397         }
398         // Remove the temporary text
399         for (Job j : jobs.values()) {
400           Catalog catalog = null;
401           try {
402             Job job = serviceRegistry.getJob(j.getId());
403             if (!Job.Status.FINISHED.equals(job.getStatus())) {
404               continue;
405             }
406             catalog = (Catalog) MediaPackageElementParser.getFromXml(job.getPayload());
407             if (catalog != null) {
408               workspace.delete(catalog.getURI());
409             }
410           } catch (Exception e) {
411             if (catalog != null) {
412               logger.warn("Unable to delete temporary text file {}", catalog.getURI(), e);
413             } else {
414               logger.warn("Unable to parse textextraction payload of job {}", j.getId());
415             }
416           }
417         }
418       }
419     }
420 
421     logger.debug("Text analysis completed");
422     return createResult(mediaPackage, Action.CONTINUE, totalTimeInQueue);
423   }
424 
425   /**
426    * Loads an mpeg7 catalog from a mediapackage's catalog reference
427    *
428    * @param catalog
429    *          the mediapackage's reference to this catalog
430    * @return the mpeg7
431    * @throws IOException
432    *           if there is a problem loading or parsing the mpeg7 object
433    */
434   protected Mpeg7Catalog loadMpeg7Catalog(Catalog catalog) throws IOException {
435     InputStream in = null;
436     try {
437       File f = workspace.get(catalog.getURI());
438       in = new FileInputStream(f);
439       return mpeg7CatalogService.load(in);
440     } catch (NotFoundException e) {
441       throw new IOException("Unable to open catalog " + catalog + ": " + e.getMessage());
442     } finally {
443       IOUtils.closeQuietly(in);
444     }
445   }
446 
447   /**
448    * Extracts the catalogs from the media package that match the requirements of flavor and tags specified in the
449    * operation handler.
450    *
451    * @param mediaPackage
452    *          the media package
453    * @param operation
454    *          the workflow operation
455    * @return a map of catalog elements and their mpeg-7 representations
456    * @throws IOException
457    *           if there is a problem reading the mpeg7
458    */
459   protected Map<Catalog, Mpeg7Catalog> loadSegmentCatalogs(MediaPackage mediaPackage,
460           WorkflowOperationInstance operation, ConfiguredTagsAndFlavors tagsAndFlavors) throws IOException {
461     HashMap<Catalog, Mpeg7Catalog> catalogs = new HashMap<Catalog, Mpeg7Catalog>();
462 
463     List<MediaPackageElementFlavor> sourceFlavors = tagsAndFlavors.getSrcFlavors();
464     List<String> sourceTagSet = tagsAndFlavors.getSrcTags();
465 
466     Catalog[] catalogsWithTags = mediaPackage.getCatalogsByTags(sourceTagSet);
467 
468     for (Catalog mediaPackageCatalog : catalogsWithTags) {
469       if (!MediaPackageElements.SEGMENTS.equals(mediaPackageCatalog.getFlavor())) {
470         continue;
471       }
472       if (sourceFlavors != null) {
473         if (mediaPackageCatalog.getReference() == null) {
474           continue;
475         }
476         Track t = mediaPackage.getTrack(mediaPackageCatalog.getReference().getIdentifier());
477         if (t == null || sourceFlavors.stream().noneMatch(flavor -> t.getFlavor().matches(flavor))) {
478           continue;
479         }
480       }
481 
482       // Make sure the catalog features at least one of the required tags
483       if (!mediaPackageCatalog.containsTag(sourceTagSet)) {
484         continue;
485       }
486 
487       Mpeg7Catalog mpeg7 = loadMpeg7Catalog(mediaPackageCatalog);
488 
489       // Make sure there is video content
490       if (mpeg7.videoContent() == null || !mpeg7.videoContent().hasNext()) {
491         logger.debug("Mpeg-7 segments catalog {} does not contain any video content", mpeg7);
492         continue;
493       }
494 
495       // Make sure there is a temporal decomposition
496       Video videoContent = mpeg7.videoContent().next();
497       TemporalDecomposition<? extends Segment> decomposition = videoContent.getTemporalDecomposition();
498       if (decomposition == null || !decomposition.hasSegments()) {
499         logger.debug("Mpeg-7 catalog {} does not contain a temporal decomposition", mpeg7);
500         continue;
501       }
502       catalogs.put(mediaPackageCatalog, mpeg7);
503     }
504 
505     return catalogs;
506   }
507 
508   /**
509    * @see org.osgi.service.cm.ManagedService#updated(java.util.Dictionary)
510    */
511   @SuppressWarnings("rawtypes")
512   @Override
513   public void updated(Dictionary properties) throws ConfigurationException {
514     if (properties != null && properties.get(OPT_STABILITY_THRESHOLD) != null) {
515       String threshold = StringUtils.trimToNull((String)properties.get(OPT_STABILITY_THRESHOLD));
516       try {
517         stabilityThreshold = Integer.parseInt(threshold);
518         logger.info("The videosegmenter's stability threshold has been set to {} frames", stabilityThreshold);
519       } catch (Exception e) {
520         stabilityThreshold = DEFAULT_STABILITY_THRESHOLD;
521         logger.warn("Found illegal value '{}' for the videosegmenter stability threshold. "
522             + "Falling back to default value of {} frames", threshold, DEFAULT_STABILITY_THRESHOLD);
523       }
524     } else {
525       stabilityThreshold = DEFAULT_STABILITY_THRESHOLD;
526       logger.info("Using the default value of {} frames for the videosegmenter stability threshold",
527           DEFAULT_STABILITY_THRESHOLD);
528     }
529   }
530 
531   /**
532    * Sets the composer service.
533    *
534    * @param composerService
535    */
536   @Reference
537   void setComposerService(ComposerService composerService) {
538     this.composer = composerService;
539   }
540 
541   @Reference
542   @Override
543   public void setServiceRegistry(ServiceRegistry serviceRegistry) {
544     super.setServiceRegistry(serviceRegistry);
545   }
546 
547 }