/*
 * Licensed to The Apereo Foundation under one or more contributor license
 * agreements. See the NOTICE file distributed with this work for additional
 * information regarding copyright ownership.
 *
 *
 * The Apereo Foundation licenses this file to you under the Educational
 * Community License, Version 2.0 (the "License"); you may not use this file
 * except in compliance with the License. You may obtain a copy of the License
 * at:
 *
 *   http://opensource.org/licenses/ecl2.txt
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the
 * License for the specific language governing permissions and limitations under
 * the License.
 *
 */

package org.opencastproject.ingest.scanner;

import static java.lang.String.format;
import static org.opencastproject.scheduler.api.RecordingState.UPLOAD_FINISHED;

import org.opencastproject.ingest.api.IngestService;
import org.opencastproject.mediapackage.Catalog;
import org.opencastproject.mediapackage.MediaPackage;
import org.opencastproject.mediapackage.MediaPackageElementFlavor;
import org.opencastproject.mediapackage.MediaPackageElements;
import org.opencastproject.mediapackage.MediaPackageException;
import org.opencastproject.mediapackage.Track;
import org.opencastproject.mediapackage.identifier.IdImpl;
import org.opencastproject.metadata.dublincore.DublinCore;
import org.opencastproject.metadata.dublincore.DublinCoreCatalog;
import org.opencastproject.metadata.dublincore.DublinCoreUtil;
import org.opencastproject.metadata.dublincore.DublinCores;
import org.opencastproject.metadata.dublincore.EncodingSchemeUtils;
import org.opencastproject.metadata.dublincore.Precision;
import org.opencastproject.scheduler.api.Recording;
import org.opencastproject.scheduler.api.SchedulerService;
import org.opencastproject.security.util.SecurityContext;
import org.opencastproject.series.api.SeriesService;
import org.opencastproject.util.IoSupport;
import org.opencastproject.util.NotFoundException;
import org.opencastproject.workflow.api.WorkflowInstance;
import org.opencastproject.workspace.api.Workspace;

import com.google.common.util.concurrent.RateLimiter;
import com.google.gson.Gson;
import com.google.gson.annotations.SerializedName;

import org.apache.commons.io.FileUtils;
import org.apache.commons.io.FilenameUtils;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.time.DateUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.net.URI;
import java.nio.charset.StandardCharsets;
import java.sql.Timestamp;
import java.text.DateFormat;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.time.format.DateTimeParseException;
import java.util.ArrayList;
import java.util.Date;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

/** Used by the {@link InboxScannerService} to do the actual ingest. */
public class Ingestor implements Runnable {

  /**
   * The logger
   */
  private static final Logger logger = LoggerFactory.getLogger(Ingestor.class);

  private final IngestService ingestService;

  private final SecurityContext secCtx;

  private final String workflowDefinition;

  private final Map<String, String> workflowConfig;

  private final MediaPackageElementFlavor mediaFlavor;

  private final File inbox;

  private final SeriesService seriesService;
  private final SchedulerService schedulerService;
  private final Workspace workspace;

  private final int maxTries;

  private final int secondsBetweenTries;

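  /** Limits how quickly failed ingest jobs are re-submitted for a retry (at most once per second). */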
  private RateLimiter throttle = RateLimiter.create(1.0);

  private final Optional<Pattern> metadataPattern;
  private final DateTimeFormatter dateFormatter;
  private final String ffprobe;

  private final Gson gson = new Gson();

  private final boolean matchSchedule;
  private final float matchThreshold;

  /**
   * Thread pool to run the ingest worker.
   */
  private final ExecutorService executorService;

  /**
   * Completion service to manage internal completion queue of ingest jobs including retries
   */
  private final CompletionService<RetriableIngestJob> completionService;


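  /**
   * One ingest attempt for a single inbox artifact. The job keeps track of its own retry count and failure
   * state so that {@link Ingestor#run()} can re-submit it to the completion service until the maximum number
   * of tries is reached.
   */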
  private class RetriableIngestJob implements Callable<RetriableIngestJob> {
    private final File artifact;
    private int retryCount;
    private boolean failed;
    private RateLimiter throttle;

    RetriableIngestJob(final File artifact, int secondsBetweenTries) {
      this.artifact = artifact;
      this.retryCount = 0;
      this.failed = false;
      throttle = RateLimiter.create(1.0 / secondsBetweenTries);
    }

    public boolean hasFailed() {
      return this.failed;
    }

    public int getRetryCount() {
      return this.retryCount;
    }

    public File getArtifact() {
      return this.artifact;
    }

    @Override
    public RetriableIngestJob call() {
      return secCtx.runInContext(() -> {
        if (hasFailed()) {
          logger.warn("This is retry number {} for file {}. We will wait for {} seconds before trying again",
                  retryCount, artifact.getName(), secondsBetweenTries);
          throttle.acquire();
        }
        try (InputStream in = new FileInputStream(artifact)) {
          failed = false;
          ++retryCount;
          if ("zip".equalsIgnoreCase(FilenameUtils.getExtension(artifact.getName()))) {
            logger.info("Start ingest inbox file {} as a zipped mediapackage", artifact.getName());
            WorkflowInstance workflowInstance = ingestService.addZippedMediaPackage(in, workflowDefinition,
                workflowConfig);
            logger.info("Ingested {} as a zipped mediapackage from inbox as {}. Started workflow {}.",
                    artifact.getName(), workflowInstance.getMediaPackage().getIdentifier().toString(),
                    workflowInstance.getId());
          } else {
            /* Create MediaPackage and add Track */
            logger.info("Start ingest track from file {}", artifact.getName());

            // Try extracting metadata from the file name and path
            String title = artifact.getName();
            String spatial = null;
            Date created = null;
            Float duration = null;
            if (metadataPattern.isPresent()) {
              var matcher = metadataPattern.get().matcher(artifact.getName());
              if (matcher.find()) {
                try {
                  title = matcher.group("title");
                } catch (IllegalArgumentException e) {
                  logger.debug("{} matches no 'title' in {}", metadataPattern.get(), artifact.getName(), e);
                }
                try {
                  spatial = matcher.group("spatial");
                } catch (IllegalArgumentException e) {
                  logger.debug("{} matches no 'spatial' in {}", metadataPattern.get(), artifact.getName(), e);
                }
                try {
                  var value = matcher.group("created");
                  logger.debug("Trying to parse matched date '{}' with formatter {}", value, dateFormatter);
                  created = Timestamp.valueOf(LocalDateTime.parse(value, dateFormatter));
                } catch (DateTimeParseException e) {
                  logger.warn("Matched date does not match configured date-time format", e);
                } catch (IllegalArgumentException e) {
                  logger.debug("{} matches no 'created' in {}", metadataPattern.get(), artifact.getName(), e);
                }
              } else {
                logger.debug("Regular expression {} does not match {}", metadataPattern.get(), artifact.getName());
              }
            }

            // Try extracting additional metadata via ffprobe
            if (ffprobe != null) {
              JsonFormat json = probeMedia(artifact.getAbsolutePath()).format;
              created = json.tags.getCreationTime() == null ? created : json.tags.getCreationTime();
              duration = json.getDuration();
              logger.debug("Extracted metadata from file: {}", json);
            }

            MediaPackage mediaPackage = null;
            var currentWorkflowDefinition = workflowDefinition;
            var currentWorkflowConfig = workflowConfig;

            // Check if we can match this to a scheduled event
            if (matchSchedule && spatial != null && created != null) {
              logger.debug("Try finding scheduled event for agent {} at time {}", spatial, created);
              var end = duration == null ? created : DateUtils.addSeconds(created, duration.intValue());
              var mediaPackages = schedulerService.findConflictingEvents(spatial, created, end);
              if (matchThreshold > 0F && mediaPackages.size() > 1) {
                var filteredMediaPackages = new ArrayList<MediaPackage>();
                for (var mp : mediaPackages) {
                  var schedule = schedulerService.getTechnicalMetadata(mp.getIdentifier().toString());
                  if (overlap(schedule.getStartDate(), schedule.getEndDate(), created, end) > matchThreshold) {
                    filteredMediaPackages.add(mp);
                  }
                }
                mediaPackages = filteredMediaPackages;
              }
              if (mediaPackages.size() > 1) {
                logger.warn("Metadata matches multiple scheduled events. Not using any of them!");
              } else if (mediaPackages.size() == 1) {
                mediaPackage = mediaPackages.get(0);
                var id = mediaPackage.getIdentifier().toString();
                var eventConfiguration = schedulerService.getCaptureAgentConfiguration(id);

                // Check if the scheduled event already has a recording associated with it
                // If so, ingest the file as a new event
                try {
                  Recording recordingState = schedulerService.getRecordingState(id);
                  if (recordingState.getState().equals(UPLOAD_FINISHED)) {
                    var referenceId = mediaPackage.getIdentifier().toString();
                    mediaPackage = (MediaPackage) mediaPackage.clone();
                    mediaPackage.setIdentifier(IdImpl.fromUUID());

                    // Drop copied media files. We don't want them in the new event
                    for (Track track : mediaPackage.getTracks()) {
                      logger.info("Removing track {}", track);
                      mediaPackage.remove(track);
                    }

                    // Update dublincore title and set reference to originally scheduled event
                    try {
                      DublinCoreCatalog dc = DublinCoreUtil.loadEpisodeDublinCore(workspace, mediaPackage).get();
                      var newTitle = dc.get(DublinCore.PROPERTY_TITLE).get(0).getValue()
                              + " (" + Instant.now().getEpochSecond() + ")";
                      dc.set(DublinCore.PROPERTY_TITLE, newTitle);
                      dc.set(DublinCore.PROPERTY_REFERENCES, referenceId);
                      mediaPackage = updateDublinCoreCatalog(mediaPackage, dc);
                      mediaPackage.setTitle(newTitle);
                    } catch (Exception e) {
                      // Don't fail the ingest if we could not set metadata for some reason
                      logger.warn("Unable to update episode metadata of matched scheduled event", e);
                    }
                  }
                } catch (NotFoundException e) {
                  // Occurs if a scheduled event has not started yet
                }

                currentWorkflowDefinition = eventConfiguration.getOrDefault(
                        "org.opencastproject.workflow.definition",
                        workflowDefinition);
                currentWorkflowConfig = eventConfiguration.entrySet().stream()
                        .filter(e -> e.getKey().startsWith("org.opencastproject.workflow.config."))
                        // strip the 36-character "org.opencastproject.workflow.config." prefix from each key
                        .collect(Collectors.toMap(e -> e.getKey().substring(36), Map.Entry::getValue));
                schedulerService.updateRecordingState(id, UPLOAD_FINISHED);
                logger.info("Found matching scheduled event {}", mediaPackage);
              } else {
                logger.debug("No matching event found.");
              }
            }

            // create new media package and metadata catalog if we have none
            if (mediaPackage == null) {
              // create new media package
              mediaPackage = ingestService.createMediaPackage();

              DublinCoreCatalog dcc = DublinCores.mkOpencastEpisode().getCatalog();
              if (spatial != null) {
                dcc.add(DublinCore.PROPERTY_SPATIAL, spatial);
              }
              if (created != null) {
                dcc.add(DublinCore.PROPERTY_CREATED, EncodingSchemeUtils.encodeDate(created, Precision.Second));
              }
              // fall back to filename for title if matcher did not catch any
              dcc.add(DublinCore.PROPERTY_TITLE, title);

              /* Check if we have a subdir and if its name matches an existing series */
              final File dir = artifact.getParentFile();
              if (FileUtils.directoryContains(inbox, dir)) {
                /* Use the name of the parent directory as the series identifier */
                var seriesID = dir.getName();
                if (seriesService.getSeries(seriesID) != null) {
                  logger.info("Ingest from inbox into series with id {}", seriesID);
                  dcc.add(DublinCore.PROPERTY_IS_PART_OF, seriesID);
                }
              }

              try (ByteArrayOutputStream dcout = new ByteArrayOutputStream()) {
                dcc.toXml(dcout, true);
                try (InputStream dcin = new ByteArrayInputStream(dcout.toByteArray())) {
                  mediaPackage = ingestService.addCatalog(dcin, "dublincore.xml", MediaPackageElements.EPISODE,
                          mediaPackage);
                  logger.info("Added DC catalog to media package for ingest from inbox");
                }
              }
            }

            // Ingest media
            mediaPackage = ingestService.addTrack(in, artifact.getName(), mediaFlavor, mediaPackage);
            logger.info("Ingested track from file {} to media package {}",
                    artifact.getName(), mediaPackage.getIdentifier().toString());

            // Ingest media package
            WorkflowInstance workflowInstance = ingestService.ingest(mediaPackage, currentWorkflowDefinition,
                    currentWorkflowConfig);
            logger.info("Ingested {} from inbox, workflow {} started", artifact.getName(), workflowInstance.getId());
          }
        } catch (Exception e) {
          logger.error("Error ingesting inbox file {}", artifact.getName(), e);
          failed = true;
          return RetriableIngestJob.this;
        }
        try {
          FileUtils.forceDelete(artifact);
        } catch (IOException e) {
          logger.error("Unable to delete file {}", artifact.getAbsolutePath(), e);
        }
        return RetriableIngestJob.this;
      });
    }

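    /**
     * Run the configured ffprobe binary on the given file and parse its JSON output.
     *
     * @param file absolute path of the media file to probe
     * @return the parsed ffprobe output
     * @throws IOException if ffprobe cannot be started, exits abnormally or produces unreadable output
     */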
    private JsonFFprobe probeMedia(final String file) throws IOException {

      final String[] command = new String[] {
          ffprobe,
          "-show_format",
          "-of",
          "json",
          file
      };

      // Execute ffprobe and obtain the result
      logger.debug("Running ffprobe: {}", String.join(" ", command));

      String output;
      Process process = null;
      try {
        process = new ProcessBuilder(command)
            .redirectError(ProcessBuilder.Redirect.DISCARD)
            .start();

        try (InputStream in = process.getInputStream()) {
          output = IOUtils.toString(in, StandardCharsets.UTF_8);
        }

        if (process.waitFor() != 0) {
          throw new IOException("FFprobe exited abnormally");
        }
      } catch (InterruptedException e) {
        throw new IOException(e);
      } finally {
        IoSupport.closeQuietly(process);
      }

      return gson.fromJson(output, JsonFFprobe.class);
    }

    /**
     * Calculate the overlap of two events `a` and `b`.
     * @param aStart Begin of event a
     * @param aEnd End of event a
     * @param bStart Begin of event b
     * @param bEnd End of event b
     * @return How much of `a` overlaps with `b`, as a float in the range {@code [0.0, 1.0]}.
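     *         For example, an event `a` from 10:00 to 11:00 and a recording `b` from 10:30 to 11:30 overlap by 0.5.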
     */
    private float overlap(Date aStart, Date aEnd, Date bStart, Date bEnd) {
      var min = Math.min(aStart.getTime(), bStart.getTime());
      var max = Math.max(aEnd.getTime(), bEnd.getTime());
      var aLen = aEnd.getTime() - aStart.getTime();
      var bLen = bEnd.getTime() - bStart.getTime();
      var overlap = aLen + bLen - (max - min);
      logger.debug("Detected overlap of {} ({})", overlap, overlap / (float) aLen);
      if (aLen == 0F) {
        return 1F;
      }
      if (overlap > 0F) {
        return overlap / (float) aLen;
      }
      return 0.0F;
    }
  }

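  /**
   * Take finished ingest jobs from the completion service and re-submit failed ones until the configured
   * maximum number of tries is reached.
   */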
  @Override
  public void run() {
    while (true) {
      try {
        final Future<RetriableIngestJob> f = completionService.take();
        final RetriableIngestJob task = f.get();
        if (task.hasFailed()) {
          if (task.getRetryCount() < maxTries) {
            throttle.acquire();
            logger.warn("Retrying inbox ingest of {}", task.getArtifact().getAbsolutePath());
            completionService.submit(task);
          } else {
            logger.error("Inbox ingest failed after {} tries for {}", maxTries, task.getArtifact().getAbsolutePath());
          }
        }
      } catch (InterruptedException e) {
        logger.debug("Ingestor check interrupted", e);
        return;
      } catch (ExecutionException e) {
        logger.error("Ingest job threw an exception", e);
      }
    }
  }

  /**
   * Create new ingestor.
   *
   * @param ingestService         media packages are passed to the ingest service
   * @param secCtx                security context needed for ingesting with the IngestService or for putting files
   *                              into the working file repository
   * @param workflowDefinition    workflow to apply to ingested media packages
   * @param workflowConfig        the workflow definition configuration
   * @param mediaFlavor           media flavor to use by default
   * @param inbox                 inbox directory to watch
   * @param maxThreads            maximum worker threads doing the actual ingest
   * @param seriesService         reference to the active series service
   * @param maxTries              maximum number of tries for an ingest job
   * @param secondsBetweenTries   time between retries in seconds
   * @param metadataPattern       regular expression pattern for matching metadata in file names
   * @param dateFormatter         date formatter pattern for parsing temporal metadata
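   * @param schedulerService      scheduler service used to look up and update matching scheduled events
   * @param ffprobe               path to the ffprobe binary used to extract technical metadata, or null to disable probing
   * @param matchSchedule         whether to try matching ingested media to scheduled events
   * @param matchThreshold        minimum overlap ratio required to keep a candidate event when several events match
   * @param workspace             the workspace used for updating metadata catalogs of matched events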
   */
  public Ingestor(IngestService ingestService, SecurityContext secCtx,
          String workflowDefinition, Map<String, String> workflowConfig, String mediaFlavor, File inbox, int maxThreads,
          SeriesService seriesService, int maxTries, int secondsBetweenTries, Optional<Pattern> metadataPattern,
          DateTimeFormatter dateFormatter, SchedulerService schedulerService, String ffprobe, boolean matchSchedule,
          float matchThreshold, Workspace workspace) {
    this.ingestService = ingestService;
    this.secCtx = secCtx;
    this.workflowDefinition = workflowDefinition;
    this.workflowConfig = workflowConfig;
    this.mediaFlavor = MediaPackageElementFlavor.parseFlavor(mediaFlavor);
    this.inbox = inbox;
    this.executorService = Executors.newFixedThreadPool(maxThreads);
    this.completionService = new ExecutorCompletionService<>(executorService);
    this.seriesService = seriesService;
    this.maxTries = maxTries;
    this.secondsBetweenTries = secondsBetweenTries;
    this.metadataPattern = metadataPattern;
    this.dateFormatter = dateFormatter;
    this.schedulerService = schedulerService;
    this.ffprobe = ffprobe;
    this.matchSchedule = matchSchedule;
    this.matchThreshold = matchThreshold;
    this.workspace = workspace;
  }

  /**
   * Asynchronous ingest of an artifact.
   */
  public void ingest(final File artifact) {
    logger.info("Try ingest of file {}", artifact.getName());
    completionService.submit(new RetriableIngestJob(artifact, secondsBetweenTries));
  }

  /**
   * Return true if the passed artifact can be handled by this ingestor,
   * false if not (e.g. it lies outside of the inbox or its name starts with a ".")
   */
  public boolean canHandle(final File artifact) {
    logger.trace("canHandle() {}, {}", myInfo(), artifact.getAbsolutePath());
    File dir = artifact.getParentFile();
    try {
      /* Stop if the artifact has no parent directory, is a dotfile, lies outside of the inbox path,
         is not readable or is empty */
      return dir != null && !artifact.getName().startsWith(".")
              && FileUtils.directoryContains(inbox, artifact)
              && artifact.canRead() && artifact.length() > 0;
    } catch (IOException e) {
      logger.warn("Unable to determine canonical path of {}", artifact.getAbsolutePath(), e);
      return false;
    }
  }

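  /**
   * Delete the artifact's parent directory from the inbox if it has become empty.
   */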
  public void cleanup(final File artifact) {
    try {
      File parentDir = artifact.getParentFile();
      if (FileUtils.directoryContains(inbox, parentDir)) {
        String[] filesList = parentDir.list();
        if (filesList == null || filesList.length == 0) {
          logger.info("Delete empty inbox for series {}",
                  StringUtils.substring(parentDir.getCanonicalPath(), inbox.getCanonicalPath().length() + 1));
          FileUtils.deleteDirectory(parentDir);
        }
      }
    } catch (Exception e) {
      logger.error("Unable to cleanup inbox for the artifact {}", artifact, e);
    }
  }

  /**
   * Update the episode DublinCore catalog of a media package in the workspace.
   *
   * @param mp
   *          the mediapackage to update
   * @param dc
   *          the dublincore metadata to use to update the mediapackage
   * @return the updated mediapackage
   * @throws IOException
   *           Thrown if an IO error occurred adding the dc catalog file
   * @throws MediaPackageException
   *           Thrown if an error occurred updating the mediapackage or the mediapackage does not contain a catalog
   */
  private MediaPackage updateDublinCoreCatalog(MediaPackage mp, DublinCoreCatalog dc)
          throws IOException, MediaPackageException {
    try (InputStream inputStream = IOUtils.toInputStream(dc.toXmlString(), StandardCharsets.UTF_8)) {
      // Update dublincore catalog
      Catalog[] catalogs = mp.getCatalogs(MediaPackageElements.EPISODE);
      if (catalogs.length > 0) {
        Catalog catalog = catalogs[0];
        URI uri = workspace.put(mp.getIdentifier().toString(), catalog.getIdentifier(), "dublincore.xml", inputStream);
        catalog.setURI(uri);
        // The URI now points to new content, so the old checksum is most likely invalid
        catalog.setChecksum(null);
      } else {
        throw new MediaPackageException("Unable to find catalog");
      }
    }
    return mp;
  }

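  /** Short identifier of this ingestor instance and the current thread, used in trace logging. */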
  public String myInfo() {
    return format("[%x thread=%x]", hashCode(), Thread.currentThread().getId());
  }

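  /** Minimal Gson mappings for the parts of the ffprobe JSON output used above. */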
  class JsonFFprobe {
    protected JsonFormat format;
  }

  class JsonFormat {
    private String duration;
    protected JsonTags tags;

    Float getDuration() {
      return duration == null ? null : Float.parseFloat(duration);
    }

    @Override
    public String toString() {
      return String.format("{duration=%s,tags=%s}", duration, tags);
    }
  }

  class JsonTags {
    @SerializedName(value = "creation_time")
    private String creationTime;

    Date getCreationTime() throws ParseException {
      if (creationTime == null) {
        return null;
      }
      DateFormat format = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSz");
      return format.parse(creationTime.replaceAll("000Z$", "+0000"));
    }

    @Override
    public String toString() {
      return String.format("{creation_time=%s}", creationTime);
    }
  }
}