1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 package org.opencastproject.ingest.scanner;
23
24 import static java.lang.String.format;
25 import static org.opencastproject.scheduler.api.RecordingState.UPLOAD_FINISHED;
26
27 import org.opencastproject.ingest.api.IngestService;
28 import org.opencastproject.mediapackage.Catalog;
29 import org.opencastproject.mediapackage.MediaPackage;
30 import org.opencastproject.mediapackage.MediaPackageElementFlavor;
31 import org.opencastproject.mediapackage.MediaPackageElements;
32 import org.opencastproject.mediapackage.MediaPackageException;
33 import org.opencastproject.mediapackage.Track;
34 import org.opencastproject.mediapackage.identifier.IdImpl;
35 import org.opencastproject.metadata.dublincore.DublinCore;
36 import org.opencastproject.metadata.dublincore.DublinCoreCatalog;
37 import org.opencastproject.metadata.dublincore.DublinCoreUtil;
38 import org.opencastproject.metadata.dublincore.DublinCores;
39 import org.opencastproject.metadata.dublincore.EncodingSchemeUtils;
40 import org.opencastproject.metadata.dublincore.Precision;
41 import org.opencastproject.scheduler.api.Recording;
42 import org.opencastproject.scheduler.api.SchedulerService;
43 import org.opencastproject.security.util.SecurityContext;
44 import org.opencastproject.series.api.SeriesService;
45 import org.opencastproject.util.IoSupport;
46 import org.opencastproject.util.NotFoundException;
47 import org.opencastproject.workflow.api.WorkflowInstance;
48 import org.opencastproject.workspace.api.Workspace;
49
50 import com.google.common.util.concurrent.RateLimiter;
51 import com.google.gson.Gson;
52 import com.google.gson.annotations.SerializedName;
53
54 import org.apache.commons.io.FileUtils;
55 import org.apache.commons.io.FilenameUtils;
56 import org.apache.commons.io.IOUtils;
57 import org.apache.commons.lang3.StringUtils;
58 import org.apache.commons.lang3.time.DateUtils;
59 import org.slf4j.Logger;
60 import org.slf4j.LoggerFactory;
61
62 import java.io.ByteArrayInputStream;
63 import java.io.ByteArrayOutputStream;
64 import java.io.File;
65 import java.io.FileInputStream;
66 import java.io.IOException;
67 import java.io.InputStream;
68 import java.net.URI;
69 import java.nio.charset.StandardCharsets;
70 import java.sql.Timestamp;
71 import java.text.DateFormat;
72 import java.text.ParseException;
73 import java.text.SimpleDateFormat;
74 import java.time.Instant;
75 import java.time.LocalDateTime;
76 import java.time.format.DateTimeFormatter;
77 import java.time.format.DateTimeParseException;
78 import java.util.ArrayList;
79 import java.util.Date;
80 import java.util.Map;
81 import java.util.Optional;
82 import java.util.concurrent.Callable;
83 import java.util.concurrent.CompletionService;
84 import java.util.concurrent.ExecutionException;
85 import java.util.concurrent.ExecutorCompletionService;
86 import java.util.concurrent.ExecutorService;
87 import java.util.concurrent.Executors;
88 import java.util.concurrent.Future;
89 import java.util.regex.Pattern;
90 import java.util.stream.Collectors;
91
92
93 public class Ingestor implements Runnable {
94
95
96
97
98 private static final Logger logger = LoggerFactory.getLogger(Ingestor.class);
99
100 private final IngestService ingestService;
101
102 private final SecurityContext secCtx;
103
104 private final String workflowDefinition;
105
106 private final Map<String, String> workflowConfig;
107
108 private final MediaPackageElementFlavor mediaFlavor;
109
110 private final File inbox;
111
112 private final SeriesService seriesService;
113 private final SchedulerService schedulerService;
114 private final Workspace workspace;
115
116 private final int maxTries;
117
118 private final int secondsBetweenTries;
119
120 private RateLimiter throttle = RateLimiter.create(1.0);
121
122 private final Optional<Pattern> metadataPattern;
123 private final DateTimeFormatter dateFormatter;
124 private final String ffprobe;
125
126 private final Gson gson = new Gson();
127
128 private final boolean matchSchedule;
129 private final float matchThreshold;
130
131
132
133
134 private final ExecutorService executorService;
135
136
137
138
139 private final CompletionService<RetriableIngestJob> completionService;
140
141
142 private class RetriableIngestJob implements Callable<RetriableIngestJob> {
143 private final File artifact;
144 private int retryCount;
145 private boolean failed;
146 private RateLimiter throttle;
147
148 RetriableIngestJob(final File artifact, int secondsBetweenTries) {
149 this.artifact = artifact;
150 this.retryCount = 0;
151 this.failed = false;
152 throttle = RateLimiter.create(1.0 / secondsBetweenTries);
153 }
154
155 public boolean hasFailed() {
156 return this.failed;
157 }
158
159 public int getRetryCount() {
160 return this.retryCount;
161 }
162
163 public File getArtifact() {
164 return this.artifact;
165 }
166
167 @Override
168 public RetriableIngestJob call() {
169 return secCtx.runInContext(() -> {
170 if (hasFailed()) {
171 logger.warn("This is retry number {} for file {}. We will wait for {} seconds before trying again",
172 retryCount, artifact.getName(), secondsBetweenTries);
173 throttle.acquire();
174 }
175 try (InputStream in = new FileInputStream(artifact)) {
176 failed = false;
177 ++retryCount;
178 if ("zip".equalsIgnoreCase(FilenameUtils.getExtension(artifact.getName()))) {
179 logger.info("Start ingest inbox file {} as a zipped mediapackage", artifact.getName());
180 WorkflowInstance workflowInstance = ingestService.addZippedMediaPackage(in, workflowDefinition, workflowConfig);
181 logger.info("Ingested {} as a zipped mediapackage from inbox as {}. Started workflow {}.",
182 artifact.getName(), workflowInstance.getMediaPackage().getIdentifier().toString(),
183 workflowInstance.getId());
184 } else {
185
186 logger.info("Start ingest track from file {}", artifact.getName());
187
188
189 String title = artifact.getName();
190 String spatial = null;
191 Date created = null;
192 Float duration = null;
193 if (metadataPattern.isPresent()) {
194 var matcher = metadataPattern.get().matcher(artifact.getName());
195 if (matcher.find()) {
196 try {
197 title = matcher.group("title");
198 } catch (IllegalArgumentException e) {
199 logger.debug("{} matches no 'title' in {}", metadataPattern.get(), artifact.getName(), e);
200 }
201 try {
202 spatial = matcher.group("spatial");
203 } catch (IllegalArgumentException e) {
204 logger.debug("{} matches no 'spatial' in {}", metadataPattern.get(), artifact.getName(), e);
205 }
206 try {
207 var value = matcher.group("created");
208 logger.debug("Trying to parse matched date '{}' with formatter {}", value, dateFormatter);
209 created = Timestamp.valueOf(LocalDateTime.parse(value, dateFormatter));
210 } catch (DateTimeParseException e) {
211 logger.warn("Matched date does not match configured date-time format", e);
212 } catch (IllegalArgumentException e) {
213 logger.debug("{} matches no 'created' in {}", metadataPattern.get(), artifact.getName(), e);
214 }
215 } else {
216 logger.debug("Regular expression {} does not match {}", metadataPattern.get(), artifact.getName());
217 }
218 }
219
220
221 if (ffprobe != null) {
222 JsonFormat json = probeMedia(artifact.getAbsolutePath()).format;
223 created = json.tags.getCreationTime() == null ? created : json.tags.getCreationTime();
224 duration = json.getDuration();
225 logger.debug("Extracted metadata from file: {}", json);
226 }
227
228 MediaPackage mediaPackage = null;
229 var currentWorkflowDefinition = workflowDefinition;
230 var currentWorkflowConfig = workflowConfig;
231
232
233 if (matchSchedule && spatial != null && created != null) {
234 logger.debug("Try finding scheduled event for agent {} at time {}", spatial, created);
235 var end = duration == null ? created : DateUtils.addSeconds(created, duration.intValue());
236 var mediaPackages = schedulerService.findConflictingEvents(spatial, created, end);
237 if (matchThreshold > 0F && mediaPackages.size() > 1) {
238 var filteredMediaPackages = new ArrayList<MediaPackage>();
239 for (var mp : mediaPackages) {
240 var schedule = schedulerService.getTechnicalMetadata(mp.getIdentifier().toString());
241 if (overlap(schedule.getStartDate(), schedule.getEndDate(), created, end) > matchThreshold) {
242 filteredMediaPackages.add(mp);
243 }
244 }
245 mediaPackages = filteredMediaPackages;
246 }
247 if (mediaPackages.size() > 1) {
248 logger.warn("Metadata match multiple events. Not using any!");
249 } else if (mediaPackages.size() == 1) {
250 mediaPackage = mediaPackages.get(0);
251 var id = mediaPackage.getIdentifier().toString();
252 var eventConfiguration = schedulerService.getCaptureAgentConfiguration(id);
253
254
255
256 try {
257 Recording recordingState = schedulerService.getRecordingState(id);
258 if (recordingState.getState().equals(UPLOAD_FINISHED)) {
259 var referenceId = mediaPackage.getIdentifier().toString();
260 mediaPackage = (MediaPackage) mediaPackage.clone();
261 mediaPackage.setIdentifier(IdImpl.fromUUID());
262
263
264 for (Track track : mediaPackage.getTracks()) {
265 logger.info("Remove track: " + track);
266 mediaPackage.remove(track);
267 }
268
269
270 try {
271 DublinCoreCatalog dc = DublinCoreUtil.loadEpisodeDublinCore(workspace, mediaPackage).get();
272 var newTitle = dc.get(DublinCore.PROPERTY_TITLE).get(0).getValue()
273 + " (" + Instant.now().getEpochSecond() + ")";
274 dc.set(DublinCore.PROPERTY_TITLE, newTitle);
275 dc.set(DublinCore.PROPERTY_REFERENCES, referenceId);
276 mediaPackage = updateDublincCoreCatalog(mediaPackage, dc);
277 mediaPackage.setTitle(newTitle);
278 } catch (Exception e) {
279
280 }
281 }
282 } catch (NotFoundException e) {
283
284 }
285
286 currentWorkflowDefinition = eventConfiguration.getOrDefault(
287 "org.opencastproject.workflow.definition",
288 workflowDefinition);
289 currentWorkflowConfig = eventConfiguration.entrySet().stream()
290 .filter(e -> e.getKey().startsWith("org.opencastproject.workflow.config."))
291 .collect(Collectors.toMap(e -> e.getKey().substring(36), Map.Entry::getValue));
292 schedulerService.updateRecordingState(id, UPLOAD_FINISHED);
293 logger.info("Found matching scheduled event {}", mediaPackage);
294 } else {
295 logger.debug("No matching event found.");
296 }
297 }
298
299
300 if (mediaPackage == null) {
301
302 mediaPackage = ingestService.createMediaPackage();
303
304 DublinCoreCatalog dcc = DublinCores.mkOpencastEpisode().getCatalog();
305 if (spatial != null) {
306 dcc.add(DublinCore.PROPERTY_SPATIAL, spatial);
307 }
308 if (created != null) {
309 dcc.add(DublinCore.PROPERTY_CREATED, EncodingSchemeUtils.encodeDate(created, Precision.Second));
310 }
311
312 dcc.add(DublinCore.PROPERTY_TITLE, title);
313
314
315 final File dir = artifact.getParentFile();
316 if (FileUtils.directoryContains(inbox, dir)) {
317
318 var seriesID = dir.getName();
319 if (seriesService.getSeries(seriesID) != null) {
320 logger.info("Ingest from inbox into series with id {}", seriesID);
321 dcc.add(DublinCore.PROPERTY_IS_PART_OF, seriesID);
322 }
323 }
324
325 try (ByteArrayOutputStream dcout = new ByteArrayOutputStream()) {
326 dcc.toXml(dcout, true);
327 try (InputStream dcin = new ByteArrayInputStream(dcout.toByteArray())) {
328 mediaPackage = ingestService.addCatalog(dcin, "dublincore.xml", MediaPackageElements.EPISODE,
329 mediaPackage);
330 logger.info("Added DC catalog to media package for ingest from inbox");
331 }
332 }
333 }
334
335
336 mediaPackage = ingestService.addTrack(in, artifact.getName(), mediaFlavor, mediaPackage);
337 logger.info("Ingested track from file {} to media package {}",
338 artifact.getName(), mediaPackage.getIdentifier().toString());
339
340
341 WorkflowInstance workflowInstance = ingestService.ingest(mediaPackage, currentWorkflowDefinition,
342 currentWorkflowConfig);
343 logger.info("Ingested {} from inbox, workflow {} started", artifact.getName(), workflowInstance.getId());
344 }
345 } catch (Exception e) {
346 logger.error("Error ingesting inbox file {}", artifact.getName(), e);
347 failed = true;
348 return RetriableIngestJob.this;
349 }
350 try {
351 FileUtils.forceDelete(artifact);
352 } catch (IOException e) {
353 logger.error("Unable to delete file {}", artifact.getAbsolutePath(), e);
354 }
355 return RetriableIngestJob.this;
356 });
357 }
358
359 private JsonFFprobe probeMedia(final String file) throws IOException {
360
361 final String[] command = new String[] {
362 ffprobe,
363 "-show_format",
364 "-of",
365 "json",
366 file
367 };
368
369
370 logger.debug("Running ffprobe: {}", (Object) command);
371
372 String output;
373 Process process = null;
374 try {
375 process = new ProcessBuilder(command)
376 .redirectError(ProcessBuilder.Redirect.DISCARD)
377 .start();
378
379 try (InputStream in = process.getInputStream()) {
380 output = IOUtils.toString(in, StandardCharsets.UTF_8);
381 }
382
383 if (process.waitFor() != 0) {
384 throw new IOException("FFprobe exited abnormally");
385 }
386 } catch (InterruptedException e) {
387 throw new IOException(e);
388 } finally {
389 IoSupport.closeQuietly(process);
390 }
391
392 return gson.fromJson(output, JsonFFprobe.class);
393 }
394
395
396
397
398
399
400
401
402
403 private float overlap(Date aStart, Date aEnd, Date bStart, Date bEnd) {
404 var min = Math.min(aStart.getTime(), bStart.getTime());
405 var max = Math.max(aEnd.getTime(), bEnd.getTime());
406 var aLen = aEnd.getTime() - aStart.getTime();
407 var bLen = bEnd.getTime() - bStart.getTime();
408 var overlap = aLen + bLen - (max - min);
409 logger.debug("Detected overlap of {} ({})", overlap, overlap / (float) aLen);
410 if (aLen == 0F) {
411 return 1F;
412 }
413 if (overlap > 0F) {
414 return overlap / (float) aLen;
415 }
416 return 0.0F;
417 }
418 }
419
420 @Override
421 public void run() {
422 while (true) {
423 try {
424 final Future<RetriableIngestJob> f = completionService.take();
425 final RetriableIngestJob task = f.get();
426 if (task.hasFailed()) {
427 if (task.getRetryCount() < maxTries) {
428 throttle.acquire();
429 logger.warn("Retrying inbox ingest of {}", task.getArtifact().getAbsolutePath());
430 completionService.submit(task);
431 } else {
432 logger.error("Inbox ingest failed after {} tries for {}", maxTries, task.getArtifact().getAbsolutePath());
433 }
434 }
435 } catch (InterruptedException e) {
436 logger.debug("Ingestor check interrupted", e);
437 return;
438 } catch (ExecutionException e) {
439 logger.error("Ingestor check interrupted", e);
440 }
441 }
442 }
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461 public Ingestor(IngestService ingestService, SecurityContext secCtx,
462 String workflowDefinition, Map<String, String> workflowConfig, String mediaFlavor, File inbox, int maxThreads,
463 SeriesService seriesService, int maxTries, int secondsBetweenTries, Optional<Pattern> metadataPattern,
464 DateTimeFormatter dateFormatter, SchedulerService schedulerService, String ffprobe, boolean matchSchedule,
465 float matchThreshold, Workspace workspace) {
466 this.ingestService = ingestService;
467 this.secCtx = secCtx;
468 this.workflowDefinition = workflowDefinition;
469 this.workflowConfig = workflowConfig;
470 this.mediaFlavor = MediaPackageElementFlavor.parseFlavor(mediaFlavor);
471 this.inbox = inbox;
472 this.executorService = Executors.newFixedThreadPool(maxThreads);
473 this.completionService = new ExecutorCompletionService<>(executorService);
474 this.seriesService = seriesService;
475 this.maxTries = maxTries;
476 this.secondsBetweenTries = secondsBetweenTries;
477 this.metadataPattern = metadataPattern;
478 this.dateFormatter = dateFormatter;
479 this.schedulerService = schedulerService;
480 this.ffprobe = ffprobe;
481 this.matchSchedule = matchSchedule;
482 this.matchThreshold = matchThreshold;
483 this.workspace = workspace;
484 }
485
486
487
488
489 public void ingest(final File artifact) {
490 logger.info("Try ingest of file {}", artifact.getName());
491 completionService.submit(new RetriableIngestJob(artifact, secondsBetweenTries));
492 }
493
494
495
496
497
498 public boolean canHandle(final File artifact) {
499 logger.trace("canHandle() {}, {}", myInfo(), artifact.getAbsolutePath());
500 File dir = artifact.getParentFile();
501 try {
502
503 return dir != null && !artifact.getName().startsWith(".")
504 && FileUtils.directoryContains(inbox, artifact)
505 && artifact.canRead() && artifact.length() > 0;
506 } catch (IOException e) {
507 logger.warn("Unable to determine canonical path of {}", artifact.getAbsolutePath(), e);
508 return false;
509 }
510 }
511
512 public void cleanup(final File artifact) {
513 try {
514 File parentDir = artifact.getParentFile();
515 if (FileUtils.directoryContains(inbox, parentDir)) {
516 String[] filesList = parentDir.list();
517 if (filesList == null || filesList.length == 0) {
518 logger.info("Delete empty inbox for series {}",
519 StringUtils.substring(parentDir.getCanonicalPath(), inbox.getCanonicalPath().length() + 1));
520 FileUtils.deleteDirectory(parentDir);
521 }
522 }
523 } catch (Exception e) {
524 logger.error("Unable to cleanup inbox for the artifact {}", artifact, e);
525 }
526 }
527
528
529
530
531
532
533
534
535
536
537
538
539
540 private MediaPackage updateDublincCoreCatalog(MediaPackage mp, DublinCoreCatalog dc)
541 throws IOException, MediaPackageException {
542 try (InputStream inputStream = IOUtils.toInputStream(dc.toXmlString(), "UTF-8")) {
543
544 Catalog[] catalogs = mp.getCatalogs(MediaPackageElements.EPISODE);
545 if (catalogs.length > 0) {
546 Catalog catalog = catalogs[0];
547 URI uri = workspace.put(mp.getIdentifier().toString(), catalog.getIdentifier(), "dublincore.xml", inputStream);
548 catalog.setURI(uri);
549
550 catalog.setChecksum(null);
551 } else {
552 throw new MediaPackageException("Unable to find catalog");
553 }
554 }
555 return mp;
556 }
557
558 public String myInfo() {
559 return format("[%x thread=%x]", hashCode(), Thread.currentThread().getId());
560 }
561
562 class JsonFFprobe {
563 protected JsonFormat format;
564 }
565
566 class JsonFormat {
567 private String duration;
568 protected JsonTags tags;
569
570 Float getDuration() {
571 return duration == null ? null : Float.parseFloat(duration);
572 }
573
574 @Override
575 public String toString() {
576 return String.format("{duration=%s,tags=%s}", duration, tags);
577 }
578 }
579
580 class JsonTags {
581 @SerializedName(value = "creation_time")
582 private String creationTime;
583
584 Date getCreationTime() throws ParseException {
585 if (creationTime == null) {
586 return null;
587 }
588 DateFormat format = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSz");
589 return format.parse(creationTime.replaceAll("000Z$", "+0000"));
590 }
591
592 @Override
593 public String toString() {
594 return String.format("{creation_time=%s}", creationTime);
595 }
596 }
597 }