1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 package org.opencastproject.ingest.scanner;
23
24 import static java.lang.String.format;
25 import static org.opencastproject.scheduler.api.RecordingState.UPLOAD_FINISHED;
26
27 import org.opencastproject.ingest.api.IngestService;
28 import org.opencastproject.mediapackage.Catalog;
29 import org.opencastproject.mediapackage.MediaPackage;
30 import org.opencastproject.mediapackage.MediaPackageElementFlavor;
31 import org.opencastproject.mediapackage.MediaPackageElements;
32 import org.opencastproject.mediapackage.MediaPackageException;
33 import org.opencastproject.mediapackage.Track;
34 import org.opencastproject.mediapackage.identifier.IdImpl;
35 import org.opencastproject.metadata.dublincore.DublinCore;
36 import org.opencastproject.metadata.dublincore.DublinCoreCatalog;
37 import org.opencastproject.metadata.dublincore.DublinCoreUtil;
38 import org.opencastproject.metadata.dublincore.DublinCores;
39 import org.opencastproject.metadata.dublincore.EncodingSchemeUtils;
40 import org.opencastproject.metadata.dublincore.Precision;
41 import org.opencastproject.scheduler.api.Recording;
42 import org.opencastproject.scheduler.api.SchedulerService;
43 import org.opencastproject.security.util.SecurityContext;
44 import org.opencastproject.series.api.SeriesService;
45 import org.opencastproject.util.IoSupport;
46 import org.opencastproject.util.NotFoundException;
47 import org.opencastproject.workflow.api.WorkflowInstance;
48 import org.opencastproject.workspace.api.Workspace;
49
50 import com.google.common.util.concurrent.RateLimiter;
51 import com.google.gson.Gson;
52 import com.google.gson.annotations.SerializedName;
53
54 import org.apache.commons.io.FileUtils;
55 import org.apache.commons.io.FilenameUtils;
56 import org.apache.commons.io.IOUtils;
57 import org.apache.commons.lang3.StringUtils;
58 import org.apache.commons.lang3.time.DateUtils;
59 import org.slf4j.Logger;
60 import org.slf4j.LoggerFactory;
61
62 import java.io.ByteArrayInputStream;
63 import java.io.ByteArrayOutputStream;
64 import java.io.File;
65 import java.io.FileInputStream;
66 import java.io.IOException;
67 import java.io.InputStream;
68 import java.net.URI;
69 import java.nio.charset.StandardCharsets;
70 import java.sql.Timestamp;
71 import java.text.DateFormat;
72 import java.text.ParseException;
73 import java.text.SimpleDateFormat;
74 import java.time.Instant;
75 import java.time.LocalDateTime;
76 import java.time.format.DateTimeFormatter;
77 import java.time.format.DateTimeParseException;
78 import java.util.ArrayList;
79 import java.util.Date;
80 import java.util.Map;
81 import java.util.Optional;
82 import java.util.concurrent.Callable;
83 import java.util.concurrent.CompletionService;
84 import java.util.concurrent.ExecutionException;
85 import java.util.concurrent.ExecutorCompletionService;
86 import java.util.concurrent.ExecutorService;
87 import java.util.concurrent.Executors;
88 import java.util.concurrent.Future;
89 import java.util.regex.Pattern;
90 import java.util.stream.Collectors;
91
92
93 public class Ingestor implements Runnable {
94
95
96
97
98 private static final Logger logger = LoggerFactory.getLogger(Ingestor.class);
99
100 private final IngestService ingestService;
101
102 private final SecurityContext secCtx;
103
104 private final String workflowDefinition;
105
106 private final Map<String, String> workflowConfig;
107
108 private final MediaPackageElementFlavor mediaFlavor;
109
110 private final File inbox;
111
112 private final SeriesService seriesService;
113 private final SchedulerService schedulerService;
114 private final Workspace workspace;
115
116 private final int maxTries;
117
118 private final int secondsBetweenTries;
119
120 private RateLimiter throttle = RateLimiter.create(1.0);
121
122 private final Optional<Pattern> metadataPattern;
123 private final DateTimeFormatter dateFormatter;
124 private final String ffprobe;
125
126 private final Gson gson = new Gson();
127
128 private final boolean matchSchedule;
129 private final float matchThreshold;
130
131
132
133
134 private final ExecutorService executorService;
135
136
137
138
139 private final CompletionService<RetriableIngestJob> completionService;
140
141
142 private class RetriableIngestJob implements Callable<RetriableIngestJob> {
143 private final File artifact;
144 private int retryCount;
145 private boolean failed;
146 private RateLimiter throttle;
147
/**
 * Creates a job for an inbox file that has not been attempted yet.
 *
 * @param artifact
 *          the inbox file to ingest
 * @param secondsBetweenTries
 *          the job's rate limiter is created with a rate of {@code 1.0 / secondsBetweenTries} permits per second
 */
RetriableIngestJob(final File artifact, int secondsBetweenTries) {
  this.throttle = RateLimiter.create(1.0 / secondsBetweenTries);
  this.artifact = artifact;
  this.failed = false;
  this.retryCount = 0;
}
154
155 public boolean hasFailed() {
156 return this.failed;
157 }
158
159 public int getRetryCount() {
160 return this.retryCount;
161 }
162
163 public File getArtifact() {
164 return this.artifact;
165 }
166
167 @Override
168 public RetriableIngestJob call() {
169 return secCtx.runInContext(() -> {
170 if (hasFailed()) {
171 logger.warn("This is retry number {} for file {}. We will wait for {} seconds before trying again",
172 retryCount, artifact.getName(), secondsBetweenTries);
173 throttle.acquire();
174 }
175 try (InputStream in = new FileInputStream(artifact)) {
176 failed = false;
177 ++retryCount;
178 if ("zip".equalsIgnoreCase(FilenameUtils.getExtension(artifact.getName()))) {
179 logger.info("Start ingest inbox file {} as a zipped mediapackage", artifact.getName());
180 WorkflowInstance workflowInstance = ingestService.addZippedMediaPackage(in, workflowDefinition,
181 workflowConfig);
182 logger.info("Ingested {} as a zipped mediapackage from inbox as {}. Started workflow {}.",
183 artifact.getName(), workflowInstance.getMediaPackage().getIdentifier().toString(),
184 workflowInstance.getId());
185 } else {
186
187 logger.info("Start ingest track from file {}", artifact.getName());
188
189
190 String title = artifact.getName();
191 String spatial = null;
192 Date created = null;
193 Float duration = null;
194 if (metadataPattern.isPresent()) {
195 var matcher = metadataPattern.get().matcher(artifact.getName());
196 if (matcher.find()) {
197 try {
198 title = matcher.group("title");
199 } catch (IllegalArgumentException e) {
200 logger.debug("{} matches no 'title' in {}", metadataPattern.get(), artifact.getName(), e);
201 }
202 try {
203 spatial = matcher.group("spatial");
204 } catch (IllegalArgumentException e) {
205 logger.debug("{} matches no 'spatial' in {}", metadataPattern.get(), artifact.getName(), e);
206 }
207 try {
208 var value = matcher.group("created");
209 logger.debug("Trying to parse matched date '{}' with formatter {}", value, dateFormatter);
210 created = Timestamp.valueOf(LocalDateTime.parse(value, dateFormatter));
211 } catch (DateTimeParseException e) {
212 logger.warn("Matched date does not match configured date-time format", e);
213 } catch (IllegalArgumentException e) {
214 logger.debug("{} matches no 'created' in {}", metadataPattern.get(), artifact.getName(), e);
215 }
216 } else {
217 logger.debug("Regular expression {} does not match {}", metadataPattern.get(), artifact.getName());
218 }
219 }
220
221
222 if (ffprobe != null) {
223 JsonFormat json = probeMedia(artifact.getAbsolutePath()).format;
224 created = json.tags.getCreationTime() == null ? created : json.tags.getCreationTime();
225 duration = json.getDuration();
226 logger.debug("Extracted metadata from file: {}", json);
227 }
228
229 MediaPackage mediaPackage = null;
230 var currentWorkflowDefinition = workflowDefinition;
231 var currentWorkflowConfig = workflowConfig;
232
233
234 if (matchSchedule && spatial != null && created != null) {
235 logger.debug("Try finding scheduled event for agent {} at time {}", spatial, created);
236 var end = duration == null ? created : DateUtils.addSeconds(created, duration.intValue());
237 var mediaPackages = schedulerService.findConflictingEvents(spatial, created, end);
238 if (matchThreshold > 0F && mediaPackages.size() > 1) {
239 var filteredMediaPackages = new ArrayList<MediaPackage>();
240 for (var mp : mediaPackages) {
241 var schedule = schedulerService.getTechnicalMetadata(mp.getIdentifier().toString());
242 if (overlap(schedule.getStartDate(), schedule.getEndDate(), created, end) > matchThreshold) {
243 filteredMediaPackages.add(mp);
244 }
245 }
246 mediaPackages = filteredMediaPackages;
247 }
248 if (mediaPackages.size() > 1) {
249 logger.warn("Metadata match multiple events. Not using any!");
250 } else if (mediaPackages.size() == 1) {
251 mediaPackage = mediaPackages.get(0);
252 var id = mediaPackage.getIdentifier().toString();
253 var eventConfiguration = schedulerService.getCaptureAgentConfiguration(id);
254
255
256
257 try {
258 Recording recordingState = schedulerService.getRecordingState(id);
259 if (recordingState.getState().equals(UPLOAD_FINISHED)) {
260 var referenceId = mediaPackage.getIdentifier().toString();
261 mediaPackage = (MediaPackage) mediaPackage.clone();
262 mediaPackage.setIdentifier(IdImpl.fromUUID());
263
264
265 for (Track track : mediaPackage.getTracks()) {
266 logger.info("Remove track: " + track);
267 mediaPackage.remove(track);
268 }
269
270
271 try {
272 DublinCoreCatalog dc = DublinCoreUtil.loadEpisodeDublinCore(workspace, mediaPackage).get();
273 var newTitle = dc.get(DublinCore.PROPERTY_TITLE).get(0).getValue()
274 + " (" + Instant.now().getEpochSecond() + ")";
275 dc.set(DublinCore.PROPERTY_TITLE, newTitle);
276 dc.set(DublinCore.PROPERTY_REFERENCES, referenceId);
277 mediaPackage = updateDublincCoreCatalog(mediaPackage, dc);
278 mediaPackage.setTitle(newTitle);
279 } catch (Exception e) {
280
281 }
282 }
283 } catch (NotFoundException e) {
284
285 }
286
287 currentWorkflowDefinition = eventConfiguration.getOrDefault(
288 "org.opencastproject.workflow.definition",
289 workflowDefinition);
290 currentWorkflowConfig = eventConfiguration.entrySet().stream()
291 .filter(e -> e.getKey().startsWith("org.opencastproject.workflow.config."))
292 .collect(Collectors.toMap(e -> e.getKey().substring(36), Map.Entry::getValue));
293 schedulerService.updateRecordingState(id, UPLOAD_FINISHED);
294 logger.info("Found matching scheduled event {}", mediaPackage);
295 } else {
296 logger.debug("No matching event found.");
297 }
298 }
299
300
301 if (mediaPackage == null) {
302
303 mediaPackage = ingestService.createMediaPackage();
304
305 DublinCoreCatalog dcc = DublinCores.mkOpencastEpisode().getCatalog();
306 if (spatial != null) {
307 dcc.add(DublinCore.PROPERTY_SPATIAL, spatial);
308 }
309 if (created != null) {
310 dcc.add(DublinCore.PROPERTY_CREATED, EncodingSchemeUtils.encodeDate(created, Precision.Second));
311 }
312
313 dcc.add(DublinCore.PROPERTY_TITLE, title);
314
315
316 final File dir = artifact.getParentFile();
317 if (FileUtils.directoryContains(inbox, dir)) {
318
319 var seriesID = dir.getName();
320 if (seriesService.getSeries(seriesID) != null) {
321 logger.info("Ingest from inbox into series with id {}", seriesID);
322 dcc.add(DublinCore.PROPERTY_IS_PART_OF, seriesID);
323 }
324 }
325
326 try (ByteArrayOutputStream dcout = new ByteArrayOutputStream()) {
327 dcc.toXml(dcout, true);
328 try (InputStream dcin = new ByteArrayInputStream(dcout.toByteArray())) {
329 mediaPackage = ingestService.addCatalog(dcin, "dublincore.xml", MediaPackageElements.EPISODE,
330 mediaPackage);
331 logger.info("Added DC catalog to media package for ingest from inbox");
332 }
333 }
334 }
335
336
337 mediaPackage = ingestService.addTrack(in, artifact.getName(), mediaFlavor, mediaPackage);
338 logger.info("Ingested track from file {} to media package {}",
339 artifact.getName(), mediaPackage.getIdentifier().toString());
340
341
342 WorkflowInstance workflowInstance = ingestService.ingest(mediaPackage, currentWorkflowDefinition,
343 currentWorkflowConfig);
344 logger.info("Ingested {} from inbox, workflow {} started", artifact.getName(), workflowInstance.getId());
345 }
346 } catch (Exception e) {
347 logger.error("Error ingesting inbox file {}", artifact.getName(), e);
348 failed = true;
349 return RetriableIngestJob.this;
350 }
351 try {
352 FileUtils.forceDelete(artifact);
353 } catch (IOException e) {
354 logger.error("Unable to delete file {}", artifact.getAbsolutePath(), e);
355 }
356 return RetriableIngestJob.this;
357 });
358 }
359
360 private JsonFFprobe probeMedia(final String file) throws IOException {
361
362 final String[] command = new String[] {
363 ffprobe,
364 "-show_format",
365 "-of",
366 "json",
367 file
368 };
369
370
371 logger.debug("Running ffprobe: {}", (Object) command);
372
373 String output;
374 Process process = null;
375 try {
376 process = new ProcessBuilder(command)
377 .redirectError(ProcessBuilder.Redirect.DISCARD)
378 .start();
379
380 try (InputStream in = process.getInputStream()) {
381 output = IOUtils.toString(in, StandardCharsets.UTF_8);
382 }
383
384 if (process.waitFor() != 0) {
385 throw new IOException("FFprobe exited abnormally");
386 }
387 } catch (InterruptedException e) {
388 throw new IOException(e);
389 } finally {
390 IoSupport.closeQuietly(process);
391 }
392
393 return gson.fromJson(output, JsonFFprobe.class);
394 }
395
396
397
398
399
400
401
402
403
404 private float overlap(Date aStart, Date aEnd, Date bStart, Date bEnd) {
405 var min = Math.min(aStart.getTime(), bStart.getTime());
406 var max = Math.max(aEnd.getTime(), bEnd.getTime());
407 var aLen = aEnd.getTime() - aStart.getTime();
408 var bLen = bEnd.getTime() - bStart.getTime();
409 var overlap = aLen + bLen - (max - min);
410 logger.debug("Detected overlap of {} ({})", overlap, overlap / (float) aLen);
411 if (aLen == 0F) {
412 return 1F;
413 }
414 if (overlap > 0F) {
415 return overlap / (float) aLen;
416 }
417 return 0.0F;
418 }
419 }
420
421 @Override
422 public void run() {
423 while (true) {
424 try {
425 final Future<RetriableIngestJob> f = completionService.take();
426 final RetriableIngestJob task = f.get();
427 if (task.hasFailed()) {
428 if (task.getRetryCount() < maxTries) {
429 throttle.acquire();
430 logger.warn("Retrying inbox ingest of {}", task.getArtifact().getAbsolutePath());
431 completionService.submit(task);
432 } else {
433 logger.error("Inbox ingest failed after {} tries for {}", maxTries, task.getArtifact().getAbsolutePath());
434 }
435 }
436 } catch (InterruptedException e) {
437 logger.debug("Ingestor check interrupted", e);
438 return;
439 } catch (ExecutionException e) {
440 logger.error("Ingestor check interrupted", e);
441 }
442 }
443 }
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
/**
 * Creates an ingestor for the given inbox directory.
 *
 * @param ingestService
 *          service used to build and ingest media packages
 * @param secCtx
 *          security context in which the ingest jobs run
 * @param workflowDefinition
 *          default workflow definition
 * @param workflowConfig
 *          default workflow configuration
 * @param mediaFlavor
 *          flavor of ingested tracks, parsed with {@link MediaPackageElementFlavor#parseFlavor(String)}
 * @param inbox
 *          the inbox directory
 * @param maxThreads
 *          size of the fixed thread pool running the ingest jobs
 * @param seriesService
 *          service used to look up series by the name of an inbox sub directory
 * @param maxTries
 *          limit checked against a job's retry count before it is resubmitted
 * @param secondsBetweenTries
 *          seconds to wait between two attempts for the same file
 * @param metadataPattern
 *          optional pattern for extracting metadata from file names
 * @param dateFormatter
 *          formatter for dates extracted from file names
 * @param schedulerService
 *          service used to find matching scheduled events
 * @param ffprobe
 *          ffprobe command, or {@code null} to skip probing media files
 * @param matchSchedule
 *          whether to try matching files to scheduled events
 * @param matchThreshold
 *          minimum overlap used to narrow down multiple matching events
 * @param workspace
 *          the workspace
 */
public Ingestor(IngestService ingestService, SecurityContext secCtx,
        String workflowDefinition, Map<String, String> workflowConfig, String mediaFlavor, File inbox, int maxThreads,
        SeriesService seriesService, int maxTries, int secondsBetweenTries, Optional<Pattern> metadataPattern,
        DateTimeFormatter dateFormatter, SchedulerService schedulerService, String ffprobe, boolean matchSchedule,
        float matchThreshold, Workspace workspace) {
  // Collaborating services
  this.ingestService = ingestService;
  this.secCtx = secCtx;
  this.seriesService = seriesService;
  this.schedulerService = schedulerService;
  this.workspace = workspace;

  // Ingest defaults
  this.workflowDefinition = workflowDefinition;
  this.workflowConfig = workflowConfig;
  this.mediaFlavor = MediaPackageElementFlavor.parseFlavor(mediaFlavor);
  this.inbox = inbox;

  // Job execution and retry handling
  this.executorService = Executors.newFixedThreadPool(maxThreads);
  this.completionService = new ExecutorCompletionService<>(this.executorService);
  this.maxTries = maxTries;
  this.secondsBetweenTries = secondsBetweenTries;

  // Metadata extraction and schedule matching
  this.metadataPattern = metadataPattern;
  this.dateFormatter = dateFormatter;
  this.ffprobe = ffprobe;
  this.matchSchedule = matchSchedule;
  this.matchThreshold = matchThreshold;
}
486
487
488
489
490 public void ingest(final File artifact) {
491 logger.info("Try ingest of file {}", artifact.getName());
492 completionService.submit(new RetriableIngestJob(artifact, secondsBetweenTries));
493 }
494
495
496
497
498
499 public boolean canHandle(final File artifact) {
500 logger.trace("canHandle() {}, {}", myInfo(), artifact.getAbsolutePath());
501 File dir = artifact.getParentFile();
502 try {
503
504 return dir != null && !artifact.getName().startsWith(".")
505 && FileUtils.directoryContains(inbox, artifact)
506 && artifact.canRead() && artifact.length() > 0;
507 } catch (IOException e) {
508 logger.warn("Unable to determine canonical path of {}", artifact.getAbsolutePath(), e);
509 return false;
510 }
511 }
512
513 public void cleanup(final File artifact) {
514 try {
515 File parentDir = artifact.getParentFile();
516 if (FileUtils.directoryContains(inbox, parentDir)) {
517 String[] filesList = parentDir.list();
518 if (filesList == null || filesList.length == 0) {
519 logger.info("Delete empty inbox for series {}",
520 StringUtils.substring(parentDir.getCanonicalPath(), inbox.getCanonicalPath().length() + 1));
521 FileUtils.deleteDirectory(parentDir);
522 }
523 }
524 } catch (Exception e) {
525 logger.error("Unable to cleanup inbox for the artifact {}", artifact, e);
526 }
527 }
528
529
530
531
532
533
534
535
536
537
538
539
540
541 private MediaPackage updateDublincCoreCatalog(MediaPackage mp, DublinCoreCatalog dc)
542 throws IOException, MediaPackageException {
543 try (InputStream inputStream = IOUtils.toInputStream(dc.toXmlString(), "UTF-8")) {
544
545 Catalog[] catalogs = mp.getCatalogs(MediaPackageElements.EPISODE);
546 if (catalogs.length > 0) {
547 Catalog catalog = catalogs[0];
548 URI uri = workspace.put(mp.getIdentifier().toString(), catalog.getIdentifier(), "dublincore.xml", inputStream);
549 catalog.setURI(uri);
550
551 catalog.setChecksum(null);
552 } else {
553 throw new MediaPackageException("Unable to find catalog");
554 }
555 }
556 return mp;
557 }
558
559 public String myInfo() {
560 return format("[%x thread=%x]", hashCode(), Thread.currentThread().getId());
561 }
562
563 class JsonFFprobe {
564 protected JsonFormat format;
565 }
566
567 class JsonFormat {
568 private String duration;
569 protected JsonTags tags;
570
571 Float getDuration() {
572 return duration == null ? null : Float.parseFloat(duration);
573 }
574
575 @Override
576 public String toString() {
577 return String.format("{duration=%s,tags=%s}", duration, tags);
578 }
579 }
580
581 class JsonTags {
582 @SerializedName(value = "creation_time")
583 private String creationTime;
584
585 Date getCreationTime() throws ParseException {
586 if (creationTime == null) {
587 return null;
588 }
589 DateFormat format = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSz");
590 return format.parse(creationTime.replaceAll("000Z$", "+0000"));
591 }
592
593 @Override
594 public String toString() {
595 return String.format("{creation_time=%s}", creationTime);
596 }
597 }
598 }