View Javadoc
1   /*
2    * Licensed to The Apereo Foundation under one or more contributor license
3    * agreements. See the NOTICE file distributed with this work for additional
4    * information regarding copyright ownership.
5    *
6    *
7    * The Apereo Foundation licenses this file to you under the Educational
8    * Community License, Version 2.0 (the "License"); you may not use this file
9    * except in compliance with the License. You may obtain a copy of the License
10   * at:
11   *
12   *   http://opensource.org/licenses/ecl2.txt
13   *
14   * Unless required by applicable law or agreed to in writing, software
15   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
16   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the
17   * License for the specific language governing permissions and limitations under
18   * the License.
19   *
20   */
21  
22  package org.opencastproject.ingest.scanner;
23  
24  import static java.lang.String.format;
25  import static org.opencastproject.scheduler.api.RecordingState.UPLOAD_FINISHED;
26  
27  import org.opencastproject.ingest.api.IngestService;
28  import org.opencastproject.mediapackage.Catalog;
29  import org.opencastproject.mediapackage.MediaPackage;
30  import org.opencastproject.mediapackage.MediaPackageElementFlavor;
31  import org.opencastproject.mediapackage.MediaPackageElements;
32  import org.opencastproject.mediapackage.MediaPackageException;
33  import org.opencastproject.mediapackage.Track;
34  import org.opencastproject.mediapackage.identifier.IdImpl;
35  import org.opencastproject.metadata.dublincore.DublinCore;
36  import org.opencastproject.metadata.dublincore.DublinCoreCatalog;
37  import org.opencastproject.metadata.dublincore.DublinCoreUtil;
38  import org.opencastproject.metadata.dublincore.DublinCores;
39  import org.opencastproject.metadata.dublincore.EncodingSchemeUtils;
40  import org.opencastproject.metadata.dublincore.Precision;
41  import org.opencastproject.scheduler.api.Recording;
42  import org.opencastproject.scheduler.api.SchedulerService;
43  import org.opencastproject.security.util.SecurityContext;
44  import org.opencastproject.series.api.SeriesService;
45  import org.opencastproject.util.IoSupport;
46  import org.opencastproject.util.NotFoundException;
47  import org.opencastproject.workflow.api.WorkflowInstance;
48  import org.opencastproject.workspace.api.Workspace;
49  
50  import com.google.common.util.concurrent.RateLimiter;
51  import com.google.gson.Gson;
52  import com.google.gson.annotations.SerializedName;
53  
54  import org.apache.commons.io.FileUtils;
55  import org.apache.commons.io.FilenameUtils;
56  import org.apache.commons.io.IOUtils;
57  import org.apache.commons.lang3.StringUtils;
58  import org.apache.commons.lang3.time.DateUtils;
59  import org.slf4j.Logger;
60  import org.slf4j.LoggerFactory;
61  
62  import java.io.ByteArrayInputStream;
63  import java.io.ByteArrayOutputStream;
64  import java.io.File;
65  import java.io.FileInputStream;
66  import java.io.IOException;
67  import java.io.InputStream;
68  import java.net.URI;
69  import java.nio.charset.StandardCharsets;
70  import java.sql.Timestamp;
71  import java.text.DateFormat;
72  import java.text.ParseException;
73  import java.text.SimpleDateFormat;
74  import java.time.Instant;
75  import java.time.LocalDateTime;
76  import java.time.format.DateTimeFormatter;
77  import java.time.format.DateTimeParseException;
78  import java.util.ArrayList;
79  import java.util.Date;
80  import java.util.Map;
81  import java.util.Optional;
82  import java.util.concurrent.Callable;
83  import java.util.concurrent.CompletionService;
84  import java.util.concurrent.ExecutionException;
85  import java.util.concurrent.ExecutorCompletionService;
86  import java.util.concurrent.ExecutorService;
87  import java.util.concurrent.Executors;
88  import java.util.concurrent.Future;
89  import java.util.regex.Pattern;
90  import java.util.stream.Collectors;
91  
92  /** Used by the {@link InboxScannerService} to do the actual ingest. */
93  public class Ingestor implements Runnable {
94  
95    /**
96     * The logger
97     */
98    private static final Logger logger = LoggerFactory.getLogger(Ingestor.class);
99  
100   private final IngestService ingestService;
101 
102   private final SecurityContext secCtx;
103 
104   private final String workflowDefinition;
105 
106   private final Map<String, String> workflowConfig;
107 
108   private final MediaPackageElementFlavor mediaFlavor;
109 
110   private final File inbox;
111 
112   private final SeriesService seriesService;
113   private final SchedulerService schedulerService;
114   private final Workspace workspace;
115 
116   private final int maxTries;
117 
118   private final int secondsBetweenTries;
119 
120   private RateLimiter throttle = RateLimiter.create(1.0);
121 
122   private final Optional<Pattern> metadataPattern;
123   private final DateTimeFormatter dateFormatter;
124   private final String ffprobe;
125 
126   private final Gson gson = new Gson();
127 
128   private final boolean matchSchedule;
129   private final float matchThreshold;
130 
131   /**
132    * Thread pool to run the ingest worker.
133    */
134   private final ExecutorService executorService;
135 
136   /**
137    * Completion service to manage internal completion queue of ingest jobs including retries
138    */
139   private final CompletionService<RetriableIngestJob> completionService;
140 
141 
142   private class RetriableIngestJob implements Callable<RetriableIngestJob> {
143     private final File artifact;
144     private int retryCount;
145     private boolean failed;
146     private RateLimiter throttle;
147 
148     RetriableIngestJob(final File artifact, int secondsBetweenTries) {
149       this.artifact = artifact;
150       this.retryCount = 0;
151       this.failed = false;
152       throttle = RateLimiter.create(1.0 / secondsBetweenTries);
153     }
154 
155     public boolean hasFailed() {
156       return this.failed;
157     }
158 
159     public int getRetryCount() {
160       return this.retryCount;
161     }
162 
163     public File getArtifact() {
164       return this.artifact;
165     }
166 
167     @Override
168     public RetriableIngestJob call() {
169       return secCtx.runInContext(() -> {
170           if (hasFailed()) {
171             logger.warn("This is retry number {} for file {}. We will wait for {} seconds before trying again",
172                     retryCount, artifact.getName(), secondsBetweenTries);
173             throttle.acquire();
174           }
175           try (InputStream in = new FileInputStream(artifact)) {
176             failed = false;
177             ++retryCount;
178             if ("zip".equalsIgnoreCase(FilenameUtils.getExtension(artifact.getName()))) {
179               logger.info("Start ingest inbox file {} as a zipped mediapackage", artifact.getName());
180               WorkflowInstance workflowInstance = ingestService.addZippedMediaPackage(in, workflowDefinition, workflowConfig);
181               logger.info("Ingested {} as a zipped mediapackage from inbox as {}. Started workflow {}.",
182                       artifact.getName(), workflowInstance.getMediaPackage().getIdentifier().toString(),
183                       workflowInstance.getId());
184             } else {
185               /* Create MediaPackage and add Track */
186               logger.info("Start ingest track from file {}", artifact.getName());
187 
188               // Try extracting metadata from the file name and path
189               String title = artifact.getName();
190               String spatial = null;
191               Date created = null;
192               Float duration = null;
193               if (metadataPattern.isPresent()) {
194                 var matcher = metadataPattern.get().matcher(artifact.getName());
195                 if (matcher.find()) {
196                   try {
197                     title = matcher.group("title");
198                   } catch (IllegalArgumentException e) {
199                     logger.debug("{} matches no 'title' in {}", metadataPattern.get(), artifact.getName(), e);
200                   }
201                   try {
202                     spatial = matcher.group("spatial");
203                   } catch (IllegalArgumentException e) {
204                     logger.debug("{} matches no 'spatial' in {}", metadataPattern.get(), artifact.getName(), e);
205                   }
206                   try {
207                     var value = matcher.group("created");
208                     logger.debug("Trying to parse matched date '{}' with formatter {}", value, dateFormatter);
209                     created = Timestamp.valueOf(LocalDateTime.parse(value, dateFormatter));
210                   } catch (DateTimeParseException e) {
211                     logger.warn("Matched date does not match configured date-time format", e);
212                   } catch (IllegalArgumentException e) {
213                     logger.debug("{} matches no 'created' in {}", metadataPattern.get(), artifact.getName(), e);
214                   }
215                 } else {
216                   logger.debug("Regular expression {} does not match {}", metadataPattern.get(), artifact.getName());
217                 }
218               }
219 
220               // Try extracting additional metadata via ffprobe
221               if (ffprobe != null) {
222                 JsonFormat json = probeMedia(artifact.getAbsolutePath()).format;
223                 created = json.tags.getCreationTime() == null ? created : json.tags.getCreationTime();
224                 duration = json.getDuration();
225                 logger.debug("Extracted metadata from file: {}", json);
226               }
227 
228               MediaPackage mediaPackage = null;
229               var currentWorkflowDefinition = workflowDefinition;
230               var currentWorkflowConfig = workflowConfig;
231 
232               // Check if we can match this to a scheduled event
233               if (matchSchedule && spatial != null && created != null) {
234                 logger.debug("Try finding scheduled event for agent {} at time {}", spatial, created);
235                 var end = duration == null ? created : DateUtils.addSeconds(created, duration.intValue());
236                 var mediaPackages = schedulerService.findConflictingEvents(spatial, created, end);
237                 if (matchThreshold > 0F && mediaPackages.size() > 1) {
238                   var filteredMediaPackages = new ArrayList<MediaPackage>();
239                   for (var mp : mediaPackages) {
240                     var schedule =  schedulerService.getTechnicalMetadata(mp.getIdentifier().toString());
241                     if (overlap(schedule.getStartDate(), schedule.getEndDate(), created, end) > matchThreshold) {
242                       filteredMediaPackages.add(mp);
243                     }
244                   }
245                   mediaPackages = filteredMediaPackages;
246                 }
247                 if (mediaPackages.size() > 1) {
248                   logger.warn("Metadata match multiple events. Not using any!");
249                 } else if (mediaPackages.size() == 1) {
250                   mediaPackage = mediaPackages.get(0);
251                   var id = mediaPackage.getIdentifier().toString();
252                   var eventConfiguration = schedulerService.getCaptureAgentConfiguration(id);
253 
254                   // Check if the scheduled event already has a recording associated with it
255                   // If so, ingest the file as a new event
256                   try {
257                     Recording recordingState = schedulerService.getRecordingState(id);
258                     if (recordingState.getState().equals(UPLOAD_FINISHED)) {
259                       var referenceId = mediaPackage.getIdentifier().toString();
260                       mediaPackage = (MediaPackage) mediaPackage.clone();
261                       mediaPackage.setIdentifier(IdImpl.fromUUID());
262 
263                       // Drop copied media files. We don't want them in the new event
264                       for (Track track : mediaPackage.getTracks()) {
265                         logger.info("Remove track: " + track);
266                         mediaPackage.remove(track);
267                       }
268 
269                       // Update dublincore title and set reference to originally scheduled event
270                       try {
271                         DublinCoreCatalog dc = DublinCoreUtil.loadEpisodeDublinCore(workspace, mediaPackage).get();
272                         var newTitle = dc.get(DublinCore.PROPERTY_TITLE).get(0).getValue()
273                                 + " (" + Instant.now().getEpochSecond() + ")";
274                         dc.set(DublinCore.PROPERTY_TITLE, newTitle);
275                         dc.set(DublinCore.PROPERTY_REFERENCES, referenceId);
276                         mediaPackage = updateDublincCoreCatalog(mediaPackage, dc);
277                         mediaPackage.setTitle(newTitle);
278                       } catch (Exception e) {
279                         // Don't fail the ingest if we could not set metadata for some reason
280                       }
281                     }
282                   } catch (NotFoundException e) {
283                     // Occurs if a scheduled event has not started yet
284                   }
285 
286                   currentWorkflowDefinition = eventConfiguration.getOrDefault(
287                           "org.opencastproject.workflow.definition",
288                           workflowDefinition);
289                   currentWorkflowConfig = eventConfiguration.entrySet().stream()
290                           .filter(e -> e.getKey().startsWith("org.opencastproject.workflow.config."))
291                           .collect(Collectors.toMap(e -> e.getKey().substring(36), Map.Entry::getValue));
292                   schedulerService.updateRecordingState(id, UPLOAD_FINISHED);
293                   logger.info("Found matching scheduled event {}", mediaPackage);
294                 } else {
295                   logger.debug("No matching event found.");
296                 }
297               }
298 
299               // create new media package and metadata catalog if we have none
300               if (mediaPackage == null) {
301                 // create new media package
302                 mediaPackage = ingestService.createMediaPackage();
303 
304                 DublinCoreCatalog dcc = DublinCores.mkOpencastEpisode().getCatalog();
305                 if (spatial != null) {
306                   dcc.add(DublinCore.PROPERTY_SPATIAL, spatial);
307                 }
308                 if (created != null) {
309                   dcc.add(DublinCore.PROPERTY_CREATED, EncodingSchemeUtils.encodeDate(created, Precision.Second));
310                 }
311                 // fall back to filename for title if matcher did not catch any
312                 dcc.add(DublinCore.PROPERTY_TITLE, title);
313 
314                 /* Check if we have a subdir and if its name matches an existing series */
315                 final File dir = artifact.getParentFile();
316                 if (FileUtils.directoryContains(inbox, dir)) {
317                   /* cut away inbox path and trailing slash from artifact path */
318                   var seriesID = dir.getName();
319                   if (seriesService.getSeries(seriesID) != null) {
320                     logger.info("Ingest from inbox into series with id {}", seriesID);
321                     dcc.add(DublinCore.PROPERTY_IS_PART_OF, seriesID);
322                   }
323                 }
324 
325                 try (ByteArrayOutputStream dcout = new ByteArrayOutputStream()) {
326                   dcc.toXml(dcout, true);
327                   try (InputStream dcin = new ByteArrayInputStream(dcout.toByteArray())) {
328                     mediaPackage = ingestService.addCatalog(dcin, "dublincore.xml", MediaPackageElements.EPISODE,
329                             mediaPackage);
330                     logger.info("Added DC catalog to media package for ingest from inbox");
331                   }
332                 }
333               }
334 
335               // Ingest media
336               mediaPackage = ingestService.addTrack(in, artifact.getName(), mediaFlavor, mediaPackage);
337               logger.info("Ingested track from file {} to media package {}",
338                       artifact.getName(), mediaPackage.getIdentifier().toString());
339 
340               // Ingest media package
341               WorkflowInstance workflowInstance = ingestService.ingest(mediaPackage, currentWorkflowDefinition,
342                       currentWorkflowConfig);
343               logger.info("Ingested {} from inbox, workflow {} started", artifact.getName(), workflowInstance.getId());
344             }
345           } catch (Exception e) {
346             logger.error("Error ingesting inbox file {}", artifact.getName(), e);
347             failed = true;
348             return RetriableIngestJob.this;
349           }
350           try {
351             FileUtils.forceDelete(artifact);
352           } catch (IOException e) {
353             logger.error("Unable to delete file {}", artifact.getAbsolutePath(), e);
354           }
355           return RetriableIngestJob.this;
356       });
357     }
358 
359     private JsonFFprobe probeMedia(final String file) throws IOException {
360 
361       final String[] command = new String[] {
362               ffprobe,
363               "-show_format",
364               "-of",
365               "json",
366               file
367       };
368 
369       // Execute ffprobe and obtain the result
370       logger.debug("Running ffprobe: {}", (Object) command);
371 
372       String output;
373       Process process = null;
374       try {
375         process = new ProcessBuilder(command)
376             .redirectError(ProcessBuilder.Redirect.DISCARD)
377             .start();
378 
379         try (InputStream in = process.getInputStream()) {
380           output = IOUtils.toString(in, StandardCharsets.UTF_8);
381         }
382 
383         if (process.waitFor() != 0) {
384           throw new IOException("FFprobe exited abnormally");
385         }
386       } catch (InterruptedException e) {
387         throw new IOException(e);
388       } finally {
389         IoSupport.closeQuietly(process);
390       }
391 
392       return gson.fromJson(output, JsonFFprobe.class);
393     }
394 
395     /**
396      * Calculate the overlap of two events `a` and `b`.
397      * @param aStart Begin of event a
398      * @param aEnd End of event a
399      * @param bStart Begin of event b
400      * @param bEnd End of event b
401      * @return How much of `a` overlaps with `n`. Return a float in the range of <pre>[0.0, 1.0]</pre>.
402      */
403     private float overlap(Date aStart, Date aEnd, Date bStart, Date bEnd) {
404       var min = Math.min(aStart.getTime(), bStart.getTime());
405       var max = Math.max(aEnd.getTime(), bEnd.getTime());
406       var aLen = aEnd.getTime() - aStart.getTime();
407       var bLen = bEnd.getTime() - bStart.getTime();
408       var overlap =  aLen + bLen - (max - min);
409       logger.debug("Detected overlap of {} ({})", overlap, overlap / (float) aLen);
410       if (aLen == 0F) {
411         return 1F;
412       }
413       if (overlap > 0F) {
414         return overlap / (float) aLen;
415       }
416       return 0.0F;
417     }
418   }
419 
420   @Override
421   public void run() {
422     while (true) {
423       try {
424         final Future<RetriableIngestJob> f = completionService.take();
425         final RetriableIngestJob task = f.get();
426         if (task.hasFailed()) {
427           if (task.getRetryCount() < maxTries) {
428             throttle.acquire();
429             logger.warn("Retrying inbox ingest of {}", task.getArtifact().getAbsolutePath());
430             completionService.submit(task);
431           } else {
432             logger.error("Inbox ingest failed after {} tries for {}", maxTries, task.getArtifact().getAbsolutePath());
433           }
434         }
435       } catch (InterruptedException e) {
436         logger.debug("Ingestor check interrupted", e);
437         return;
438       } catch (ExecutionException e) {
439         logger.error("Ingestor check interrupted", e);
440       }
441     }
442   }
443 
444   /**
445    * Create new ingestor.
446    *
447    * @param ingestService         media packages are passed to the ingest service
448    * @param secCtx                security context needed for ingesting with the IngestService or for putting files into the working file
449    *                              repository
450    * @param workflowDefinition    workflow to apply to ingested media packages
451    * @param workflowConfig        the workflow definition configuration
452    * @param mediaFlavor           media flavor to use by default
453    * @param inbox                 inbox directory to watch
454    * @param maxThreads            maximum worker threads doing the actual ingest
455    * @param seriesService         reference to the active series service
456    * @param maxTries              maximum tries for a ingest job
457    * @param secondsBetweenTries   time between retires in seconds
458    * @param metadataPattern       regular expression pattern for matching metadata in file names
459    * @param dateFormatter         date formatter pattern for parsing temporal metadata
460    */
461   public Ingestor(IngestService ingestService, SecurityContext secCtx,
462           String workflowDefinition, Map<String, String> workflowConfig, String mediaFlavor, File inbox, int maxThreads,
463           SeriesService seriesService, int maxTries, int secondsBetweenTries, Optional<Pattern> metadataPattern,
464           DateTimeFormatter dateFormatter, SchedulerService schedulerService, String ffprobe, boolean matchSchedule,
465           float matchThreshold, Workspace workspace) {
466     this.ingestService = ingestService;
467     this.secCtx = secCtx;
468     this.workflowDefinition = workflowDefinition;
469     this.workflowConfig = workflowConfig;
470     this.mediaFlavor = MediaPackageElementFlavor.parseFlavor(mediaFlavor);
471     this.inbox = inbox;
472     this.executorService = Executors.newFixedThreadPool(maxThreads);
473     this.completionService = new ExecutorCompletionService<>(executorService);
474     this.seriesService = seriesService;
475     this.maxTries = maxTries;
476     this.secondsBetweenTries = secondsBetweenTries;
477     this.metadataPattern = metadataPattern;
478     this.dateFormatter = dateFormatter;
479     this.schedulerService = schedulerService;
480     this.ffprobe = ffprobe;
481     this.matchSchedule = matchSchedule;
482     this.matchThreshold = matchThreshold;
483     this.workspace = workspace;
484   }
485 
486   /**
487    * Asynchronous ingest of an artifact.
488    */
489   public void ingest(final File artifact) {
490     logger.info("Try ingest of file {}", artifact.getName());
491     completionService.submit(new RetriableIngestJob(artifact, secondsBetweenTries));
492   }
493 
494   /**
495    * Return true if the passed artifact can be handled by this ingestor,
496    * false if not (e.g. it lies outside of inbox or its name starts with a ".")
497    */
498   public boolean canHandle(final File artifact) {
499     logger.trace("canHandle() {}, {}", myInfo(), artifact.getAbsolutePath());
500     File dir = artifact.getParentFile();
501     try {
502       /* Stop if dir is empty, stop if artifact is dotfile, stop if artifact lives outside of inbox path */
503       return dir != null && !artifact.getName().startsWith(".")
504               && FileUtils.directoryContains(inbox, artifact)
505               && artifact.canRead() && artifact.length() > 0;
506     } catch (IOException e) {
507       logger.warn("Unable to determine canonical path of {}", artifact.getAbsolutePath(), e);
508       return false;
509     }
510   }
511 
512   public void cleanup(final File artifact) {
513     try {
514       File parentDir = artifact.getParentFile();
515       if (FileUtils.directoryContains(inbox, parentDir)) {
516         String[] filesList = parentDir.list();
517         if (filesList == null || filesList.length == 0) {
518           logger.info("Delete empty inbox for series {}",
519                   StringUtils.substring(parentDir.getCanonicalPath(), inbox.getCanonicalPath().length() + 1));
520           FileUtils.deleteDirectory(parentDir);
521         }
522       }
523     } catch (Exception e) {
524       logger.error("Unable to cleanup inbox for the artifact {}", artifact, e);
525     }
526   }
527 
528   /**
529    *
530    * @param mp
531    *          the mediapackage to update
532    * @param dc
533    *          the dublincore metadata to use to update the mediapackage
534    * @return the updated mediapackage
535    * @throws IOException
536    *           Thrown if an IO error occurred adding the dc catalog file
537    * @throws MediaPackageException
538    *           Thrown if an error occurred updating the mediapackage or the mediapackage does not contain a catalog
539    */
540   private MediaPackage updateDublincCoreCatalog(MediaPackage mp, DublinCoreCatalog dc)
541           throws IOException, MediaPackageException {
542     try (InputStream inputStream = IOUtils.toInputStream(dc.toXmlString(), "UTF-8")) {
543       // Update dublincore catalog
544       Catalog[] catalogs = mp.getCatalogs(MediaPackageElements.EPISODE);
545       if (catalogs.length > 0) {
546         Catalog catalog = catalogs[0];
547         URI uri = workspace.put(mp.getIdentifier().toString(), catalog.getIdentifier(), "dublincore.xml", inputStream);
548         catalog.setURI(uri);
549         // setting the URI to a new source so the checksum will most like be invalid
550         catalog.setChecksum(null);
551       } else {
552         throw new MediaPackageException("Unable to find catalog");
553       }
554     }
555     return mp;
556   }
557 
558   public String myInfo() {
559     return format("[%x thread=%x]", hashCode(), Thread.currentThread().getId());
560   }
561 
  /** Target type for deserializing the JSON output of ffprobe with Gson. */
  class JsonFFprobe {
    /** Content of the `format` object of the ffprobe output; not set if the output had none. */
    protected JsonFormat format;
  }
565 
566   class JsonFormat {
567     private String duration;
568     protected JsonTags tags;
569 
570     Float getDuration() {
571       return duration == null ? null : Float.parseFloat(duration);
572     }
573 
574     @Override
575     public String toString() {
576       return String.format("{duration=%s,tags=%s}", duration, tags);
577     }
578   }
579 
580   class JsonTags {
581     @SerializedName(value = "creation_time")
582     private String creationTime;
583 
584     Date getCreationTime() throws ParseException {
585       if (creationTime == null) {
586         return  null;
587       }
588       DateFormat format = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSz");
589       return format.parse(creationTime.replaceAll("000Z$", "+0000"));
590     }
591 
592     @Override
593     public String toString() {
594       return String.format("{creation_time=%s}", creationTime);
595     }
596   }
597 }