// Ingestor.java
/*
* Licensed to The Apereo Foundation under one or more contributor license
* agreements. See the NOTICE file distributed with this work for additional
* information regarding copyright ownership.
*
*
* The Apereo Foundation licenses this file to you under the Educational
* Community License, Version 2.0 (the "License"); you may not use this file
* except in compliance with the License. You may obtain a copy of the License
* at:
*
* http://opensource.org/licenses/ecl2.txt
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*
*/
package org.opencastproject.ingest.scanner;
import static java.lang.String.format;
import static org.opencastproject.scheduler.api.RecordingState.UPLOAD_FINISHED;
import org.opencastproject.ingest.api.IngestService;
import org.opencastproject.mediapackage.Catalog;
import org.opencastproject.mediapackage.MediaPackage;
import org.opencastproject.mediapackage.MediaPackageElementFlavor;
import org.opencastproject.mediapackage.MediaPackageElements;
import org.opencastproject.mediapackage.MediaPackageException;
import org.opencastproject.mediapackage.Track;
import org.opencastproject.mediapackage.identifier.IdImpl;
import org.opencastproject.metadata.dublincore.DublinCore;
import org.opencastproject.metadata.dublincore.DublinCoreCatalog;
import org.opencastproject.metadata.dublincore.DublinCoreUtil;
import org.opencastproject.metadata.dublincore.DublinCores;
import org.opencastproject.metadata.dublincore.EncodingSchemeUtils;
import org.opencastproject.metadata.dublincore.Precision;
import org.opencastproject.scheduler.api.Recording;
import org.opencastproject.scheduler.api.SchedulerService;
import org.opencastproject.security.util.SecurityContext;
import org.opencastproject.series.api.SeriesService;
import org.opencastproject.util.IoSupport;
import org.opencastproject.util.NotFoundException;
import org.opencastproject.workflow.api.WorkflowInstance;
import org.opencastproject.workspace.api.Workspace;
import com.google.common.util.concurrent.RateLimiter;
import com.google.gson.Gson;
import com.google.gson.annotations.SerializedName;
import org.apache.commons.io.FileUtils;
import org.apache.commons.io.FilenameUtils;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.time.DateUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.net.URI;
import java.nio.charset.StandardCharsets;
import java.sql.Timestamp;
import java.text.DateFormat;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.time.format.DateTimeParseException;
import java.util.ArrayList;
import java.util.Date;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
/** Used by the {@link InboxScannerService} to do the actual ingest. */
public class Ingestor implements Runnable {
/**
* The logger
*/
private static final Logger logger = LoggerFactory.getLogger(Ingestor.class);
private final IngestService ingestService;
private final SecurityContext secCtx;
private final String workflowDefinition;
private final Map<String, String> workflowConfig;
private final MediaPackageElementFlavor mediaFlavor;
private final File inbox;
private final SeriesService seriesService;
private final SchedulerService schedulerService;
private final Workspace workspace;
private final int maxTries;
private final int secondsBetweenTries;
private RateLimiter throttle = RateLimiter.create(1.0);
private final Optional<Pattern> metadataPattern;
private final DateTimeFormatter dateFormatter;
private final String ffprobe;
private final Gson gson = new Gson();
private final boolean matchSchedule;
private final float matchThreshold;
/**
* Thread pool to run the ingest worker.
*/
private final ExecutorService executorService;
/**
* Completion service to manage internal completion queue of ingest jobs including retries
*/
private final CompletionService<RetriableIngestJob> completionService;
private class RetriableIngestJob implements Callable<RetriableIngestJob> {
private final File artifact;
private int retryCount;
private boolean failed;
private RateLimiter throttle;
RetriableIngestJob(final File artifact, int secondsBetweenTries) {
this.artifact = artifact;
this.retryCount = 0;
this.failed = false;
throttle = RateLimiter.create(1.0 / secondsBetweenTries);
}
public boolean hasFailed() {
return this.failed;
}
public int getRetryCount() {
return this.retryCount;
}
public File getArtifact() {
return this.artifact;
}
@Override
public RetriableIngestJob call() {
return secCtx.runInContext(() -> {
if (hasFailed()) {
logger.warn("This is retry number {} for file {}. We will wait for {} seconds before trying again",
retryCount, artifact.getName(), secondsBetweenTries);
throttle.acquire();
}
try (InputStream in = new FileInputStream(artifact)) {
failed = false;
++retryCount;
if ("zip".equalsIgnoreCase(FilenameUtils.getExtension(artifact.getName()))) {
logger.info("Start ingest inbox file {} as a zipped mediapackage", artifact.getName());
WorkflowInstance workflowInstance = ingestService.addZippedMediaPackage(in, workflowDefinition, workflowConfig);
logger.info("Ingested {} as a zipped mediapackage from inbox as {}. Started workflow {}.",
artifact.getName(), workflowInstance.getMediaPackage().getIdentifier().toString(),
workflowInstance.getId());
} else {
/* Create MediaPackage and add Track */
logger.info("Start ingest track from file {}", artifact.getName());
// Try extracting metadata from the file name and path
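// Illustration (hypothetical configuration, not part of this code): with a metadataPattern of
//   (?<spatial>[^-]+)-(?<created>\d{4}-\d{2}-\d{2}-\d{2}-\d{2})-(?<title>.+)\.mp4
// and a dateFormatter of "yyyy-MM-dd-HH-mm", a file named
// "room1-2023-04-01-10-15-Algebra Lecture.mp4" would yield spatial "room1",
// created 2023-04-01T10:15 and title "Algebra Lecture".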
String title = artifact.getName();
String spatial = null;
Date created = null;
Float duration = null;
if (metadataPattern.isPresent()) {
var matcher = metadataPattern.get().matcher(artifact.getName());
if (matcher.find()) {
try {
title = matcher.group("title");
} catch (IllegalArgumentException e) {
logger.debug("{} matches no 'title' in {}", metadataPattern.get(), artifact.getName(), e);
}
try {
spatial = matcher.group("spatial");
} catch (IllegalArgumentException e) {
logger.debug("{} matches no 'spatial' in {}", metadataPattern.get(), artifact.getName(), e);
}
try {
var value = matcher.group("created");
logger.debug("Trying to parse matched date '{}' with formatter {}", value, dateFormatter);
created = Timestamp.valueOf(LocalDateTime.parse(value, dateFormatter));
} catch (DateTimeParseException e) {
logger.warn("Matched date does not match configured date-time format", e);
} catch (IllegalArgumentException e) {
logger.debug("{} matches no 'created' in {}", metadataPattern.get(), artifact.getName(), e);
}
} else {
logger.debug("Regular expression {} does not match {}", metadataPattern.get(), artifact.getName());
}
}
// Try extracting additional metadata via ffprobe
if (ffprobe != null) {
JsonFormat json = probeMedia(artifact.getAbsolutePath()).format;
// media files without metadata may lack the tags object entirely
Date creationTime = json.tags == null ? null : json.tags.getCreationTime();
created = creationTime == null ? created : creationTime;
duration = json.getDuration();
logger.debug("Extracted metadata from file: {}", json);
}
MediaPackage mediaPackage = null;
var currentWorkflowDefinition = workflowDefinition;
var currentWorkflowConfig = workflowConfig;
// Check if we can match this to a scheduled event
if (matchSchedule && spatial != null && created != null) {
logger.debug("Try finding scheduled event for agent {} at time {}", spatial, created);
var end = duration == null ? created : DateUtils.addSeconds(created, duration.intValue());
var mediaPackages = schedulerService.findConflictingEvents(spatial, created, end);
if (matchThreshold > 0F && mediaPackages.size() > 1) {
var filteredMediaPackages = new ArrayList<MediaPackage>();
for (var mp : mediaPackages) {
var schedule = schedulerService.getTechnicalMetadata(mp.getIdentifier().toString());
if (overlap(schedule.getStartDate(), schedule.getEndDate(), created, end) > matchThreshold) {
filteredMediaPackages.add(mp);
}
}
mediaPackages = filteredMediaPackages;
}
if (mediaPackages.size() > 1) {
logger.warn("Metadata match multiple events. Not using any!");
} else if (mediaPackages.size() == 1) {
mediaPackage = mediaPackages.get(0);
var id = mediaPackage.getIdentifier().toString();
var eventConfiguration = schedulerService.getCaptureAgentConfiguration(id);
// Check if the scheduled event already has a recording associated with it
// If so, ingest the file as a new event
try {
Recording recordingState = schedulerService.getRecordingState(id);
if (recordingState.getState().equals(UPLOAD_FINISHED)) {
var referenceId = mediaPackage.getIdentifier().toString();
mediaPackage = (MediaPackage) mediaPackage.clone();
mediaPackage.setIdentifier(IdImpl.fromUUID());
// Drop copied media files. We don't want them in the new event
for (Track track : mediaPackage.getTracks()) {
logger.info("Remove track: " + track);
mediaPackage.remove(track);
}
// Update dublincore title and set reference to originally scheduled event
try {
DublinCoreCatalog dc = DublinCoreUtil.loadEpisodeDublinCore(workspace, mediaPackage).get();
var newTitle = dc.get(DublinCore.PROPERTY_TITLE).get(0).getValue()
+ " (" + Instant.now().getEpochSecond() + ")";
dc.set(DublinCore.PROPERTY_TITLE, newTitle);
dc.set(DublinCore.PROPERTY_REFERENCES, referenceId);
mediaPackage = updateDublinCoreCatalog(mediaPackage, dc);
mediaPackage.setTitle(newTitle);
} catch (Exception e) {
// Don't fail the ingest if we could not update the metadata for some reason
logger.debug("Could not update the episode metadata of the cloned media package", e);
}
}
} catch (NotFoundException e) {
// Occurs if a scheduled event has not started yet
}
currentWorkflowDefinition = eventConfiguration.getOrDefault(
"org.opencastproject.workflow.definition",
workflowDefinition);
// Strip the "org.opencastproject.workflow.config." prefix (36 characters) from the keys,
// e.g. "org.opencastproject.workflow.config.someOption" becomes "someOption"
currentWorkflowConfig = eventConfiguration.entrySet().stream()
.filter(e -> e.getKey().startsWith("org.opencastproject.workflow.config."))
.collect(Collectors.toMap(e -> e.getKey().substring(36), Map.Entry::getValue));
schedulerService.updateRecordingState(id, UPLOAD_FINISHED);
logger.info("Found matching scheduled event {}", mediaPackage);
} else {
logger.debug("No matching event found.");
}
}
// create new media package and metadata catalog if we have none
if (mediaPackage == null) {
// create new media package
mediaPackage = ingestService.createMediaPackage();
DublinCoreCatalog dcc = DublinCores.mkOpencastEpisode().getCatalog();
if (spatial != null) {
dcc.add(DublinCore.PROPERTY_SPATIAL, spatial);
}
if (created != null) {
dcc.add(DublinCore.PROPERTY_CREATED, EncodingSchemeUtils.encodeDate(created, Precision.Second));
}
// fall back to the file name for the title if the matcher did not yield one
dcc.add(DublinCore.PROPERTY_TITLE, title);
/* Check if we have a subdir and if its name matches an existing series */
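// e.g. a file dropped at <inbox>/mySeries/recording.mp4 is assigned to the series
// with identifier "mySeries", provided such a series exists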
final File dir = artifact.getParentFile();
if (FileUtils.directoryContains(inbox, dir)) {
/* use the name of the inbox subdirectory as the series identifier */
var seriesID = dir.getName();
try {
if (seriesService.getSeries(seriesID) != null) {
logger.info("Ingest from inbox into series with id {}", seriesID);
dcc.add(DublinCore.PROPERTY_IS_PART_OF, seriesID);
}
} catch (NotFoundException e) {
// the subdirectory name does not identify an existing series; ingest without a series
logger.debug("No series with id {} found. Ingesting without series.", seriesID);
}
}
try (ByteArrayOutputStream dcout = new ByteArrayOutputStream()) {
dcc.toXml(dcout, true);
try (InputStream dcin = new ByteArrayInputStream(dcout.toByteArray())) {
mediaPackage = ingestService.addCatalog(dcin, "dublincore.xml", MediaPackageElements.EPISODE,
mediaPackage);
logger.info("Added DC catalog to media package for ingest from inbox");
}
}
}
// Ingest media
mediaPackage = ingestService.addTrack(in, artifact.getName(), mediaFlavor, mediaPackage);
logger.info("Ingested track from file {} to media package {}",
artifact.getName(), mediaPackage.getIdentifier().toString());
// Ingest media package
WorkflowInstance workflowInstance = ingestService.ingest(mediaPackage, currentWorkflowDefinition,
currentWorkflowConfig);
logger.info("Ingested {} from inbox, workflow {} started", artifact.getName(), workflowInstance.getId());
}
} catch (Exception e) {
logger.error("Error ingesting inbox file {}", artifact.getName(), e);
failed = true;
return RetriableIngestJob.this;
}
try {
FileUtils.forceDelete(artifact);
} catch (IOException e) {
logger.error("Unable to delete file {}", artifact.getAbsolutePath(), e);
}
return RetriableIngestJob.this;
});
}
private JsonFFprobe probeMedia(final String file) throws IOException {
final String[] command = new String[] {
ffprobe,
"-show_format",
"-of",
"json",
file
};
// Execute ffprobe and obtain the result
logger.debug("Running ffprobe: {}", (Object) command);
String output;
Process process = null;
try {
process = new ProcessBuilder(command)
.redirectError(ProcessBuilder.Redirect.DISCARD)
.start();
try (InputStream in = process.getInputStream()) {
output = IOUtils.toString(in, StandardCharsets.UTF_8);
}
if (process.waitFor() != 0) {
throw new IOException("FFprobe exited abnormally");
}
} catch (InterruptedException e) {
throw new IOException(e);
} finally {
IoSupport.closeQuietly(process);
}
return gson.fromJson(output, JsonFFprobe.class);
}
/**
* Calculate the overlap of two events {@code a} and {@code b}.
* @param aStart Begin of event a
* @param aEnd End of event a
* @param bStart Begin of event b
* @param bEnd End of event b
* @return How much of {@code a} overlaps with {@code b}, as a float in the range {@code [0.0, 1.0]}.
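* <p>
* Example: for a = [10:00, 11:00] and b = [10:30, 11:30], aLen = bLen = 60 min and the
* overall span is 90 min, so the overlap is 60 + 60 - 90 = 30 min and the result is 0.5.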
*/
private float overlap(Date aStart, Date aEnd, Date bStart, Date bEnd) {
var min = Math.min(aStart.getTime(), bStart.getTime());
var max = Math.max(aEnd.getTime(), bEnd.getTime());
var aLen = aEnd.getTime() - aStart.getTime();
var bLen = bEnd.getTime() - bStart.getTime();
// the combined length minus the overall span yields the overlapping time (negative if disjoint)
var overlap = aLen + bLen - (max - min);
if (aLen == 0L) {
// avoid division by zero; treat a zero-length event as fully overlapping
return 1F;
}
logger.debug("Detected overlap of {} ms ({})", overlap, overlap / (float) aLen);
if (overlap > 0L) {
return overlap / (float) aLen;
}
return 0.0F;
}
}
@Override
public void run() {
while (true) {
try {
final Future<RetriableIngestJob> f = completionService.take();
final RetriableIngestJob task = f.get();
if (task.hasFailed()) {
if (task.getRetryCount() < maxTries) {
throttle.acquire();
logger.warn("Retrying inbox ingest of {}", task.getArtifact().getAbsolutePath());
completionService.submit(task);
} else {
logger.error("Inbox ingest failed after {} tries for {}", maxTries, task.getArtifact().getAbsolutePath());
}
}
} catch (InterruptedException e) {
logger.debug("Ingestor check interrupted", e);
return;
} catch (ExecutionException e) {
logger.error("Ingestor check interrupted", e);
}
}
}
/**
* Create new ingestor.
*
* @param ingestService media packages are passed to the ingest service
* @param secCtx security context needed for ingesting with the IngestService or for putting files into the working file
* repository
* @param workflowDefinition workflow to apply to ingested media packages
* @param workflowConfig the workflow definition configuration
* @param mediaFlavor media flavor to use by default
* @param inbox inbox directory to watch
* @param maxThreads maximum worker threads doing the actual ingest
* @param seriesService reference to the active series service
* @param maxTries maximum number of tries for an ingest job
* @param secondsBetweenTries time between retries in seconds
* @param metadataPattern regular expression pattern for matching metadata in file names
* @param dateFormatter date formatter pattern for parsing temporal metadata
* @param schedulerService scheduler service used to match ingested media to scheduled events
* @param ffprobe path to the ffprobe binary, or null to disable metadata extraction via ffprobe
* @param matchSchedule whether to try matching ingested media to scheduled events
* @param matchThreshold minimum overlap ratio required to accept a single matching scheduled event
* @param workspace the workspace, used to update Dublin Core catalogs
*/
public Ingestor(IngestService ingestService, SecurityContext secCtx,
String workflowDefinition, Map<String, String> workflowConfig, String mediaFlavor, File inbox, int maxThreads,
SeriesService seriesService, int maxTries, int secondsBetweenTries, Optional<Pattern> metadataPattern,
DateTimeFormatter dateFormatter, SchedulerService schedulerService, String ffprobe, boolean matchSchedule,
float matchThreshold, Workspace workspace) {
this.ingestService = ingestService;
this.secCtx = secCtx;
this.workflowDefinition = workflowDefinition;
this.workflowConfig = workflowConfig;
this.mediaFlavor = MediaPackageElementFlavor.parseFlavor(mediaFlavor);
this.inbox = inbox;
this.executorService = Executors.newFixedThreadPool(maxThreads);
this.completionService = new ExecutorCompletionService<>(executorService);
this.seriesService = seriesService;
this.maxTries = maxTries;
this.secondsBetweenTries = secondsBetweenTries;
this.metadataPattern = metadataPattern;
this.dateFormatter = dateFormatter;
this.schedulerService = schedulerService;
this.ffprobe = ffprobe;
this.matchSchedule = matchSchedule;
this.matchThreshold = matchThreshold;
this.workspace = workspace;
}
/**
* Asynchronous ingest of an artifact.
*/
public void ingest(final File artifact) {
logger.info("Try ingest of file {}", artifact.getName());
completionService.submit(new RetriableIngestJob(artifact, secondsBetweenTries));
}
/**
* Return true if the passed artifact can be handled by this ingestor,
* false if not (e.g. it lies outside of inbox or its name starts with a ".")
*/
public boolean canHandle(final File artifact) {
logger.trace("canHandle() {}, {}", myInfo(), artifact.getAbsolutePath());
File dir = artifact.getParentFile();
try {
/* Reject if dir is null, if the artifact is a dotfile, if it lives outside of the inbox path, or if it is unreadable or empty */
return dir != null && !artifact.getName().startsWith(".")
&& FileUtils.directoryContains(inbox, artifact)
&& artifact.canRead() && artifact.length() > 0;
} catch (IOException e) {
logger.warn("Unable to determine canonical path of {}", artifact.getAbsolutePath(), e);
return false;
}
}
public void cleanup(final File artifact) {
try {
File parentDir = artifact.getParentFile();
if (FileUtils.directoryContains(inbox, parentDir)) {
String[] filesList = parentDir.list();
if (filesList == null || filesList.length == 0) {
logger.info("Delete empty inbox for series {}",
StringUtils.substring(parentDir.getCanonicalPath(), inbox.getCanonicalPath().length() + 1));
FileUtils.deleteDirectory(parentDir);
}
}
} catch (Exception e) {
logger.error("Unable to cleanup inbox for the artifact {}", artifact, e);
}
}
/**
*
* @param mp
* the mediapackage to update
* @param dc
* the dublincore metadata to use to update the mediapackage
* @return the updated mediapackage
* @throws IOException
* Thrown if an IO error occurred adding the dc catalog file
* @throws MediaPackageException
* Thrown if an error occurred updating the mediapackage or the mediapackage does not contain a catalog
*/
private MediaPackage updateDublinCoreCatalog(MediaPackage mp, DublinCoreCatalog dc)
throws IOException, MediaPackageException {
try (InputStream inputStream = IOUtils.toInputStream(dc.toXmlString(), StandardCharsets.UTF_8)) {
// Update dublincore catalog
Catalog[] catalogs = mp.getCatalogs(MediaPackageElements.EPISODE);
if (catalogs.length > 0) {
Catalog catalog = catalogs[0];
URI uri = workspace.put(mp.getIdentifier().toString(), catalog.getIdentifier(), "dublincore.xml", inputStream);
catalog.setURI(uri);
// the URI now points to a new source, so the old checksum is most likely invalid
catalog.setChecksum(null);
} else {
throw new MediaPackageException("Unable to find catalog");
}
}
return mp;
}
public String myInfo() {
return format("[%x thread=%x]", hashCode(), Thread.currentThread().getId());
}
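// The classes below map the subset of the ffprobe "-show_format -of json" output that is of
// interest here. For orientation, an abridged result typically looks like the following
// (exact fields vary with ffprobe version and container format):
//   {
//     "format": {
//       "duration": "3610.24",
//       "tags": { "creation_time": "2023-04-01T10:15:00.000000Z" }
//     }
//   }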
class JsonFFprobe {
protected JsonFormat format;
}
class JsonFormat {
private String duration;
protected JsonTags tags;
Float getDuration() {
return duration == null ? null : Float.parseFloat(duration);
}
@Override
public String toString() {
return String.format("{duration=%s,tags=%s}", duration, tags);
}
}
class JsonTags {
@SerializedName(value = "creation_time")
private String creationTime;
Date getCreationTime() throws ParseException {
if (creationTime == null) {
return null;
}
// ffprobe reports e.g. "2023-04-01T10:15:00.000000Z"; truncate the microseconds to
// milliseconds and rewrite the zone designator so that SimpleDateFormat can parse it
DateFormat format = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSz");
return format.parse(creationTime.replaceAll("000Z$", "+0000"));
}
@Override
public String toString() {
return String.format("{creation_time=%s}", creationTime);
}
}
}