View Javadoc
1   /*
2    * Licensed to The Apereo Foundation under one or more contributor license
3    * agreements. See the NOTICE file distributed with this work for additional
4    * information regarding copyright ownership.
5    *
6    *
7    * The Apereo Foundation licenses this file to you under the Educational
8    * Community License, Version 2.0 (the "License"); you may not use this file
9    * except in compliance with the License. You may obtain a copy of the License
10   * at:
11   *
12   *   http://opensource.org/licenses/ecl2.txt
13   *
14   * Unless required by applicable law or agreed to in writing, software
15   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
16   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the
17   * License for the specific language governing permissions and limitations under
18   * the License.
19   *
20   */
21  
22  package org.opencastproject.ingest.impl;
23  
24  import static org.apache.commons.lang3.StringUtils.isBlank;
25  import static org.opencastproject.metadata.dublincore.DublinCore.PROPERTY_IDENTIFIER;
26  import static org.opencastproject.metadata.dublincore.DublinCore.PROPERTY_TITLE;
27  import static org.opencastproject.security.api.SecurityConstants.GLOBAL_ADMIN_ROLE;
28  import static org.opencastproject.security.api.SecurityConstants.GLOBAL_CAPTURE_AGENT_ROLE;
29  import static org.opencastproject.util.JobUtil.waitForJob;
30  
31  import org.opencastproject.authorization.xacml.XACMLParsingException;
32  import org.opencastproject.authorization.xacml.XACMLUtils;
33  import org.opencastproject.capture.CaptureParameters;
34  import org.opencastproject.ingest.api.IngestException;
35  import org.opencastproject.ingest.api.IngestService;
36  import org.opencastproject.ingest.impl.jmx.IngestStatistics;
37  import org.opencastproject.inspection.api.MediaInspectionService;
38  import org.opencastproject.job.api.AbstractJobProducer;
39  import org.opencastproject.job.api.Job;
40  import org.opencastproject.job.api.Job.Status;
41  import org.opencastproject.mediapackage.Attachment;
42  import org.opencastproject.mediapackage.Catalog;
43  import org.opencastproject.mediapackage.EName;
44  import org.opencastproject.mediapackage.MediaPackage;
45  import org.opencastproject.mediapackage.MediaPackageBuilderFactory;
46  import org.opencastproject.mediapackage.MediaPackageElement;
47  import org.opencastproject.mediapackage.MediaPackageElementFlavor;
48  import org.opencastproject.mediapackage.MediaPackageElementParser;
49  import org.opencastproject.mediapackage.MediaPackageElements;
50  import org.opencastproject.mediapackage.MediaPackageException;
51  import org.opencastproject.mediapackage.MediaPackageParser;
52  import org.opencastproject.mediapackage.MediaPackageSupport;
53  import org.opencastproject.mediapackage.Track;
54  import org.opencastproject.mediapackage.identifier.IdImpl;
55  import org.opencastproject.metadata.dublincore.DCMIPeriod;
56  import org.opencastproject.metadata.dublincore.DublinCore;
57  import org.opencastproject.metadata.dublincore.DublinCoreCatalog;
58  import org.opencastproject.metadata.dublincore.DublinCoreCatalogService;
59  import org.opencastproject.metadata.dublincore.DublinCoreValue;
60  import org.opencastproject.metadata.dublincore.DublinCores;
61  import org.opencastproject.metadata.dublincore.EncodingSchemeUtils;
62  import org.opencastproject.metadata.dublincore.Precision;
63  import org.opencastproject.scheduler.api.SchedulerException;
64  import org.opencastproject.scheduler.api.SchedulerService;
65  import org.opencastproject.security.api.AccessControlEntry;
66  import org.opencastproject.security.api.AccessControlList;
67  import org.opencastproject.security.api.OrganizationDirectoryService;
68  import org.opencastproject.security.api.Permissions;
69  import org.opencastproject.security.api.SecurityService;
70  import org.opencastproject.security.api.TrustedHttpClient;
71  import org.opencastproject.security.api.UnauthorizedException;
72  import org.opencastproject.security.api.User;
73  import org.opencastproject.security.api.UserDirectoryService;
74  import org.opencastproject.security.util.SecurityUtil;
75  import org.opencastproject.series.api.SeriesException;
76  import org.opencastproject.series.api.SeriesService;
77  import org.opencastproject.serviceregistry.api.ServiceRegistry;
78  import org.opencastproject.serviceregistry.api.ServiceRegistryException;
79  import org.opencastproject.smil.api.util.SmilUtil;
80  import org.opencastproject.userdirectory.UserIdRoleProvider;
81  import org.opencastproject.util.ConfigurationException;
82  import org.opencastproject.util.IoSupport;
83  import org.opencastproject.util.LoadUtil;
84  import org.opencastproject.util.MimeTypes;
85  import org.opencastproject.util.NotFoundException;
86  import org.opencastproject.util.ProgressInputStream;
87  import org.opencastproject.util.XmlSafeParser;
88  import org.opencastproject.util.XmlUtil;
89  import org.opencastproject.util.data.functions.Misc;
90  import org.opencastproject.util.jmx.JmxUtil;
91  import org.opencastproject.workflow.api.WorkflowDatabaseException;
92  import org.opencastproject.workflow.api.WorkflowDefinition;
93  import org.opencastproject.workflow.api.WorkflowException;
94  import org.opencastproject.workflow.api.WorkflowInstance;
95  import org.opencastproject.workflow.api.WorkflowService;
96  import org.opencastproject.workingfilerepository.api.WorkingFileRepository;
97  
98  import com.google.common.cache.Cache;
99  import com.google.common.cache.CacheBuilder;
100 
101 import org.apache.commons.compress.archivers.zip.ZipArchiveEntry;
102 import org.apache.commons.compress.archivers.zip.ZipArchiveInputStream;
103 import org.apache.commons.io.FilenameUtils;
104 import org.apache.commons.io.IOUtils;
105 import org.apache.commons.lang3.BooleanUtils;
106 import org.apache.commons.lang3.StringUtils;
107 import org.apache.cxf.jaxrs.ext.multipart.ContentDisposition;
108 import org.apache.http.Header;
109 import org.apache.http.HttpHeaders;
110 import org.apache.http.HttpResponse;
111 import org.apache.http.auth.AuthScope;
112 import org.apache.http.auth.UsernamePasswordCredentials;
113 import org.apache.http.client.CredentialsProvider;
114 import org.apache.http.client.config.AuthSchemes;
115 import org.apache.http.client.methods.HttpGet;
116 import org.apache.http.impl.client.BasicCredentialsProvider;
117 import org.apache.http.impl.client.CloseableHttpClient;
118 import org.apache.http.impl.client.HttpClientBuilder;
119 import org.osgi.service.cm.ManagedService;
120 import org.osgi.service.component.ComponentContext;
121 import org.osgi.service.component.annotations.Activate;
122 import org.osgi.service.component.annotations.Component;
123 import org.osgi.service.component.annotations.Deactivate;
124 import org.osgi.service.component.annotations.Reference;
125 import org.osgi.service.component.annotations.ReferenceCardinality;
126 import org.osgi.service.component.annotations.ReferencePolicy;
127 import org.slf4j.Logger;
128 import org.slf4j.LoggerFactory;
129 import org.xml.sax.SAXException;
130 
131 import java.beans.PropertyChangeEvent;
132 import java.beans.PropertyChangeListener;
133 import java.io.BufferedReader;
134 import java.io.IOException;
135 import java.io.InputStream;
136 import java.io.InputStreamReader;
137 import java.net.URI;
138 import java.nio.charset.StandardCharsets;
139 import java.util.ArrayList;
140 import java.util.Arrays;
141 import java.util.Base64;
142 import java.util.Date;
143 import java.util.Dictionary;
144 import java.util.HashMap;
145 import java.util.HashSet;
146 import java.util.List;
147 import java.util.Map;
148 import java.util.Map.Entry;
149 import java.util.Objects;
150 import java.util.Optional;
151 import java.util.Set;
152 import java.util.UUID;
153 import java.util.concurrent.TimeUnit;
154 import java.util.stream.Collectors;
155 
156 import javax.management.ObjectInstance;
157 
158 /**
159  * Creates and augments Opencast MediaPackages. Stores media into the Working File Repository.
160  */
161 @Component(
162     immediate = true,
163     service = {
164         IngestService.class,
165         ManagedService.class
166     },
167     property = {
168         "service.description=Ingest Service",
169         "service.pid=org.opencastproject.ingest.impl.IngestServiceImpl"
170     }
171 )
172 public class IngestServiceImpl extends AbstractJobProducer implements IngestService, ManagedService {
173 
174   /** The logger */
175   private static final Logger logger = LoggerFactory.getLogger(IngestServiceImpl.class);
176 
177   /** The source SMIL name */
178   private static final String PARTIAL_SMIL_NAME = "source_partial.smil";
179 
180   /** The configuration key that defines the default workflow definition */
181   protected static final String WORKFLOW_DEFINITION_DEFAULT = "org.opencastproject.workflow.default.definition";
182 
183   /** The workflow configuration property prefix **/
184   protected static final String WORKFLOW_CONFIGURATION_PREFIX = "org.opencastproject.workflow.config.";
185 
186   /** The key for the legacy mediapackage identifier */
187   public static final String LEGACY_MEDIAPACKAGE_ID_KEY = "org.opencastproject.ingest.legacy.mediapackage.id";
188 
189   public static final String JOB_TYPE = "org.opencastproject.ingest";
190 
191   /** Methods that ingest zips create jobs with this operation type */
192   public static final String INGEST_ZIP = "zip";
193 
194   /** Methods that ingest tracks directly create jobs with this operation type */
195   public static final String INGEST_TRACK = "track";
196 
197   /** Methods that ingest tracks from a URI create jobs with this operation type */
198   public static final String INGEST_TRACK_FROM_URI = "uri-track";
199 
200   /** Methods that ingest attachments directly create jobs with this operation type */
201   public static final String INGEST_ATTACHMENT = "attachment";
202 
203   /** Methods that ingest attachments from a URI create jobs with this operation type */
204   public static final String INGEST_ATTACHMENT_FROM_URI = "uri-attachment";
205 
206   /** Methods that ingest catalogs directly create jobs with this operation type */
207   public static final String INGEST_CATALOG = "catalog";
208 
209   /** Methods that ingest catalogs from a URI create jobs with this operation type */
210   public static final String INGEST_CATALOG_FROM_URI = "uri-catalog";
211 
212   /** The approximate load placed on the system by ingesting a file */
213   public static final float DEFAULT_INGEST_FILE_JOB_LOAD = 0.2f;
214 
215   /** The approximate load placed on the system by ingesting a zip file */
216   public static final float DEFAULT_INGEST_ZIP_JOB_LOAD = 0.2f;
217 
  /** The key to look for in the service configuration file to override the {@link #DEFAULT_INGEST_FILE_JOB_LOAD} */
219   public static final String FILE_JOB_LOAD_KEY = "job.load.ingest.file";
220 
  /** The key to look for in the service configuration file to override the {@link #DEFAULT_INGEST_ZIP_JOB_LOAD} */
222   public static final String ZIP_JOB_LOAD_KEY = "job.load.ingest.zip";
223 
224   /** The source to download from  */
225   public static final String DOWNLOAD_SOURCE = "org.opencastproject.download.source";
226 
227   /** The user for download from external sources */
228   public static final String DOWNLOAD_USER = "org.opencastproject.download.user";
229 
230   /** The password for download from external sources */
231   public static final String DOWNLOAD_PASSWORD = "org.opencastproject.download.password";
232 
233   /** The authentication method for download from external sources */
234   public static final String DOWNLOAD_AUTH_METHOD = "org.opencastproject.download.auth.method";
235 
236   /** Force basic authentication even if download host does not ask for it */
237   public static final String DOWNLOAD_AUTH_FORCE_BASIC = "org.opencastproject.download.auth.force_basic";
238 
239   /** By default, do not allow event ingest to modify existing series metadata */
240   public static final boolean DEFAULT_ALLOW_SERIES_MODIFICATIONS = false;
241 
242   /** The default is to preserve existing Opencast flavors during ingest. */
243   public static final boolean DEFAULT_ALLOW_ONLY_NEW_FLAVORS = true;
244 
245   /** The default is not to automatically skip attachments and catalogs from capture agent */
246   public static final boolean DEFAULT_SKIP = false;
247 
248   /** The default for force basic authentication even if download host does not ask for it */
249   public static final boolean DEFAULT_DOWNLOAD_AUTH_FORCE_BASIC = false;
250 
251   /** The maximum length of filenames ingested by Opencast */
252   public static final int FILENAME_LENGTH_MAX = 75;
253 
254   /** Managed Property key to allow Opencast series modification during ingest
255    * Deprecated, the param potentially causes an update chain reaction for all
256    * events associated to that series, for each ingest */
257   @Deprecated
258   public static final String MODIFY_OPENCAST_SERIES_KEY = "org.opencastproject.series.overwrite";
259 
260   /** Managed Property key to allow new flavors of ingested attachments and catalogs
261    * to be added to the existing Opencast mediapackage. But, not catalogs and attachments
262    * that would overwrite existing ones in Opencast.
263    */
264   public static final String ADD_ONLY_NEW_FLAVORS_KEY = "add.only.new.catalogs.attachments.for.existing.events";
265 
266   /** Control if catalogs sent by capture agents for scheduled events are skipped. */
267   public static final String SKIP_CATALOGS_KEY = "skip.catalogs.for.existing.events";
268 
269   /** Control if attachments sent by capture agents for scheduled events are skipped. */
270   public static final String SKIP_ATTACHMENTS_KEY = "skip.attachments.for.existing.events";
271 
272   /** The appendix added to autogenerated CA series. CA series creation deactivated if not configured */
273   private static final String SERIES_APPENDIX = "add.series.to.event.appendix";
274 
275   /** The approximate load placed on the system by ingesting a file */
276   private float ingestFileJobLoad = DEFAULT_INGEST_FILE_JOB_LOAD;
277 
278   /** The approximate load placed on the system by ingesting a zip file */
279   private float ingestZipJobLoad = DEFAULT_INGEST_ZIP_JOB_LOAD;
280 
281   /** The user for download from external sources */
282   private static String downloadUser = DOWNLOAD_USER;
283 
284   /** The password for download from external sources */
285   private static String downloadPassword = DOWNLOAD_PASSWORD;
286 
287   /** The authentication method for download from external sources */
288   private static String downloadAuthMethod = DOWNLOAD_AUTH_METHOD;
289 
290   /** Force basic authentication even if download host does not ask for it */
291   private static boolean downloadAuthForceBasic = DEFAULT_DOWNLOAD_AUTH_FORCE_BASIC;
292 
293   /** The external source dns name */
294   private static String downloadSource = DOWNLOAD_SOURCE;
295 
296   /** The JMX business object for ingest statistics */
297   private IngestStatistics ingestStatistics = new IngestStatistics();
298 
299   /** The JMX bean object instance */
300   private ObjectInstance registerMXBean;
301 
302   /** The workflow service */
303   private WorkflowService workflowService;
304 
305   /** The working file repository */
306   private WorkingFileRepository workingFileRepository;
307 
308   /** The http client */
309   private TrustedHttpClient httpClient;
310 
311   /** The series service */
312   private SeriesService seriesService;
313 
314   /** The dublin core service */
315   private DublinCoreCatalogService dublinCoreService;
316 
317   /** The opencast service registry */
318   private ServiceRegistry serviceRegistry;
319 
320   /** The security service */
321   protected SecurityService securityService = null;
322 
323   /** The user directory service */
324   protected UserDirectoryService userDirectoryService = null;
325 
326   /** The organization directory service */
327   protected OrganizationDirectoryService organizationDirectoryService = null;
328 
329   /** The scheduler service */
330   private SchedulerService schedulerService = null;
331 
332   /** The media inspection service */
333   private MediaInspectionService mediaInspectionService = null;
334 
335   /** The search index. */
336 
337   /** The default workflow identifier, if one is configured */
338   protected String defaultWorkflowDefinionId;
339 
340   /** The partial track start time map */
341   private Cache<String, Long> partialTrackStartTimes = CacheBuilder.newBuilder().expireAfterWrite(1, TimeUnit.DAYS)
342           .build();
343 
344   /** Option to overwrite matching flavors (e.g. series and episode metadata) on ingest,
345    *  tracks are always taken on ingest */
346   protected boolean isAddOnlyNew = DEFAULT_ALLOW_ONLY_NEW_FLAVORS;
347   protected boolean isAllowModifySeries = DEFAULT_ALLOW_SERIES_MODIFICATIONS;
348 
349   private boolean skipCatalogs = DEFAULT_SKIP;
350   private boolean skipAttachments = DEFAULT_SKIP;
351 
352   /** Create name appendix for autogenerated CA series */
353   private String createSeriesAppendix = null;
354 
355   protected boolean testMode = false;
356 
357   /**
358    * Creates a new ingest service instance.
359    */
360   public IngestServiceImpl() {
361     super(JOB_TYPE);
362   }
363 
364   /**
365    * OSGI callback for activating this component
366    *
367    * @param cc
368    *          the osgi component context
369    */
370   @Override
371   @Activate
372   public void activate(ComponentContext cc) {
373     super.activate(cc);
374     logger.info("Ingest Service started.");
375     defaultWorkflowDefinionId = StringUtils.trimToNull(cc.getBundleContext().getProperty(WORKFLOW_DEFINITION_DEFAULT));
376     if (defaultWorkflowDefinionId == null) {
377       defaultWorkflowDefinionId = "schedule-and-upload";
378     }
379     registerMXBean = JmxUtil.registerMXBean(ingestStatistics, "IngestStatistics");
380   }
381 
382   /**
383    * Callback from OSGi on service deactivation.
384    */
385   @Deactivate
386   public void deactivate() {
387     JmxUtil.unregisterMXBean(registerMXBean);
388   }
389 
390   /**
391    * {@inheritDoc}
392    *
393    * @see org.osgi.service.cm.ManagedService#updated(java.util.Dictionary)
394    *      Retrieve ManagedService configuration, including option to overwrite series
395    */
396   @Override
397   public void updated(Dictionary<String, ?> properties) throws ConfigurationException {
398 
399     if (properties == null) {
400       logger.info("No configuration available, using defaults");
401       return;
402     }
403 
404     downloadAuthMethod = StringUtils.trimToEmpty((String)properties.get(DOWNLOAD_AUTH_METHOD));
405     if (!"Digest".equals(downloadAuthMethod) && !"Basic".equals(downloadAuthMethod)) {
406       logger.warn("Download authentication method is neither Digest nor Basic; setting to Digest");
407       downloadAuthMethod = "Digest";
408     }
409     downloadAuthForceBasic = BooleanUtils.toBoolean(Objects.toString(properties.get(DOWNLOAD_AUTH_FORCE_BASIC),
410         BooleanUtils.toStringTrueFalse(DEFAULT_DOWNLOAD_AUTH_FORCE_BASIC)));
411     downloadPassword = StringUtils.trimToEmpty((String)properties.get(DOWNLOAD_PASSWORD));
412     downloadUser = StringUtils.trimToEmpty(((String) properties.get(DOWNLOAD_USER)));
413     downloadSource = StringUtils.trimToEmpty(((String) properties.get(DOWNLOAD_SOURCE)));
414     if (!isBlank(downloadSource) && (isBlank(downloadUser) || isBlank(downloadPassword))) {
415       logger.warn("Configured ingest download source has no configured user or password; deactivating authenticated"
416           + "download");
417       downloadSource = "";
418     }
419 
420     skipAttachments = BooleanUtils.toBoolean(Objects.toString(properties.get(SKIP_ATTACHMENTS_KEY),
421             BooleanUtils.toStringTrueFalse(DEFAULT_SKIP)));
422     skipCatalogs = BooleanUtils.toBoolean(Objects.toString(properties.get(SKIP_CATALOGS_KEY),
423             BooleanUtils.toStringTrueFalse(DEFAULT_SKIP)));
424     logger.debug("Skip attachments sent by agents for scheduled events: {}", skipAttachments);
425     logger.debug("Skip metadata catalogs sent by agents for scheduled events: {}", skipCatalogs);
426 
427     ingestFileJobLoad = LoadUtil.getConfiguredLoadValue(properties, FILE_JOB_LOAD_KEY, DEFAULT_INGEST_FILE_JOB_LOAD,
428             serviceRegistry);
429     ingestZipJobLoad = LoadUtil.getConfiguredLoadValue(properties, ZIP_JOB_LOAD_KEY, DEFAULT_INGEST_ZIP_JOB_LOAD,
430             serviceRegistry);
431 
432     isAllowModifySeries = BooleanUtils.toBoolean(Objects.toString(properties.get(MODIFY_OPENCAST_SERIES_KEY),
433               BooleanUtils.toStringTrueFalse(DEFAULT_ALLOW_SERIES_MODIFICATIONS)));
434     isAddOnlyNew = BooleanUtils.toBoolean(Objects.toString(properties.get(ADD_ONLY_NEW_FLAVORS_KEY),
435             BooleanUtils.toStringTrueFalse(DEFAULT_ALLOW_ONLY_NEW_FLAVORS)));
436     logger.info("Only allow new flavored catalogs and attachments on ingest:'{}'", isAddOnlyNew);
437     logger.info("Allowing series modification:'{}'", isAllowModifySeries);
438     createSeriesAppendix = StringUtils.trimToNull(((String) properties.get(SERIES_APPENDIX)));
439   }
440 
441   /**
442    * Sets the trusted http client
443    *
444    * @param httpClient
445    *          the http client
446    */
447   @Reference
448   public void setHttpClient(TrustedHttpClient httpClient) {
449     this.httpClient = httpClient;
450   }
451 
452   /**
453    * Sets the service registry
454    *
455    * @param serviceRegistry
456    *          the serviceRegistry to set
457    */
458   @Reference
459   public void setServiceRegistry(ServiceRegistry serviceRegistry) {
460     this.serviceRegistry = serviceRegistry;
461   }
462 
463   /**
464    * Sets the media inspection service
465    *
466    * @param mediaInspectionService
467    *          the media inspection service to set
468    */
469   @Reference
470   public void setMediaInspectionService(MediaInspectionService mediaInspectionService) {
471     this.mediaInspectionService = mediaInspectionService;
472   }
473 
474   public WorkflowInstance addZippedMediaPackage(InputStream zipStream)
475           throws IngestException, IOException, MediaPackageException {
476     try {
477       return addZippedMediaPackage(zipStream, null, null);
478     } catch (NotFoundException e) {
479       throw new IllegalStateException("A not found exception was thrown without a lookup");
480     }
481   }
482 
483   @Override
484   public WorkflowInstance addZippedMediaPackage(InputStream zipStream, String wd, Map<String, String> workflowConfig)
485           throws MediaPackageException, IOException, IngestException, NotFoundException {
486     try {
487       return addZippedMediaPackage(zipStream, wd, workflowConfig, null);
488     } catch (UnauthorizedException e) {
489       throw new IllegalStateException(e);
490     }
491   }
492 
493   /**
494    * {@inheritDoc}
495    *
496    * @see org.opencastproject.ingest.api.IngestService#addZippedMediaPackage(java.io.InputStream, java.lang.String,
497    *      java.util.Map, java.lang.Long)
498    */
499   @Override
500   public WorkflowInstance addZippedMediaPackage(InputStream zipStream, String workflowDefinitionId,
501           Map<String, String> workflowConfig, Long workflowInstanceId)
502           throws MediaPackageException, IOException, IngestException, NotFoundException, UnauthorizedException {
503     // Start a job synchronously. We can't keep the open input stream waiting around.
504     Job job = null;
505 
506     if (StringUtils.isNotBlank(workflowDefinitionId)) {
507       try {
508         workflowService.getWorkflowDefinitionById(workflowDefinitionId);
509       } catch (WorkflowDatabaseException e) {
510         throw new IngestException(e);
511       } catch (NotFoundException nfe) {
512         logger.warn("Workflow definition {} not found, using default workflow {} instead", workflowDefinitionId,
513                 defaultWorkflowDefinionId);
514         workflowDefinitionId = defaultWorkflowDefinionId;
515       }
516     }
517 
518     if (workflowInstanceId != null) {
519       logger.warn("Deprecated method! Ingesting zipped mediapackage with workflow {}", workflowInstanceId);
520     } else {
521       logger.info("Ingesting zipped mediapackage");
522     }
523 
524     ZipArchiveInputStream zis = null;
525     Set<String> collectionFilenames = new HashSet<>();
526     try {
527       // We don't need anybody to do the dispatching for us. Therefore we need to make sure that the job is never in
528       // QUEUED state but set it to INSTANTIATED in the beginning and then manually switch it to RUNNING.
529       job = serviceRegistry.createJob(JOB_TYPE, INGEST_ZIP, null, null, false, ingestZipJobLoad);
530       job.setStatus(Status.RUNNING);
531       job = serviceRegistry.updateJob(job);
532 
533       // Create the working file target collection for this ingest operation
534       String wfrCollectionId = Long.toString(job.getId());
535 
536       zis = new ZipArchiveInputStream(zipStream);
537       ZipArchiveEntry entry;
538       MediaPackage mp = null;
539       Map<String, URI> uris = new HashMap<>();
540       // Sequential number to append to file names so that, if two files have the same
541       // name, one does not overwrite the other (see MH-9688)
542       int seq = 1;
543       // Folder name to compare with next one to figure out if there's a root folder
544       String folderName = null;
545       // Indicates if zip has a root folder or not, initialized as true
546       boolean hasRootFolder = true;
547       // While there are entries write them to a collection
548       while ((entry = zis.getNextZipEntry()) != null) {
549         try {
550           if (entry.isDirectory() || entry.getName().contains("__MACOSX")) {
551             continue;
552           }
553 
554           if (entry.getName().endsWith("manifest.xml") || entry.getName().endsWith("index.xml")) {
555             // Build the media package
556             final InputStream is = new ZipEntryInputStream(zis, entry.getSize());
557             mp = MediaPackageParser.getFromXml(IOUtils.toString(is, StandardCharsets.UTF_8));
558           } else {
559             logger.info("Storing zip entry {}/{} in working file repository collection '{}'", job.getId(),
560                     entry.getName(), wfrCollectionId);
561             // Since the directory structure is not being mirrored, makes sure the file
562             // name is different than the previous one(s) by adding a sequential number
563             String fileName = FilenameUtils.getBaseName(entry.getName()) + "_" + seq++ + "."
564                     + FilenameUtils.getExtension(entry.getName());
565             URI contentUri = workingFileRepository.putInCollection(wfrCollectionId, fileName,
566                     new ZipEntryInputStream(zis, entry.getSize()));
567             collectionFilenames.add(fileName);
568             // Key is the zip entry name as it is
569             String key = entry.getName();
570             uris.put(key, contentUri);
571             ingestStatistics.add(entry.getSize());
572             logger.info("Zip entry {}/{} stored at {}", job.getId(), entry.getName(), contentUri);
573             // Figures out if there's a root folder. Does entry name starts with a folder?
574             int pos = entry.getName().indexOf('/');
575             if (pos == -1) {
576               // No, we can conclude there's no root folder
577               hasRootFolder = false;
578             } else if (hasRootFolder && folderName != null && !folderName.equals(entry.getName().substring(0, pos))) {
579               // Folder name different from previous so there's no root folder
580               hasRootFolder = false;
581             } else if (folderName == null) {
582               // Just initialize folder name
583               folderName = entry.getName().substring(0, pos);
584             }
585           }
586         } catch (IOException e) {
587           logger.warn("Unable to process zip entry {}", entry.getName(), e);
588           throw e;
589         }
590       }
591 
592       if (mp == null) {
593         throw new MediaPackageException("No manifest found in this zip");
594       }
595 
596       // Determine the mediapackage identifier
597       if (mp.getIdentifier() == null || isBlank(mp.getIdentifier().toString())) {
598         mp.setIdentifier(IdImpl.fromUUID());
599       }
600 
601       String mediaPackageId = mp.getIdentifier().toString();
602 
603       logger.info("Ingesting mediapackage {} is named '{}'", mediaPackageId, mp.getTitle());
604 
605       // Make sure there are tracks in the mediapackage
606       if (mp.getTracks().length == 0) {
607         logger.warn("Mediapackage {} has no media tracks", mediaPackageId);
608       }
609 
610       // Update the element uris to point to their working file repository location
611       for (MediaPackageElement element : mp.elements()) {
612         // Key has root folder name if there is one
613         URI uri = uris.get((hasRootFolder ? folderName + "/" : "") + element.getURI().toString());
614 
615         if (uri == null) {
616           throw new MediaPackageException("Unable to map element name '" + element.getURI() + "' to workspace uri");
617         }
618         logger.info("Ingested mediapackage element {}/{} located at {}", mediaPackageId, element.getIdentifier(), uri);
619         URI dest = workingFileRepository.moveTo(wfrCollectionId, FilenameUtils.getName(uri.toString()), mediaPackageId,
620                 element.getIdentifier(), FilenameUtils.getName(element.getURI().toString()));
621         element.setURI(dest);
622       }
623 
624       // Now that all elements are in place, start with ingest
625       logger.info("Initiating processing of ingested mediapackage {}", mediaPackageId);
626       WorkflowInstance workflowInstance = ingest(mp, workflowDefinitionId, workflowConfig, workflowInstanceId);
627       logger.info("Ingest of mediapackage {} done", mediaPackageId);
628       job.setStatus(Job.Status.FINISHED);
629       return workflowInstance;
630     } catch (ServiceRegistryException e) {
631       throw new IngestException(e);
632     } catch (MediaPackageException e) {
633       job.setStatus(Job.Status.FAILED, Job.FailureReason.DATA);
634       throw e;
635     } catch (Exception e) {
636       if (e instanceof IngestException) {
637         throw (IngestException) e;
638       }
639       throw new IngestException(e);
640     } finally {
641       IOUtils.closeQuietly(zis);
642       finallyUpdateJob(job);
643       for (String filename : collectionFilenames) {
644         workingFileRepository.deleteFromCollection(Long.toString(job.getId()), filename, true);
645       }
646     }
647   }
648 
649   /**
650    * {@inheritDoc}
651    *
652    * @see org.opencastproject.ingest.api.IngestService#createMediaPackage()
653    */
654   @Override
655   public MediaPackage createMediaPackage() throws MediaPackageException, ConfigurationException {
656     MediaPackage mediaPackage;
657     try {
658       mediaPackage = MediaPackageBuilderFactory.newInstance().newMediaPackageBuilder().createNew();
659     } catch (MediaPackageException e) {
660       logger.error("INGEST:Failed to create media package " + e.getLocalizedMessage());
661       throw e;
662     }
663     mediaPackage.setDate(new Date());
664     logger.info("Created mediapackage {}", mediaPackage);
665     return mediaPackage;
666   }
667 
668   /**
669    * {@inheritDoc}
670    *
671    * @see org.opencastproject.ingest.api.IngestService#createMediaPackage()
672    */
673   @Override
674   public MediaPackage createMediaPackage(String mediaPackageId)
675           throws MediaPackageException, ConfigurationException {
676     MediaPackage mediaPackage;
677     try {
678       mediaPackage = MediaPackageBuilderFactory.newInstance().newMediaPackageBuilder()
679               .createNew(new IdImpl(mediaPackageId));
680     } catch (MediaPackageException e) {
681       logger.error("INGEST:Failed to create media package " + e.getLocalizedMessage());
682       throw e;
683     }
684     mediaPackage.setDate(new Date());
685     logger.info("Created mediapackage {}", mediaPackage);
686     return mediaPackage;
687   }
688 
689   /**
690    * {@inheritDoc}
691    *
692    * @see org.opencastproject.ingest.api.IngestService#addTrack(java.net.URI,
693    *      org.opencastproject.mediapackage.MediaPackageElementFlavor, String[] ,
694    *      org.opencastproject.mediapackage.MediaPackage)
695    */
696   @Override
697   public MediaPackage addTrack(URI uri, MediaPackageElementFlavor flavor, String[] tags, MediaPackage mediaPackage)
698           throws IOException, IngestException {
699     Job job = null;
700     try {
701       job = serviceRegistry
702               .createJob(
703                       JOB_TYPE, INGEST_TRACK_FROM_URI, Arrays.asList(uri.toString(),
704                               flavor == null ? null : flavor.toString(), MediaPackageParser.getAsXml(mediaPackage)),
705                       null, false, ingestFileJobLoad);
706       job.setStatus(Status.RUNNING);
707       job = serviceRegistry.updateJob(job);
708       String elementId = UUID.randomUUID().toString();
709       logger.info("Start adding track {} from URL {} on mediapackage {}", elementId, uri, mediaPackage);
710       URI newUrl = addContentToRepo(mediaPackage, elementId, uri);
711       MediaPackage mp = addContentToMediaPackage(mediaPackage, elementId, newUrl, MediaPackageElement.Type.Track,
712               flavor);
713       if (tags != null && tags.length > 0) {
714         MediaPackageElement trackElement = mp.getTrack(elementId);
715         for (String tag : tags) {
716           logger.info("Adding tag: " + tag + " to Element: " + elementId);
717           trackElement.addTag(tag);
718         }
719       }
720 
721       job.setStatus(Job.Status.FINISHED);
722       logger.info("Successful added track {} on mediapackage {} at URL {}", elementId, mediaPackage, newUrl);
723       return mp;
724     } catch (IOException e) {
725       throw e;
726     } catch (ServiceRegistryException e) {
727       throw new IngestException(e);
728     } catch (NotFoundException e) {
729       throw new IngestException("Unable to update ingest job", e);
730     } finally {
731       finallyUpdateJob(job);
732     }
733   }
734 
735   /**
736    * {@inheritDoc}
737    *
738    * @see org.opencastproject.ingest.api.IngestService#addTrack(java.io.InputStream, java.lang.String,
739    *      org.opencastproject.mediapackage.MediaPackageElementFlavor, org.opencastproject.mediapackage.MediaPackage)
740    */
741   @Override
742   public MediaPackage addTrack(InputStream in, String fileName, MediaPackageElementFlavor flavor,
743           MediaPackage mediaPackage) throws IOException, IngestException {
744     String[] tags = null;
745     return this.addTrack(in, fileName, flavor, tags, mediaPackage);
746   }
747 
748   /**
749    * {@inheritDoc}
750    *
751    * @see org.opencastproject.ingest.api.IngestService#addTrack(java.io.InputStream, java.lang.String,
752    *      org.opencastproject.mediapackage.MediaPackageElementFlavor, org.opencastproject.mediapackage.MediaPackage)
753    */
754   @Override
755   public MediaPackage addTrack(InputStream in, String fileName, MediaPackageElementFlavor flavor, String[] tags,
756           MediaPackage mediaPackage) throws IOException, IngestException {
757     Job job = null;
758     try {
759       job = serviceRegistry.createJob(JOB_TYPE, INGEST_TRACK, null, null, false, ingestFileJobLoad);
760       job.setStatus(Status.RUNNING);
761       job = serviceRegistry.updateJob(job);
762       String elementId = UUID.randomUUID().toString();
763       logger.info("Start adding track {} from input stream on mediapackage {}", elementId, mediaPackage);
764       if (fileName.length() > FILENAME_LENGTH_MAX) {
765         final String extension = "." + FilenameUtils.getExtension(fileName);
766         final int length = Math.max(0, FILENAME_LENGTH_MAX - extension.length());
767         fileName = fileName.substring(0, length) + extension;
768       }
769       URI newUrl = addContentToRepo(mediaPackage, elementId, fileName, in);
770       MediaPackage mp = addContentToMediaPackage(mediaPackage, elementId, newUrl, MediaPackageElement.Type.Track,
771               flavor);
772       if (tags != null && tags.length > 0) {
773         MediaPackageElement trackElement = mp.getTrack(elementId);
774         for (String tag : tags) {
775           logger.debug("Adding tag `{}` to element {}", tag, elementId);
776           trackElement.addTag(tag);
777         }
778       }
779 
780       job.setStatus(Job.Status.FINISHED);
781       logger.info("Successful added track {} on mediapackage {} at URL {}", elementId, mediaPackage, newUrl);
782       return mp;
783     } catch (IOException e) {
784       throw e;
785     } catch (ServiceRegistryException e) {
786       throw new IngestException(e);
787     } catch (NotFoundException e) {
788       throw new IngestException("Unable to update ingest job", e);
789     } finally {
790       finallyUpdateJob(job);
791     }
792   }
793 
794   @Override
795   public MediaPackage addPartialTrack(URI uri, MediaPackageElementFlavor flavor, long startTime,
796           MediaPackage mediaPackage) throws IOException, IngestException {
797     Job job = null;
798     try {
799       job = serviceRegistry.createJob(
800               JOB_TYPE,
801               INGEST_TRACK_FROM_URI,
802               Arrays.asList(uri.toString(), flavor == null ? null : flavor.toString(),
803                       MediaPackageParser.getAsXml(mediaPackage)), null, false);
804       job.setStatus(Status.RUNNING);
805       job = serviceRegistry.updateJob(job);
806       String elementId = UUID.randomUUID().toString();
807       logger.info("Start adding partial track {} from URL {} on mediapackage {}", elementId, uri, mediaPackage);
808       URI newUrl = addContentToRepo(mediaPackage, elementId, uri);
809       MediaPackage mp = addContentToMediaPackage(mediaPackage, elementId, newUrl, MediaPackageElement.Type.Track,
810               flavor);
811       job.setStatus(Job.Status.FINISHED);
812       // store startTime
813       partialTrackStartTimes.put(elementId, startTime);
814       logger.debug("Added start time {} for track {}", startTime, elementId);
815       logger.info("Successful added partial track {} on mediapackage {} at URL {}", elementId, mediaPackage, newUrl);
816       return mp;
817     } catch (ServiceRegistryException e) {
818       throw new IngestException(e);
819     } catch (NotFoundException e) {
820       throw new IngestException("Unable to update ingest job", e);
821     } finally {
822       finallyUpdateJob(job);
823     }
824   }
825 
826   @Override
827   public MediaPackage addPartialTrack(InputStream in, String fileName, MediaPackageElementFlavor flavor, long startTime,
828           MediaPackage mediaPackage) throws IOException, IngestException {
829     Job job = null;
830     try {
831       job = serviceRegistry.createJob(JOB_TYPE, INGEST_TRACK, null, null, false);
832       job.setStatus(Status.RUNNING);
833       job = serviceRegistry.updateJob(job);
834       String elementId = UUID.randomUUID().toString();
835       logger.info("Start adding partial track {} from input stream on mediapackage {}", elementId, mediaPackage);
836       URI newUrl = addContentToRepo(mediaPackage, elementId, fileName, in);
837       MediaPackage mp = addContentToMediaPackage(mediaPackage, elementId, newUrl, MediaPackageElement.Type.Track,
838               flavor);
839       job.setStatus(Job.Status.FINISHED);
840       // store startTime
841       partialTrackStartTimes.put(elementId, startTime);
842       logger.debug("Added start time {} for track {}", startTime, elementId);
843       logger.info("Successful added partial track {} on mediapackage {} at URL {}", elementId, mediaPackage, newUrl);
844       return mp;
845     } catch (ServiceRegistryException e) {
846       throw new IngestException(e);
847     } catch (NotFoundException e) {
848       throw new IngestException("Unable to update ingest job", e);
849     } finally {
850       finallyUpdateJob(job);
851     }
852   }
853 
854   /**
855    * {@inheritDoc}
856    *
857    * @see org.opencastproject.ingest.api.IngestService#addCatalog(java.net.URI,
858    *      org.opencastproject.mediapackage.MediaPackageElementFlavor, String[],
859    *      org.opencastproject.mediapackage.MediaPackage)
860    */
861   @Override
862   public MediaPackage addCatalog(URI uri, MediaPackageElementFlavor flavor, String[] tags, MediaPackage mediaPackage)
863           throws IOException, IngestException {
864     Job job = null;
865     try {
866       job = serviceRegistry.createJob(JOB_TYPE, INGEST_CATALOG_FROM_URI,
867               Arrays.asList(uri.toString(), flavor.toString(), MediaPackageParser.getAsXml(mediaPackage)), null, false,
868               ingestFileJobLoad);
869       job.setStatus(Status.RUNNING);
870       job = serviceRegistry.updateJob(job);
871       String elementId = UUID.randomUUID().toString();
872       logger.info("Start adding catalog {} from URL {} on mediapackage {}", elementId, uri, mediaPackage);
873       URI newUrl = addContentToRepo(mediaPackage, elementId, uri);
874       MediaPackage mp = addContentToMediaPackage(mediaPackage, elementId, newUrl, MediaPackageElement.Type.Catalog,
875               flavor);
876       if (tags != null && tags.length > 0) {
877         MediaPackageElement catalogElement = mp.getCatalog(elementId);
878         for (String tag : tags) {
879           logger.info("Adding tag: " + tag + " to Element: " + elementId);
880           catalogElement.addTag(tag);
881         }
882       }
883       job.setStatus(Job.Status.FINISHED);
884       logger.info("Successful added catalog {} on mediapackage {} at URL {}", elementId, mediaPackage, newUrl);
885       return mp;
886     } catch (ServiceRegistryException e) {
887       throw new IngestException(e);
888     } catch (NotFoundException e) {
889       throw new IngestException("Unable to update ingest job", e);
890     } finally {
891       finallyUpdateJob(job);
892     }
893   }
894 
895   /**
896    * Updates the persistent representation of a series based on a potentially modified dublin core document.
897    *
898    * @param mediaPackage
899    *         the media package containing series metadata and ACLs.
900    * @return
901    *         true, if the series is created or overwritten, false if the existing series remains intact.
902    * @throws IOException if the series catalog was not found
903    * @throws IngestException if any other exception was encountered
904    */
905   protected boolean updateSeries(MediaPackage mediaPackage) throws IOException, IngestException {
906     Catalog[] seriesCatalogs = mediaPackage.getCatalogs(MediaPackageElements.SERIES);
907     if (seriesCatalogs.length == 0) {
908       return false;
909     } else if (seriesCatalogs.length > 1) {
910       logger.warn("Mediapackage {} has more than one series dublincore catalogs. Using catalog {} with ID {}.",
911           mediaPackage.getIdentifier(), seriesCatalogs[0].getURI(), seriesCatalogs[0].getIdentifier());
912     }
913     // Parse series dublincore
914     HttpResponse response = null;
915     InputStream in = null;
916     boolean isUpdated = false;
917     boolean isNew = false;
918     String seriesId = null;
919     try {
920       HttpGet getDc = new HttpGet(seriesCatalogs[0].getURI());
921       response = httpClient.execute(getDc);
922       in = response.getEntity().getContent();
923       DublinCoreCatalog dc = dublinCoreService.load(in);
924       seriesId = dc.getFirst(DublinCore.PROPERTY_IDENTIFIER);
925       if (seriesId == null) {
926         logger.warn("Series dublin core document contains no identifier, "
927             + "rejecting ingested series catalog for mediapackage {}.", mediaPackage.getIdentifier());
928       } else {
929         try {
930           try {
931             seriesService.getSeries(seriesId);
932             if (isAllowModifySeries) {
933               // Update existing series
934               seriesService.updateSeries(dc);
935               isUpdated = true;
936               logger.debug("Ingest is overwriting the existing series {} with the ingested series", seriesId);
937             } else {
938               logger.debug("Series {} already exists. Ignoring series catalog from ingest.", seriesId);
939             }
940           } catch (NotFoundException e) {
941             logger.info("Creating new series {} with default ACL.", seriesId);
942             seriesService.updateSeries(dc);
943             isUpdated = true;
944             isNew = true;
945           }
946 
947         } catch (Exception e) {
948           throw new IngestException(e);
949         }
950       }
951       in.close();
952     } catch (IOException e) {
953       logger.error("Error updating series from DublinCoreCatalog.}", e);
954     } finally {
955       IOUtils.closeQuietly(in);
956       httpClient.close(response);
957     }
958     if (!isUpdated) {
959       return isUpdated;
960     }
961     // Apply series extended metadata
962     for (MediaPackageElement seriesElement : mediaPackage.getElementsByFlavor(
963         MediaPackageElementFlavor.parseFlavor("*/series"))) {
964       if (MediaPackageElement.Type.Catalog == seriesElement.getElementType()
965           && !MediaPackageElements.SERIES.equals(seriesElement.getFlavor())) {
966         String catalogType = seriesElement.getFlavor().getType();
967         logger.info("Apply series {} metadata catalog from mediapackage {} to newly created series {}.",
968             catalogType, mediaPackage.getIdentifier(), seriesId);
969         byte[] data;
970         try {
971           HttpGet getExtendedMetadata = new HttpGet(seriesElement.getURI());
972           response = httpClient.execute(getExtendedMetadata);
973           in = response.getEntity().getContent();
974           data = IOUtils.readFully(in, (int) response.getEntity().getContentLength());
975         } catch (Exception e) {
976           throw new IngestException("Unable to read series " + catalogType + " metadata catalog for series "
977               + seriesId + ".", e);
978         } finally {
979           IOUtils.closeQuietly(in);
980           httpClient.close(response);
981         }
982         try {
983           seriesService.updateSeriesElement(seriesId, catalogType, data);
984         } catch (SeriesException e) {
985           throw new IngestException(
986               "Unable to update series " + catalogType + " catalog on newly created series " + seriesId + ".", e);
987         }
988       }
989     }
990     if (isNew) {
991       logger.info("Apply series ACL from mediapackage {} to newly created series {}.",
992           mediaPackage.getIdentifier(), seriesId);
993       Attachment[] seriesXacmls = mediaPackage.getAttachments(MediaPackageElements.XACML_POLICY_SERIES);
994       if (seriesXacmls.length > 0) {
995         if (seriesXacmls.length > 1) {
996           logger.warn("Mediapackage {} has more than one series xacml attachments. Using {}.",
997               mediaPackage.getIdentifier(), seriesXacmls[0].getURI());
998         }
999         AccessControlList seriesAcl = null;
1000         try {
1001           HttpGet getXacml = new HttpGet(seriesXacmls[0].getURI());
1002           response = httpClient.execute(getXacml);
1003           in = response.getEntity().getContent();
1004           seriesAcl = XACMLUtils.parseXacml(in);
1005         } catch (XACMLParsingException ex) {
1006           throw new IngestException("Unable to parse series xacml from mediapackage "
1007               + mediaPackage.getIdentifier() + ".", ex);
1008         } catch (IOException e) {
1009           logger.error("Error updating series {} ACL from mediapackage {}.",
1010               seriesId, mediaPackage.getIdentifier(), e);
1011           throw e;
1012         } finally {
1013           IOUtils.closeQuietly(in);
1014           httpClient.close(response);
1015         }
1016         try {
1017           seriesService.updateAccessControl(seriesId, seriesAcl);
1018         } catch (Exception e) {
1019           throw new IngestException("Unable to update series ACL on newly created series " + seriesId + ".", e);
1020         }
1021       }
1022     }
1023     return isUpdated;
1024   }
1025 
1026   /**
1027    * {@inheritDoc}
1028    *
1029    * @see org.opencastproject.ingest.api.IngestService#addCatalog(java.io.InputStream, java.lang.String,
1030    *      org.opencastproject.mediapackage.MediaPackageElementFlavor, org.opencastproject.mediapackage.MediaPackage)
1031    */
1032   @Override
1033   public MediaPackage addCatalog(InputStream in, String fileName, MediaPackageElementFlavor flavor,
1034           MediaPackage mediaPackage) throws IOException, IngestException {
1035     return addCatalog(in, fileName, flavor, null, mediaPackage);
1036   }
1037 
1038   /**
1039    * {@inheritDoc}
1040    *
1041    * @see org.opencastproject.ingest.api.IngestService#addCatalog(java.io.InputStream, java.lang.String,
1042    *      org.opencastproject.mediapackage.MediaPackageElementFlavor, org.opencastproject.mediapackage.MediaPackage)
1043    */
1044   @Override
1045   public MediaPackage addCatalog(InputStream in, String fileName, MediaPackageElementFlavor flavor, String[] tags,
1046           MediaPackage mediaPackage) throws IOException, IngestException, IllegalArgumentException {
1047     Job job = null;
1048     try {
1049       job = serviceRegistry.createJob(JOB_TYPE, INGEST_CATALOG, null, null, false, ingestFileJobLoad);
1050       job.setStatus(Status.RUNNING);
1051       job = serviceRegistry.updateJob(job);
1052       final String elementId = UUID.randomUUID().toString();
1053       final String mediaPackageId = mediaPackage.getIdentifier().toString();
1054       logger.info("Start adding catalog {} from input stream on mediapackage {}", elementId, mediaPackageId);
1055       final URI newUrl = addContentToRepo(mediaPackage, elementId, fileName, in);
1056 
1057       final boolean isJSON;
1058       try (InputStream inputStream = workingFileRepository.get(mediaPackageId, elementId)) {
1059         try (BufferedReader reader  = new BufferedReader(new InputStreamReader(inputStream, StandardCharsets.UTF_8))) {
1060           // Exception for current BBB integration and Extron SMP351 which is ingesting a JSON array/object as catalog
1061           int firstChar = reader.read();
1062           isJSON = firstChar == '[' || firstChar == '{';
1063         }
1064       }
1065 
1066       if (isJSON) {
1067         logger.warn("Input catalog seems to be JSON. This is a mistake and will fail in future Opencast versions."
1068             + "You will likely want to ingest this as a media package attachment instead.");
1069       } else {
1070         // Verify XML is not corrupted
1071         try {
1072           XmlSafeParser.parse(workingFileRepository.get(mediaPackageId, elementId));
1073         } catch (SAXException e) {
1074           workingFileRepository.delete(mediaPackageId, elementId);
1075           throw new IllegalArgumentException("Catalog XML is invalid", e);
1076         }
1077       }
1078       MediaPackage mp = addContentToMediaPackage(mediaPackage, elementId, newUrl, MediaPackageElement.Type.Catalog,
1079               flavor);
1080       if (tags != null && tags.length > 0) {
1081         MediaPackageElement trackElement = mp.getCatalog(elementId);
1082         for (String tag : tags) {
1083           logger.info("Adding tag {} to element {}", tag, elementId);
1084           trackElement.addTag(tag);
1085         }
1086       }
1087 
1088       job.setStatus(Job.Status.FINISHED);
1089       logger.info("Successful added catalog {} on mediapackage {} at URL {}", elementId, mediaPackage, newUrl);
1090       return mp;
1091     } catch (ServiceRegistryException e) {
1092       throw new IngestException(e);
1093     } catch (NotFoundException e) {
1094       throw new IngestException("Unable to update ingest job", e);
1095     } finally {
1096       finallyUpdateJob(job);
1097     }
1098   }
1099 
1100   /**
1101    * {@inheritDoc}
1102    *
1103    * @see org.opencastproject.ingest.api.IngestService#addAttachment(java.net.URI,
1104    *      org.opencastproject.mediapackage.MediaPackageElementFlavor, String[],
1105    *      org.opencastproject.mediapackage.MediaPackage)
1106    */
1107   @Override
1108   public MediaPackage addAttachment(URI uri, MediaPackageElementFlavor flavor, String[] tags, MediaPackage mediaPackage)
1109           throws IOException, IngestException {
1110     Job job = null;
1111     try {
1112       job = serviceRegistry.createJob(JOB_TYPE, INGEST_ATTACHMENT_FROM_URI,
1113               Arrays.asList(uri.toString(), flavor.toString(), MediaPackageParser.getAsXml(mediaPackage)), null, false,
1114               ingestFileJobLoad);
1115       job.setStatus(Status.RUNNING);
1116       job = serviceRegistry.updateJob(job);
1117       String elementId = UUID.randomUUID().toString();
1118       logger.info("Start adding attachment {} from URL {} on mediapackage {}", elementId, uri, mediaPackage);
1119       URI newUrl = addContentToRepo(mediaPackage, elementId, uri);
1120       MediaPackage mp = addContentToMediaPackage(mediaPackage, elementId, newUrl, MediaPackageElement.Type.Attachment,
1121               flavor);
1122       if (tags != null && tags.length > 0) {
1123         MediaPackageElement attachmentElement = mp.getAttachment(elementId);
1124         for (String tag : tags) {
1125           logger.debug("Adding tag: " + tag + " to Element: " + elementId);
1126           attachmentElement.addTag(tag);
1127         }
1128       }
1129       job.setStatus(Job.Status.FINISHED);
1130       logger.info("Successful added attachment {} on mediapackage {} at URL {}", elementId, mediaPackage, newUrl);
1131       return mp;
1132     } catch (ServiceRegistryException e) {
1133       throw new IngestException(e);
1134     } catch (NotFoundException e) {
1135       throw new IngestException("Unable to update ingest job", e);
1136     } finally {
1137       finallyUpdateJob(job);
1138     }
1139   }
1140 
1141   /**
1142    * {@inheritDoc}
1143    *
1144    * @see org.opencastproject.ingest.api.IngestService#addAttachment(java.io.InputStream, java.lang.String,
1145    *      org.opencastproject.mediapackage.MediaPackageElementFlavor, org.opencastproject.mediapackage.MediaPackage)
1146    */
1147   @Override
1148   public MediaPackage addAttachment(InputStream in, String fileName, MediaPackageElementFlavor flavor, String[] tags,
1149           MediaPackage mediaPackage) throws IOException, IngestException {
1150     Job job = null;
1151     try {
1152       job = serviceRegistry.createJob(JOB_TYPE, INGEST_ATTACHMENT, null, null, false, ingestFileJobLoad);
1153       job.setStatus(Status.RUNNING);
1154       job = serviceRegistry.updateJob(job);
1155       String elementId = UUID.randomUUID().toString();
1156       logger.info("Start adding attachment {} from input stream on mediapackage {}", elementId, mediaPackage);
1157       URI newUrl = addContentToRepo(mediaPackage, elementId, fileName, in);
1158       MediaPackage mp = addContentToMediaPackage(mediaPackage, elementId, newUrl, MediaPackageElement.Type.Attachment,
1159               flavor);
1160       if (tags != null && tags.length > 0) {
1161         MediaPackageElement trackElement = mp.getAttachment(elementId);
1162         for (String tag : tags) {
1163           logger.info("Adding tag: " + tag + " to Element: " + elementId);
1164           trackElement.addTag(tag);
1165         }
1166       }
1167       job.setStatus(Job.Status.FINISHED);
1168       logger.info("Successful added attachment {} on mediapackage {} at URL {}", elementId, mediaPackage, newUrl);
1169       return mp;
1170     } catch (ServiceRegistryException e) {
1171       throw new IngestException(e);
1172     } catch (NotFoundException e) {
1173       throw new IngestException("Unable to update ingest job", e);
1174     } finally {
1175       finallyUpdateJob(job);
1176     }
1177 
1178   }
1179 
1180   /**
1181    * {@inheritDoc}
1182    *
1183    * @see org.opencastproject.ingest.api.IngestService#addAttachment(java.io.InputStream, java.lang.String,
1184    *      org.opencastproject.mediapackage.MediaPackageElementFlavor, org.opencastproject.mediapackage.MediaPackage)
1185    */
1186   @Override
1187   public MediaPackage addAttachment(InputStream in, String fileName, MediaPackageElementFlavor flavor,
1188           MediaPackage mediaPackage) throws IOException, IngestException {
1189     String[] tags = null;
1190     return addAttachment(in, fileName, flavor, tags, mediaPackage);
1191   }
1192 
1193   /**
1194    *
1195    * {@inheritDoc}
1196    *
1197    * @see org.opencastproject.ingest.api.IngestService#ingest(org.opencastproject.mediapackage.MediaPackage)
1198    */
1199   @Override
1200   public WorkflowInstance ingest(MediaPackage mp) throws IngestException {
1201     try {
1202       return ingest(mp, null, null, null);
1203     } catch (NotFoundException e) {
1204       throw new IngestException(e);
1205     } catch (UnauthorizedException e) {
1206       throw new IllegalStateException(e);
1207     }
1208   }
1209 
1210   /**
1211    * {@inheritDoc}
1212    *
1213    * @see org.opencastproject.ingest.api.IngestService#ingest(org.opencastproject.mediapackage.MediaPackage,
1214    *      java.lang.String, java.util.Map)
1215    */
1216   @Override
1217   public WorkflowInstance ingest(MediaPackage mp, String wd, Map<String, String> properties)
1218           throws IngestException, NotFoundException {
1219     try {
1220       return ingest(mp, wd, properties, null);
1221     } catch (UnauthorizedException e) {
1222       throw new IllegalStateException(e);
1223     }
1224   }
1225 
1226   /**
1227    * {@inheritDoc}
1228    *
1229    * @see org.opencastproject.ingest.api.IngestService#ingest(org.opencastproject.mediapackage.MediaPackage,
1230    *      java.lang.String, java.util.Map, java.lang.Long)
1231    */
1232   @Override
1233   public WorkflowInstance ingest(MediaPackage mp, String workflowDefinitionId, Map<String, String> properties,
1234           Long workflowInstanceId) throws IngestException, NotFoundException, UnauthorizedException {
1235     // Check for legacy media package id
1236     mp = checkForLegacyMediaPackageId(mp, properties);
1237 
1238     try {
1239       mp = createSmil(mp);
1240     } catch (IOException e) {
1241       throw new IngestException("Unable to add SMIL Catalog", e);
1242     }
1243 
1244     try {
1245       updateSeries(mp);
1246     } catch (IOException e) {
1247       throw new IngestException("Unable to create or update series from mediapackage " + mp.getIdentifier() + ".", e);
1248     }
1249 
1250     // Done, update the job status and return the created workflow instance
1251     if (workflowInstanceId != null) {
1252       logger.warn(
1253               "Resuming workflow {} with ingested mediapackage {} is deprecated, skip resuming and start new workflow",
1254               workflowInstanceId, mp);
1255     }
1256 
1257     if (workflowDefinitionId == null) {
1258       logger.info("Starting a new workflow with ingested mediapackage {} based on the default workflow definition '{}'",
1259               mp, defaultWorkflowDefinionId);
1260     } else {
1261       logger.info("Starting a new workflow with ingested mediapackage {} based on workflow definition '{}'", mp,
1262               workflowDefinitionId);
1263     }
1264 
1265     try {
1266       // Determine the workflow definition
1267       WorkflowDefinition workflowDef = getWorkflowDefinition(workflowDefinitionId, mp);
1268 
1269       // Get the final set of workflow properties
1270       properties = mergeWorkflowConfiguration(properties, mp.getIdentifier().toString());
1271 
1272       // Remove potential workflow configuration prefixes from the workflow properties
1273       properties = removePrefixFromProperties(properties);
1274 
1275       // Merge scheduled mediapackage with ingested
1276       mp = mergeScheduledMediaPackage(mp);
1277       if (mp.getSeries() == null) {
1278         mp = checkForCASeries(mp, createSeriesAppendix);
1279       }
1280 
1281 
1282       ingestStatistics.successful();
1283       if (workflowDef != null) {
1284         logger.info("Starting new workflow with ingested mediapackage '{}' using the specified template '{}'",
1285                 mp.getIdentifier().toString(), workflowDefinitionId);
1286       } else {
1287         logger.info("Starting new workflow with ingested mediapackage '{}' using the default template '{}'",
1288                 mp.getIdentifier().toString(), defaultWorkflowDefinionId);
1289       }
1290       return workflowService.start(workflowDef, mp, properties);
1291     } catch (WorkflowException e) {
1292       ingestStatistics.failed();
1293       throw new IngestException(e);
1294     }
1295   }
1296 
1297   @Override
1298   public void schedule(MediaPackage mediaPackage, String workflowDefinitionID, Map<String, String> properties)
1299           throws IllegalStateException, IngestException, NotFoundException, UnauthorizedException, SchedulerException {
1300     MediaPackageElement[] mediaPackageElements = mediaPackage.getElementsByFlavor(MediaPackageElements.EPISODE);
1301     if (mediaPackageElements.length != 1) {
1302       logger.debug("There can be only one (and exactly one) episode dublin core catalog: https://youtu.be/_J3VeogFUOs");
1303       throw new IngestException("There can be only one (and exactly one) episode dublin core catalog");
1304     }
1305     InputStream inputStream;
1306     DublinCoreCatalog dublinCoreCatalog;
1307     try {
1308       inputStream = workingFileRepository.get(mediaPackage.getIdentifier().toString(),
1309               mediaPackageElements[0].getIdentifier());
1310       dublinCoreCatalog = dublinCoreService.load(inputStream);
1311     } catch (IOException e) {
1312       throw new IngestException(e);
1313     }
1314 
1315     EName temporal = new EName(DublinCore.TERMS_NS_URI, "temporal");
1316     List<DublinCoreValue> periods = dublinCoreCatalog.get(temporal);
1317     if (periods.size() != 1) {
1318       logger.debug("There can be only one (and exactly one) period");
1319       throw new IngestException("There can be only one (and exactly one) period");
1320     }
1321     DCMIPeriod period = EncodingSchemeUtils.decodeMandatoryPeriod(periods.get(0));
1322     if (!period.hasStart() || !period.hasEnd()) {
1323       logger.debug("A scheduled recording needs to have a start and end.");
1324       throw new IngestException("A scheduled recording needs to have a start and end.");
1325     }
1326     EName createdEName = new EName(DublinCore.TERMS_NS_URI, "created");
1327     List<DublinCoreValue> created = dublinCoreCatalog.get(createdEName);
1328     if (created.size() == 0) {
1329       logger.debug("Created not set");
1330     } else if (created.size() == 1) {
1331       Date date = EncodingSchemeUtils.decodeMandatoryDate(created.get(0));
1332       if (date.getTime() != period.getStart().getTime()) {
1333         logger.debug("start and created date differ ({} vs {})", date.getTime(), period.getStart().getTime());
1334         throw new IngestException("Temporal start and created date differ");
1335       }
1336     } else {
1337       logger.debug("There can be only one created date");
1338       throw new IngestException("There can be only one created date");
1339     }
1340     String captureAgent = getCaptureAgent(dublinCoreCatalog);
1341 
1342     // Go through properties
1343     Map<String, String> agentProperties = new HashMap<>();
1344     Map<String, String> workflowProperties = new HashMap<>();
1345     for (String key : properties.keySet()) {
1346       if (key.startsWith("org.opencastproject.workflow.config.")) {
1347         workflowProperties.put(key, properties.get(key));
1348       } else {
1349         agentProperties.put(key, properties.get(key));
1350       }
1351     }
1352 
1353     // Remove workflow configuration prefixes from the workflow properties
1354     workflowProperties = removePrefixFromProperties(workflowProperties);
1355 
1356     try {
1357       schedulerService.addEvent(period.getStart(), period.getEnd(), captureAgent, new HashSet<>(), mediaPackage,
1358               workflowProperties, agentProperties, Optional.empty());
1359     } finally {
1360       for (MediaPackageElement mediaPackageElement : mediaPackage.getElements()) {
1361         try {
1362           workingFileRepository.delete(mediaPackage.getIdentifier().toString(), mediaPackageElement.getIdentifier());
1363         } catch (IOException e) {
1364           logger.warn("Failed to delete media package element", e);
1365         }
1366       }
1367     }
1368   }
1369 
1370   private String getCaptureAgent(DublinCoreCatalog dublinCoreCatalog) throws IngestException {
1371     // spatial
1372     EName spatial = new EName(DublinCore.TERMS_NS_URI, "spatial");
1373     List<DublinCoreValue> captureAgents = dublinCoreCatalog.get(spatial);
1374     if (captureAgents.size() != 1) {
1375       logger.debug("Exactly one capture agent needs to be set");
1376       throw new IngestException("Exactly one capture agent needs to be set");
1377     }
1378     return captureAgents.get(0).getValue();
1379   }
1380 
1381   /**
1382    * Check whether the mediapackage id is set via the legacy workflow identifier and change the id if existing.
1383    *
1384    * @param mp
1385    *          the mediapackage
1386    * @param properties
1387    *          the workflow properties
1388    * @return the mediapackage
1389    */
1390   private MediaPackage checkForLegacyMediaPackageId(MediaPackage mp, Map<String, String> properties)
1391           throws IngestException {
1392     if (properties == null || properties.isEmpty()) {
1393       return mp;
1394     }
1395 
1396     try {
1397       String mediaPackageId = properties.get(LEGACY_MEDIAPACKAGE_ID_KEY);
1398       if (StringUtils.isNotBlank(mediaPackageId) && schedulerService != null) {
1399         logger.debug("Check ingested mediapackage {} for legacy mediapackage identifier {}",
1400                 mp.getIdentifier().toString(), mediaPackageId);
1401         try {
1402           schedulerService.getMediaPackage(mp.getIdentifier().toString());
1403           return mp;
1404         } catch (NotFoundException e) {
1405           logger.info("No scheduler mediapackage found with ingested id {}, try legacy mediapackage id {}",
1406                   mp.getIdentifier().toString(), mediaPackageId);
1407           try {
1408             schedulerService.getMediaPackage(mediaPackageId);
1409             logger.info("Legacy mediapackage id {} exists, change ingested mediapackage id {} to legacy id",
1410                     mediaPackageId, mp.getIdentifier().toString());
1411             mp.setIdentifier(new IdImpl(mediaPackageId));
1412             return mp;
1413           } catch (NotFoundException e1) {
1414             logger.info("No scheduler mediapackage found with legacy mediapackage id {}, skip merging", mediaPackageId);
1415           } catch (Exception e1) {
1416             logger.error("Unable to get event mediapackage from scheduler event {}", mediaPackageId, e);
1417             throw new IngestException(e);
1418           }
1419         } catch (Exception e) {
1420           logger.error("Unable to get event mediapackage from scheduler event {}", mp.getIdentifier().toString(), e);
1421           throw new IngestException(e);
1422         }
1423       }
1424       return mp;
1425     } finally {
1426       properties.remove(LEGACY_MEDIAPACKAGE_ID_KEY);
1427     }
1428   }
1429 
1430   private Map<String, String> mergeWorkflowConfiguration(Map<String, String> properties, String mediaPackageId) {
1431     if (isBlank(mediaPackageId) || schedulerService == null) {
1432       return properties;
1433     }
1434 
1435     HashMap<String, String> mergedProperties = new HashMap<>();
1436 
1437     try {
1438       Map<String, String> recordingProperties = schedulerService.getCaptureAgentConfiguration(mediaPackageId);
1439       logger.debug("Restoring workflow properties from scheduler event {}", mediaPackageId);
1440       mergedProperties.putAll(recordingProperties);
1441     } catch (SchedulerException e) {
1442       logger.warn("Unable to get workflow properties from scheduler event {}", mediaPackageId, e);
1443     } catch (NotFoundException e) {
1444       logger.info("No capture event found for id {}", mediaPackageId);
1445     } catch (UnauthorizedException e) {
1446       throw new IllegalStateException(e);
1447     }
1448 
1449     if (properties != null) {
1450       // Merge the properties, this must be after adding the recording properties
1451       logger.debug("Merge workflow properties with the one from the scheduler event {}", mediaPackageId);
1452       mergedProperties.putAll(properties);
1453     }
1454 
1455     return mergedProperties;
1456   }
1457 
1458   /**
1459    * Merges the ingested mediapackage with the scheduled mediapackage. The ingested mediapackage takes precedence over
1460    * the scheduled mediapackage.
1461    *
1462    * @param mp
1463    *          the ingested mediapackage
1464    * @return the merged mediapackage
1465    */
1466   private MediaPackage mergeScheduledMediaPackage(MediaPackage mp) throws IngestException {
1467     if (schedulerService == null) {
1468       logger.warn("No scheduler service available to merge mediapackage!");
1469       return mp;
1470     }
1471 
1472     try {
1473       MediaPackage scheduledMp = schedulerService.getMediaPackage(mp.getIdentifier().toString());
1474       logger.info("Found matching scheduled event for id '{}', merging mediapackage...", mp.getIdentifier().toString());
1475       mergeMediaPackageElements(mp, scheduledMp);
1476       mergeMediaPackageMetadata(mp, scheduledMp);
1477       return mp;
1478     } catch (NotFoundException e) {
1479       logger.debug("No scheduler mediapackage found with id {}, skip merging", mp.getIdentifier());
1480       return mp;
1481     } catch (Exception e) {
1482       throw new IngestException(String.format("Unable to get event media package from scheduler event %s",
1483               mp.getIdentifier()), e);
1484     }
1485   }
1486 
1487   /**
1488    * Merge different elements from capture agent ingesting mp and Asset manager. Overwrite or replace same flavored
1489    * elements depending on the Ingest Service overwrite configuration. Ignore publications (i.e. live publication
1490    * channel from Asset Manager) Always keep tracks from the capture agent.
1491    *
1492    * @param mp
1493    *          the medipackage being ingested from the Capture Agent
1494    * @param scheduledMp
1495    *          the mediapckage that was schedule and managed by the Asset Manager
1496    */
1497   private void mergeMediaPackageElements(MediaPackage mp, MediaPackage scheduledMp) {
1498     // drop catalogs sent by the capture agent in favor of Opencast's own metadata
1499     if (skipCatalogs) {
1500       for (MediaPackageElement element : mp.getCatalogs()) {
1501         if (!element.getFlavor().equals(MediaPackageElements.SMIL)) {
1502           mp.remove(element);
1503         }
1504       }
1505     }
1506 
1507     // drop attachments the capture agent sent us in favor of Opencast's attachments
1508     // e.g. prevent capture agents from modifying security rules of schedules events
1509     if (skipAttachments) {
1510       for (MediaPackageElement element : mp.getAttachments()) {
1511         mp.remove(element);
1512       }
1513     }
1514 
1515     for (MediaPackageElement element : scheduledMp.getElements()) {
1516       if (MediaPackageElement.Type.Publication.equals(element.getElementType())) {
1517         // The Asset managed media package may have a publication element for a live event, if retract live has not
1518         // run yet.
1519         // Publications do not have flavors and are never part of the mediapackage from the capture agent.
1520         // Therefore, ignore publication element because it is removed when the recorded media is published and causes
1521         // complications (on short media) if added.
1522         logger.debug("Ignoring {}, not adding to ingested mediapackage {}", MediaPackageElement.Type.Publication, mp);
1523         continue;
1524       } else if (mp.getElementsByFlavor(element.getFlavor()).length > 0) {
1525         // The default is to overwrite matching flavored elements in the Asset managed mediapackage (e.g. catalogs)
1526         // If isOverwrite is true, changes made from the CA overwrite (update/revert) changes made from the Admin UI.
1527         // If isOverwrite is false, changes made from the CA do not overwrite (update/revert) changes made from the
1528         // Admin UI.
1529         // regardless of overwrite, always keep new ingested tracks.
1530         if (!isAddOnlyNew || MediaPackageElement.Type.Track.equals(element.getElementType())) {
1531           // Allow updates made from the Capture Agent to overwrite existing metadata in Opencast
1532           logger.info(
1533                   "Omitting Opencast (Asset Managed) element '{}', replacing with ingested element of same flavor '{}'",
1534                   element,
1535                   element.getFlavor());
1536           continue;
1537         }
1538         // Remove flavored element from ingested mp and replaced it with maching element from Asset Managed
1539         // mediapackage.
1540         // This protects updates made from the admin UI during an event capture from being reverted by artifacts from
1541         // the ingested CA.
1542         for (MediaPackageElement el : mp.getElementsByFlavor(element.getFlavor())) {
1543           logger.info("Omitting ingested element '{}' {}, keeping existing (Asset Managed) element of same flavor '{}'",
1544               el, el.getURI(), element.getFlavor());
1545           mp.remove(el);
1546         }
1547       }
1548       logger.info("Adding element {} from scheduled (Asset Managed) event '{}' into ingested mediapackage",
1549           element, mp);
1550       mp.add(element);
1551     }
1552   }
1553 
1554   /**
1555    *
1556    * The previous OC behaviour is for metadata in the ingested mediapackage to be updated by the
1557    * Asset Managed metadata *only* when the field is blank on the ingested mediapackage.
1558    * However, that field may have been intentionally emptied by
1559    * removing its value from the Capture Agent UI (e.g. Galicaster)
1560    *
1561    * If isOverwrite is true, metadata values in the ingest mediapackage overwrite Asset Managed metadata.
1562    * If isOverwrite is false, Asset Managed metadata is preserved.
1563    *
1564    * @param mp,
1565    *          the inbound ingested mp
1566    * @param scheduledMp,
1567    *          the existing scheduled mp
1568    */
1569   private void mergeMediaPackageMetadata(MediaPackage mp, MediaPackage scheduledMp) {
1570     // Merge media package fields depending on overwrite setting
1571     boolean noOverwrite = (isAddOnlyNew && !skipCatalogs) || skipCatalogs;
1572     if ((mp.getDate() == null) || noOverwrite) {
1573       mp.setDate(scheduledMp.getDate());
1574     }
1575     if (isBlank(mp.getLicense()) || noOverwrite) {
1576       mp.setLicense(scheduledMp.getLicense());
1577     }
1578     if (isBlank(mp.getSeries()) || noOverwrite) {
1579       mp.setSeries(scheduledMp.getSeries());
1580     }
1581     if (isBlank(mp.getSeriesTitle()) || noOverwrite) {
1582       mp.setSeriesTitle(scheduledMp.getSeriesTitle());
1583     }
1584     if (isBlank(mp.getTitle()) || noOverwrite) {
1585       mp.setTitle(scheduledMp.getTitle());
1586     }
1587 
1588     if (mp.getSubjects().length <= 0 || noOverwrite) {
1589       Arrays.stream(mp.getSubjects()).forEach(mp::removeSubject);
1590       for (String subject : scheduledMp.getSubjects()) {
1591         mp.addSubject(subject);
1592       }
1593     }
1594     if (noOverwrite || mp.getContributors().length == 0) {
1595       Arrays.stream(mp.getContributors()).forEach(mp::removeContributor);
1596       for (String contributor : scheduledMp.getContributors()) {
1597         mp.addContributor(contributor);
1598       }
1599     }
1600     if (noOverwrite || mp.getCreators().length == 0) {
1601       Arrays.stream(mp.getCreators()).forEach(mp::removeCreator);
1602       for (String creator : scheduledMp.getCreators()) {
1603         mp.addCreator(creator);
1604       }
1605     }
1606   }
1607 
1608   /**
1609    * Removes the workflow configuration file prefix from all properties in a map.
1610    *
1611    * @param properties
1612    *          The properties to remove the prefixes from
1613    * @return A Map with the same collection of properties without the prefix
1614    */
1615   private Map<String, String> removePrefixFromProperties(Map<String, String> properties) {
1616     Map<String, String> fixedProperties = new HashMap<>();
1617     if (properties != null) {
1618       for (Entry<String, String> entry : properties.entrySet()) {
1619         if (entry.getKey().startsWith(WORKFLOW_CONFIGURATION_PREFIX)) {
1620           logger.debug("Removing prefix from key '" + entry.getKey() + " with value '" + entry.getValue() + "'");
1621           fixedProperties.put(entry.getKey().replace(WORKFLOW_CONFIGURATION_PREFIX, ""), entry.getValue());
1622         } else {
1623           fixedProperties.put(entry.getKey(), entry.getValue());
1624         }
1625       }
1626     }
1627     return fixedProperties;
1628   }
1629 
1630   private WorkflowDefinition getWorkflowDefinition(String workflowDefinitionID, MediaPackage mediapackage)
1631           throws NotFoundException, WorkflowDatabaseException, IngestException {
1632     // If the workflow definition and instance ID are null, use the default, or throw if there is none
1633     if (isBlank(workflowDefinitionID)) {
1634       String mediaPackageId = mediapackage.getIdentifier().toString();
1635       if (schedulerService != null) {
1636         logger.info("Determining workflow template for ingested mediapckage {} from capture event {}", mediapackage,
1637                 mediaPackageId);
1638         try {
1639           Map<String, String> recordingProperties = schedulerService.getCaptureAgentConfiguration(mediaPackageId);
1640           workflowDefinitionID = recordingProperties.get(CaptureParameters.INGEST_WORKFLOW_DEFINITION);
1641           if (isBlank(workflowDefinitionID)) {
1642             workflowDefinitionID = defaultWorkflowDefinionId;
1643             logger.debug("No workflow set. Falling back to default.");
1644           }
1645           if (isBlank(workflowDefinitionID)) {
1646             throw new IngestException("No value found for key '" + CaptureParameters.INGEST_WORKFLOW_DEFINITION
1647                     + "' from capture event configuration of scheduler event '" + mediaPackageId + "'");
1648           }
1649           logger.info("Ingested event {} will be processed using workflow '{}'", mediapackage, workflowDefinitionID);
1650         } catch (NotFoundException e) {
1651           logger.warn("Specified capture event {} was not found", mediaPackageId);
1652         } catch (UnauthorizedException e) {
1653           throw new IllegalStateException(e);
1654         } catch (SchedulerException e) {
1655           logger.warn("Unable to get the workflow definition id from scheduler event {}", mediaPackageId, e);
1656           throw new IngestException(e);
1657         }
1658       } else {
1659         logger.warn(
1660                 "Scheduler service not bound, unable to determine the workflow template to use for ingested "
1661                     + "mediapackage {}", mediapackage);
1662       }
1663 
1664     } else {
1665       logger.info("Ingested mediapackage {} is processed using workflow template '{}', specified during ingest",
1666               mediapackage, workflowDefinitionID);
1667     }
1668 
1669     // Use the default workflow definition if nothing was determined
1670     if (isBlank(workflowDefinitionID) && defaultWorkflowDefinionId != null) {
1671       logger.info("Using default workflow definition '{}' to process ingested mediapackage {}",
1672               defaultWorkflowDefinionId, mediapackage);
1673       workflowDefinitionID = defaultWorkflowDefinionId;
1674     }
1675 
1676     // Check if the workflow definition is valid
1677     if (StringUtils.isNotBlank(workflowDefinitionID) && StringUtils.isNotBlank(defaultWorkflowDefinionId)) {
1678       try {
1679         workflowService.getWorkflowDefinitionById(workflowDefinitionID);
1680       } catch (WorkflowDatabaseException e) {
1681         throw new IngestException(e);
1682       } catch (NotFoundException nfe) {
1683         logger.warn("Workflow definition {} not found, using default workflow {} instead", workflowDefinitionID,
1684                 defaultWorkflowDefinionId);
1685         workflowDefinitionID = defaultWorkflowDefinionId;
1686       }
1687     }
1688 
1689     // Have we been able to find a workflow definition id?
1690     if (isBlank(workflowDefinitionID)) {
1691       ingestStatistics.failed();
1692       throw new IllegalStateException("Can not ingest a workflow without a workflow definition or an existing "
1693           + "instance. No default definition is specified");
1694     }
1695 
1696     // Let's make sure the workflow definition exists
1697     return workflowService.getWorkflowDefinitionById(workflowDefinitionID);
1698   }
1699 
1700   /**
1701    *
1702    * {@inheritDoc}
1703    *
1704    * @see org.opencastproject.ingest.api.IngestService#discardMediaPackage(
1705    *      org.opencastproject.mediapackage.MediaPackage)
1706    */
1707   @Override
1708   public void discardMediaPackage(MediaPackage mp) throws IOException {
1709     String mediaPackageId = mp.getIdentifier().toString();
1710     for (MediaPackageElement element : mp.getElements()) {
1711       if (!workingFileRepository.delete(mediaPackageId, element.getIdentifier())) {
1712         logger.warn("Unable to find (and hence, delete), this mediapackage element");
1713       }
1714     }
1715     logger.info("Successfully discarded media package {}", mp);
1716   }
1717 
1718   protected URI addContentToRepo(MediaPackage mp, String elementId, URI uri) throws IOException {
1719     InputStream in = null;
1720     HttpResponse response = null;
1721     CloseableHttpClient externalHttpClient = null;
1722     try {
1723       if (uri.toString().startsWith("http")) {
1724         HttpGet get = new HttpGet(uri);
1725 
1726         if (!isBlank(downloadSource) && uri.toString().matches(downloadSource)) {
1727           // NB: We're creating a new client here with *different* auth than the system auth creds
1728           externalHttpClient = getAuthedHttpClient();
1729           get.setHeader("X-Requested-Auth", downloadAuthMethod);
1730           if ("Basic".equals(downloadAuthMethod) && downloadAuthForceBasic) {
1731             String auth = downloadUser + ":" + downloadPassword;
1732             byte[] encodedAuth = Base64.getEncoder().encode(auth.getBytes(StandardCharsets.ISO_8859_1));
1733             String authHeader = "Basic " + new String(encodedAuth);
1734             get.setHeader(HttpHeaders.AUTHORIZATION, authHeader);
1735           }
1736           response = externalHttpClient.execute(get);
1737         } else {
1738           // httpClient checks internally to see if it should be sending the default auth, or not.
1739           response = httpClient.execute(get);
1740         }
1741 
1742         if (null == response) {
1743           // If you get here then chances are you're using a mock httpClient which does not have appropriate
1744           // mocking to respond to the URL you are feeding it.  Try adding that URL to the mock and see if that works.
1745           throw new IOException("Null response object from the http client, refer to code for explanation");
1746         }
1747 
1748         int httpStatusCode = response.getStatusLine().getStatusCode();
1749         if (httpStatusCode != 200) {
1750           throw new IOException(uri + " returns http " + httpStatusCode);
1751         }
1752         in = response.getEntity().getContent();
1753         //If it does not start with file, or we're in test mode (ie, to allow arbitrary file:// access)
1754       } else if (!uri.toString().startsWith("file") || testMode) {
1755         in = uri.toURL().openStream();
1756       } else {
1757         throw new IOException("Refusing to fetch files from the local filesystem");
1758       }
1759       String fileName = FilenameUtils.getName(uri.getPath());
1760       if (isBlank(FilenameUtils.getExtension(fileName))) {
1761         fileName = getContentDispositionFileName(response);
1762       }
1763 
1764       if (isBlank(FilenameUtils.getExtension(fileName))) {
1765         throw new IOException("No filename extension found: " + fileName);
1766       }
1767       return addContentToRepo(mp, elementId, fileName, in);
1768     } finally {
1769       if (in != null) {
1770         in.close();
1771       }
1772       if (externalHttpClient != null) {
1773         externalHttpClient.close();
1774       }
1775       httpClient.close(response);
1776     }
1777   }
1778 
1779   private String getContentDispositionFileName(HttpResponse response) {
1780     if (response == null) {
1781       return null;
1782     }
1783 
1784     Header header = response.getFirstHeader("Content-Disposition");
1785     ContentDisposition contentDisposition = new ContentDisposition(header.getValue());
1786     return contentDisposition.getParameter("filename");
1787   }
1788 
1789   private URI addContentToRepo(MediaPackage mp, String elementId, String filename, InputStream file)
1790           throws IOException {
1791     ProgressInputStream progressInputStream = new ProgressInputStream(file);
1792     progressInputStream.addPropertyChangeListener(new PropertyChangeListener() {
1793       @Override
1794       public void propertyChange(PropertyChangeEvent evt) {
1795         long totalNumBytesRead = (Long) evt.getNewValue();
1796         long oldTotalNumBytesRead = (Long) evt.getOldValue();
1797         ingestStatistics.add(totalNumBytesRead - oldTotalNumBytesRead);
1798       }
1799     });
1800     return workingFileRepository.put(mp.getIdentifier().toString(), elementId, filename, progressInputStream);
1801   }
1802 
1803   private MediaPackage addContentToMediaPackage(MediaPackage mp, String elementId, URI uri,
1804           MediaPackageElement.Type type, MediaPackageElementFlavor flavor) {
1805     logger.info("Adding element of type {} to mediapackage {}", type, mp);
1806     MediaPackageElement mpe = mp.add(uri, type, flavor);
1807     mpe.setIdentifier(elementId);
1808     return mp;
1809   }
1810 
1811   // ---------------------------------------------
1812   // --------- bind and unbind bundles ---------
1813   // ---------------------------------------------
1814   @Reference
1815   public void setWorkflowService(WorkflowService workflowService) {
1816     this.workflowService = workflowService;
1817   }
1818 
1819   @Reference
1820   public void setWorkingFileRepository(WorkingFileRepository workingFileRepository) {
1821     this.workingFileRepository = workingFileRepository;
1822   }
1823 
1824   @Reference
1825   public void setSeriesService(SeriesService seriesService) {
1826     this.seriesService = seriesService;
1827   }
1828 
1829   @Reference
1830   public void setDublinCoreService(DublinCoreCatalogService dublinCoreService) {
1831     this.dublinCoreService = dublinCoreService;
1832   }
1833 
1834   /**
1835    * {@inheritDoc}
1836    *
1837    * @see org.opencastproject.job.api.AbstractJobProducer#getServiceRegistry()
1838    */
1839   @Override
1840   protected ServiceRegistry getServiceRegistry() {
1841     return serviceRegistry;
1842   }
1843 
1844   /**
1845    * {@inheritDoc}
1846    *
1847    * @see org.opencastproject.job.api.AbstractJobProducer#process(org.opencastproject.job.api.Job)
1848    */
1849   @Override
1850   protected String process(Job job) throws Exception {
1851     throw new IllegalStateException("Ingest jobs are not expected to be dispatched");
1852   }
1853 
1854   /**
1855    * Callback for setting the security service.
1856    *
1857    * @param securityService
1858    *          the securityService to set
1859    */
1860   @Reference
1861   public void setSecurityService(SecurityService securityService) {
1862     this.securityService = securityService;
1863   }
1864 
1865   /**
1866    * Callback for setting the user directory service.
1867    *
1868    * @param userDirectoryService
1869    *          the userDirectoryService to set
1870    */
1871   @Reference
1872   public void setUserDirectoryService(UserDirectoryService userDirectoryService) {
1873     this.userDirectoryService = userDirectoryService;
1874   }
1875 
1876   /**
1877    * Callback for setting the scheduler service.
1878    *
1879    * @param schedulerService
1880    *          the scheduler service to set
1881    */
1882   @Reference(
1883       policy = ReferencePolicy.DYNAMIC,
1884       cardinality = ReferenceCardinality.OPTIONAL,
1885       unbind = "unsetSchedulerService"
1886   )
1887   public void setSchedulerService(SchedulerService schedulerService) {
1888     this.schedulerService = schedulerService;
1889   }
1890 
1891   public void unsetSchedulerService(SchedulerService schedulerService) {
1892     if (this.schedulerService == schedulerService) {
1893       this.schedulerService = null;
1894     }
1895   }
1896 
1897   /**
1898    * Sets a reference to the organization directory service.
1899    *
1900    * @param organizationDirectory
1901    *          the organization directory
1902    */
1903   @Reference
1904   public void setOrganizationDirectoryService(OrganizationDirectoryService organizationDirectory) {
1905     organizationDirectoryService = organizationDirectory;
1906   }
1907 
1908   /**
1909    * {@inheritDoc}
1910    *
1911    * @see org.opencastproject.job.api.AbstractJobProducer#getSecurityService()
1912    */
1913   @Override
1914   protected SecurityService getSecurityService() {
1915     return securityService;
1916   }
1917 
1918   /**
1919    * {@inheritDoc}
1920    *
1921    * @see org.opencastproject.job.api.AbstractJobProducer#getUserDirectoryService()
1922    */
1923   @Override
1924   protected UserDirectoryService getUserDirectoryService() {
1925     return userDirectoryService;
1926   }
1927 
1928   /**
1929    * {@inheritDoc}
1930    *
1931    * @see org.opencastproject.job.api.AbstractJobProducer#getOrganizationDirectoryService()
1932    */
1933   @Override
1934   protected OrganizationDirectoryService getOrganizationDirectoryService() {
1935     return organizationDirectoryService;
1936   }
1937 
1938   protected CloseableHttpClient getAuthedHttpClient() {
1939     HttpClientBuilder cb = HttpClientBuilder.create();
1940     CredentialsProvider provider = new BasicCredentialsProvider();
1941     String schema = AuthSchemes.DIGEST;
1942     if ("Basic".equals(downloadAuthMethod)) {
1943       schema = AuthSchemes.BASIC;
1944     }
1945     provider.setCredentials(
1946         new AuthScope(AuthScope.ANY_HOST, AuthScope.ANY_PORT, AuthScope.ANY_REALM, schema),
1947         new UsernamePasswordCredentials(downloadUser, downloadPassword));
1948     return cb.setDefaultCredentialsProvider(provider).build();
1949   }
1950 
1951   private MediaPackage createSmil(MediaPackage mediaPackage) throws IOException, IngestException {
1952     List<Track> partialTracks = new ArrayList<>();
1953     for (Track track : mediaPackage.getTracks()) {
1954       Long startTime = partialTrackStartTimes.getIfPresent(track.getIdentifier());
1955       if (startTime != null) {
1956         partialTracks.add(track);
1957       }
1958     }
1959 
1960     // No partial track available return without adding SMIL catalog
1961     if (partialTracks.isEmpty()) {
1962       return mediaPackage;
1963     }
1964 
1965     // Inspect the partial tracks
1966     List<Track> tracks = partialTracks.stream()
1967         .map(track -> {
1968           try {
1969             // Create a media inspection job for a mediapackage element.
1970             return mediaInspectionService.enrich(track, true);
1971           } catch (Exception e) {
1972             throw new RuntimeException("Error enriching track", e);
1973           }
1974         })
1975         .map(job -> {
1976           try {
1977             // Interpret the payload of a completed Job as a MediaPackageElement.
1978             // Wait for the job to complete if necessary
1979             waitForJob(getServiceRegistry(), Optional.empty(), job);
1980             return (Track) MediaPackageElementParser.getFromXml(job.getPayload());
1981           } catch (Exception e) {
1982             throw new RuntimeException("Error parsing job payload as track", e);
1983           }
1984         })
1985         .peek(MediaPackageSupport.updateElement(mediaPackage))
1986         .collect(Collectors.toList());
1987 
1988     // Create the SMIL document
1989     org.w3c.dom.Document smilDocument = SmilUtil.createSmil();
1990     for (Track track : tracks) {
1991       Long startTime = partialTrackStartTimes.getIfPresent(track.getIdentifier());
1992       if (startTime == null) {
1993         logger.error("No start time found for track {}", track);
1994         throw new IngestException("No start time found for track " + track.getIdentifier());
1995       }
1996       smilDocument = addSmilTrack(smilDocument, track, startTime);
1997       partialTrackStartTimes.invalidate(track.getIdentifier());
1998     }
1999 
2000     // Store the SMIL document in the mediapackage
2001     return addSmilCatalog(smilDocument, mediaPackage);
2002   }
2003 
2004   /**
2005    * Adds a SMIL catalog to a mediapackage if it's not already existing.
2006    *
2007    * @param smilDocument
2008    *          the smil document
2009    * @param mediaPackage
2010    *          the mediapackage to extend with the SMIL catalog
2011    * @return the augmented mediapcakge
2012    * @throws IOException
2013    *           if reading or writing of the SMIL catalog fails
2014    * @throws IngestException
2015    *           if the SMIL catalog already exists
2016    */
2017   private MediaPackage addSmilCatalog(org.w3c.dom.Document smilDocument, MediaPackage mediaPackage)
2018           throws IOException, IngestException {
2019     Optional<org.w3c.dom.Document> optSmilDocument = loadSmilDocument(workingFileRepository, mediaPackage);
2020     if (optSmilDocument.isPresent()) {
2021       throw new IngestException("SMIL already exists!");
2022     }
2023 
2024     InputStream in = null;
2025     try {
2026       in = XmlUtil.serializeDocument(smilDocument);
2027       String elementId = UUID.randomUUID().toString();
2028       URI uri = workingFileRepository.put(mediaPackage.getIdentifier().toString(), elementId, PARTIAL_SMIL_NAME, in);
2029       MediaPackageElement mpe = mediaPackage.add(uri, MediaPackageElement.Type.Catalog, MediaPackageElements.SMIL);
2030       mpe.setIdentifier(elementId);
2031       // Reset the checksum since it changed
2032       mpe.setChecksum(null);
2033       mpe.setMimeType(MimeTypes.SMIL);
2034       return mediaPackage;
2035     } finally {
2036       IoSupport.closeQuietly(in);
2037     }
2038   }
2039 
2040   /**
2041    * Load a SMIL document of a media package.
2042    *
2043    * @return the document or none if no media package element found.
2044    */
2045   private Optional<org.w3c.dom.Document> loadSmilDocument(final WorkingFileRepository workingFileRepository,
2046           MediaPackage mp) {
2047     return Arrays.stream(mp.getElements())
2048         .filter(MediaPackageSupport.Filters::isSmilCatalog)
2049         .findFirst()
2050         .map(mpe -> {
2051           try (InputStream in = workingFileRepository.get(
2052               mpe.getMediaPackage().getIdentifier().toString(),
2053               mpe.getIdentifier())) {
2054             return SmilUtil.loadSmilDocument(in, mpe);
2055           } catch (Exception e) {
2056             logger.warn("Unable to load smil document from catalog '{}'", mpe, e);
2057             return Misc.chuck(e);
2058           }
2059         });
2060   }
2061 
2062   /**
2063    * Adds a SMIL track by a mediapackage track to a SMIL document
2064    *
2065    * @param smilDocument
2066    *          the SMIL document to extend
2067    * @param track
2068    *          the mediapackage track
2069    * @param startTime
2070    *          the start time
2071    * @return the augmented SMIL document
2072    * @throws IngestException
2073    *           if the partial flavor type is not valid
2074    */
2075   private org.w3c.dom.Document addSmilTrack(org.w3c.dom.Document smilDocument, Track track, long startTime)
2076           throws IngestException {
2077     if (MediaPackageElements.PRESENTER_SOURCE.getType().equals(track.getFlavor().getType())) {
2078       return SmilUtil.addTrack(smilDocument, SmilUtil.TrackType.PRESENTER, track.hasVideo(), startTime,
2079               track.getDuration(), track.getURI(), track.getIdentifier());
2080     } else if (MediaPackageElements.PRESENTATION_SOURCE.getType().equals(track.getFlavor().getType())) {
2081       return SmilUtil.addTrack(smilDocument, SmilUtil.TrackType.PRESENTATION, track.hasVideo(), startTime,
2082               track.getDuration(), track.getURI(), track.getIdentifier());
2083     } else {
2084       logger.warn("Invalid partial flavor type {} of track {}", track.getFlavor(), track);
2085       throw new IngestException(
2086               "Invalid partial flavor type " + track.getFlavor().getType() + " of track " + track.getURI().toString());
2087     }
2088   }
2089 
2090   private MediaPackage checkForCASeries(MediaPackage mp, String seriesAppendName) {
2091     //Check for media package id and CA series appendix set
2092     if (mp == null || seriesAppendName == null) {
2093       logger.debug("No series name provided");
2094       return mp;
2095     }
2096 
2097     // Verify user is a CA by checking roles and captureAgentId
2098     User user = securityService.getUser();
2099     if (!user.hasRole(GLOBAL_ADMIN_ROLE) && !user.hasRole(GLOBAL_CAPTURE_AGENT_ROLE)) {
2100       logger.info("User '{}' is missing capture agent roles, won't apply CASeries", user.getUsername());
2101       return mp;
2102     }
2103     //Get capture agent name
2104     String captureAgentId = null;
2105     Catalog[] catalog = mp.getCatalogs(MediaPackageElementFlavor.flavor("dublincore", "episode"));
2106     if (catalog.length == 1) {
2107       try (InputStream catalogInputStream = workingFileRepository.get(mp.getIdentifier().toString(),
2108           catalog[0].getIdentifier())) {
2109         DublinCoreCatalog dc = dublinCoreService.load(catalogInputStream);
2110         captureAgentId = getCaptureAgent(dc);
2111       } catch (Exception e) {
2112         logger.info("Unable to determine capture agent name");
2113       }
2114     }
2115     if (captureAgentId == null) {
2116       logger.info("No Capture Agent ID defined for MediaPackage {}, won't apply CASeries", mp.getIdentifier());
2117       return mp;
2118     }
2119     logger.info("Applying CASeries to MediaPackage {} for capture agent '{}'", mp.getIdentifier(), captureAgentId);
2120 
2121     // Find or create CA series
2122     String seriesId = captureAgentId.replaceAll("[^\\w-_.:;()]+", "_");
2123     String seriesName = captureAgentId + seriesAppendName;
2124 
2125     try {
2126       seriesService.getSeries(seriesId);
2127     } catch (NotFoundException nfe) {
2128       try {
2129         List<String> roleNames = new ArrayList<>();
2130         String roleName = SecurityUtil.getCaptureAgentRole(captureAgentId);
2131         roleNames.add(roleName);
2132         logger.debug("Capture agent role name: {}", roleName);
2133 
2134         String username = user.getUsername();
2135         roleNames.add(UserIdRoleProvider.getUserIdRole(username));
2136 
2137         logger.info("Creating new series for capture agent '{}' and user '{}'", captureAgentId, username);
2138         createSeries(seriesId, seriesName, roleNames);
2139       } catch (Exception e) {
2140         logger.error("Unable to create series {} for event {}", seriesName, mp, e);
2141         return mp;
2142       }
2143     } catch (SeriesException | UnauthorizedException e) {
2144       logger.error("Exception while searching for series {}", seriesName, e);
2145       return mp;
2146     }
2147 
2148     // Add the event to CA series
2149     mp.setSeries(seriesId);
2150     mp.setSeriesTitle(seriesName);
2151 
2152     return mp;
2153   }
2154 
2155   private DublinCoreCatalog createSeries(String seriesId, String seriesName, List<String> roleNames)
2156           throws SeriesException, UnauthorizedException, NotFoundException {
2157     DublinCoreCatalog dc = DublinCores.mkOpencastSeries().getCatalog();
2158     dc.set(PROPERTY_IDENTIFIER, seriesId);
2159     dc.set(PROPERTY_TITLE, seriesName);
2160     dc.set(DublinCore.PROPERTY_CREATED, EncodingSchemeUtils.encodeDate(new Date(), Precision.Second));
2161     // set series name
2162     DublinCoreCatalog createdSeries = seriesService.updateSeries(dc);
2163 
2164     // fill acl
2165     List<AccessControlEntry> aces = new ArrayList();
2166     for (String roleName : roleNames) {
2167       AccessControlEntry aceRead = new AccessControlEntry(roleName, Permissions.Action.READ.toString(), true);
2168       AccessControlEntry aceWrite = new AccessControlEntry(roleName, Permissions.Action.WRITE.toString(), true);
2169       aces.add(aceRead);
2170       aces.add(aceWrite);
2171     }
2172     AccessControlList acl = new AccessControlList(aces);
2173     seriesService.updateAccessControl(seriesId, acl);
2174     logger.info("Created capture agent series with name {} and id {}", seriesName, seriesId);
2175 
2176     return dc;
2177   }
2178 }