View Javadoc
1   /*
2    * Licensed to The Apereo Foundation under one or more contributor license
3    * agreements. See the NOTICE file distributed with this work for additional
4    * information regarding copyright ownership.
5    *
6    *
7    * The Apereo Foundation licenses this file to you under the Educational
8    * Community License, Version 2.0 (the "License"); you may not use this file
9    * except in compliance with the License. You may obtain a copy of the License
10   * at:
11   *
12   *   http://opensource.org/licenses/ecl2.txt
13   *
14   * Unless required by applicable law or agreed to in writing, software
15   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
16   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the
17   * License for the specific language governing permissions and limitations under
18   * the License.
19   *
20   */
21  
22  package org.opencastproject.ingest.impl;
23  
24  import static org.apache.commons.lang3.StringUtils.isBlank;
25  import static org.opencastproject.metadata.dublincore.DublinCore.PROPERTY_IDENTIFIER;
26  import static org.opencastproject.metadata.dublincore.DublinCore.PROPERTY_TITLE;
27  import static org.opencastproject.security.api.SecurityConstants.GLOBAL_ADMIN_ROLE;
28  import static org.opencastproject.security.api.SecurityConstants.GLOBAL_CAPTURE_AGENT_ROLE;
29  import static org.opencastproject.util.JobUtil.waitForJob;
30  import static org.opencastproject.util.data.Monadics.mlist;
31  import static org.opencastproject.util.data.Option.none;
32  
33  import org.opencastproject.authorization.xacml.XACMLParsingException;
34  import org.opencastproject.authorization.xacml.XACMLUtils;
35  import org.opencastproject.capture.CaptureParameters;
36  import org.opencastproject.ingest.api.IngestException;
37  import org.opencastproject.ingest.api.IngestService;
38  import org.opencastproject.ingest.impl.jmx.IngestStatistics;
39  import org.opencastproject.inspection.api.MediaInspectionService;
40  import org.opencastproject.job.api.AbstractJobProducer;
41  import org.opencastproject.job.api.Job;
42  import org.opencastproject.job.api.Job.Status;
43  import org.opencastproject.mediapackage.Attachment;
44  import org.opencastproject.mediapackage.Catalog;
45  import org.opencastproject.mediapackage.EName;
46  import org.opencastproject.mediapackage.MediaPackage;
47  import org.opencastproject.mediapackage.MediaPackageBuilderFactory;
48  import org.opencastproject.mediapackage.MediaPackageElement;
49  import org.opencastproject.mediapackage.MediaPackageElementFlavor;
50  import org.opencastproject.mediapackage.MediaPackageElementParser;
51  import org.opencastproject.mediapackage.MediaPackageElements;
52  import org.opencastproject.mediapackage.MediaPackageException;
53  import org.opencastproject.mediapackage.MediaPackageParser;
54  import org.opencastproject.mediapackage.MediaPackageSupport;
55  import org.opencastproject.mediapackage.Track;
56  import org.opencastproject.mediapackage.identifier.IdImpl;
57  import org.opencastproject.metadata.dublincore.DCMIPeriod;
58  import org.opencastproject.metadata.dublincore.DublinCore;
59  import org.opencastproject.metadata.dublincore.DublinCoreCatalog;
60  import org.opencastproject.metadata.dublincore.DublinCoreCatalogService;
61  import org.opencastproject.metadata.dublincore.DublinCoreValue;
62  import org.opencastproject.metadata.dublincore.DublinCores;
63  import org.opencastproject.metadata.dublincore.EncodingSchemeUtils;
64  import org.opencastproject.metadata.dublincore.Precision;
65  import org.opencastproject.scheduler.api.SchedulerException;
66  import org.opencastproject.scheduler.api.SchedulerService;
67  import org.opencastproject.security.api.AccessControlEntry;
68  import org.opencastproject.security.api.AccessControlList;
69  import org.opencastproject.security.api.OrganizationDirectoryService;
70  import org.opencastproject.security.api.Permissions;
71  import org.opencastproject.security.api.SecurityService;
72  import org.opencastproject.security.api.TrustedHttpClient;
73  import org.opencastproject.security.api.UnauthorizedException;
74  import org.opencastproject.security.api.User;
75  import org.opencastproject.security.api.UserDirectoryService;
76  import org.opencastproject.security.util.SecurityUtil;
77  import org.opencastproject.series.api.SeriesException;
78  import org.opencastproject.series.api.SeriesService;
79  import org.opencastproject.serviceregistry.api.ServiceRegistry;
80  import org.opencastproject.serviceregistry.api.ServiceRegistryException;
81  import org.opencastproject.smil.api.util.SmilUtil;
82  import org.opencastproject.userdirectory.UserIdRoleProvider;
83  import org.opencastproject.util.ConfigurationException;
84  import org.opencastproject.util.IoSupport;
85  import org.opencastproject.util.LoadUtil;
86  import org.opencastproject.util.MimeTypes;
87  import org.opencastproject.util.NotFoundException;
88  import org.opencastproject.util.ProgressInputStream;
89  import org.opencastproject.util.XmlSafeParser;
90  import org.opencastproject.util.XmlUtil;
91  import org.opencastproject.util.data.Function;
92  import org.opencastproject.util.data.Option;
93  import org.opencastproject.util.data.functions.Misc;
94  import org.opencastproject.util.jmx.JmxUtil;
95  import org.opencastproject.workflow.api.WorkflowDatabaseException;
96  import org.opencastproject.workflow.api.WorkflowDefinition;
97  import org.opencastproject.workflow.api.WorkflowException;
98  import org.opencastproject.workflow.api.WorkflowInstance;
99  import org.opencastproject.workflow.api.WorkflowService;
100 import org.opencastproject.workingfilerepository.api.WorkingFileRepository;
101 
102 import com.google.common.cache.Cache;
103 import com.google.common.cache.CacheBuilder;
104 
105 import org.apache.commons.compress.archivers.zip.ZipArchiveEntry;
106 import org.apache.commons.compress.archivers.zip.ZipArchiveInputStream;
107 import org.apache.commons.io.FilenameUtils;
108 import org.apache.commons.io.IOUtils;
109 import org.apache.commons.lang3.BooleanUtils;
110 import org.apache.commons.lang3.StringUtils;
111 import org.apache.cxf.jaxrs.ext.multipart.ContentDisposition;
112 import org.apache.http.Header;
113 import org.apache.http.HttpHeaders;
114 import org.apache.http.HttpResponse;
115 import org.apache.http.auth.AuthScope;
116 import org.apache.http.auth.UsernamePasswordCredentials;
117 import org.apache.http.client.CredentialsProvider;
118 import org.apache.http.client.config.AuthSchemes;
119 import org.apache.http.client.methods.HttpGet;
120 import org.apache.http.impl.client.BasicCredentialsProvider;
121 import org.apache.http.impl.client.CloseableHttpClient;
122 import org.apache.http.impl.client.HttpClientBuilder;
123 import org.osgi.service.cm.ManagedService;
124 import org.osgi.service.component.ComponentContext;
125 import org.osgi.service.component.annotations.Activate;
126 import org.osgi.service.component.annotations.Component;
127 import org.osgi.service.component.annotations.Deactivate;
128 import org.osgi.service.component.annotations.Reference;
129 import org.osgi.service.component.annotations.ReferenceCardinality;
130 import org.osgi.service.component.annotations.ReferencePolicy;
131 import org.slf4j.Logger;
132 import org.slf4j.LoggerFactory;
133 import org.xml.sax.SAXException;
134 
135 import java.beans.PropertyChangeEvent;
136 import java.beans.PropertyChangeListener;
137 import java.io.BufferedReader;
138 import java.io.IOException;
139 import java.io.InputStream;
140 import java.io.InputStreamReader;
141 import java.net.URI;
142 import java.nio.charset.StandardCharsets;
143 import java.util.ArrayList;
144 import java.util.Arrays;
145 import java.util.Base64;
146 import java.util.Date;
147 import java.util.Dictionary;
148 import java.util.HashMap;
149 import java.util.HashSet;
150 import java.util.List;
151 import java.util.Map;
152 import java.util.Map.Entry;
153 import java.util.Objects;
154 import java.util.Optional;
155 import java.util.Set;
156 import java.util.UUID;
157 import java.util.concurrent.TimeUnit;
158 import java.util.stream.Collectors;
159 
160 import javax.management.ObjectInstance;
161 
162 /**
163  * Creates and augments Opencast MediaPackages. Stores media into the Working File Repository.
164  */
165 @Component(
166   immediate = true,
167   service = {
168     IngestService.class,
169     ManagedService.class
170   },
171   property = {
172     "service.description=Ingest Service",
173     "service.pid=org.opencastproject.ingest.impl.IngestServiceImpl"
174   }
175 )
176 public class IngestServiceImpl extends AbstractJobProducer implements IngestService, ManagedService {
177 
178   /** The logger */
179   private static final Logger logger = LoggerFactory.getLogger(IngestServiceImpl.class);
180 
181   /** The source SMIL name */
182   private static final String PARTIAL_SMIL_NAME = "source_partial.smil";
183 
184   /** The configuration key that defines the default workflow definition */
185   protected static final String WORKFLOW_DEFINITION_DEFAULT = "org.opencastproject.workflow.default.definition";
186 
187   /** The workflow configuration property prefix **/
188   protected static final String WORKFLOW_CONFIGURATION_PREFIX = "org.opencastproject.workflow.config.";
189 
190   /** The key for the legacy mediapackage identifier */
191   public static final String LEGACY_MEDIAPACKAGE_ID_KEY = "org.opencastproject.ingest.legacy.mediapackage.id";
192 
193   public static final String JOB_TYPE = "org.opencastproject.ingest";
194 
195   /** Methods that ingest zips create jobs with this operation type */
196   public static final String INGEST_ZIP = "zip";
197 
198   /** Methods that ingest tracks directly create jobs with this operation type */
199   public static final String INGEST_TRACK = "track";
200 
201   /** Methods that ingest tracks from a URI create jobs with this operation type */
202   public static final String INGEST_TRACK_FROM_URI = "uri-track";
203 
204   /** Methods that ingest attachments directly create jobs with this operation type */
205   public static final String INGEST_ATTACHMENT = "attachment";
206 
207   /** Methods that ingest attachments from a URI create jobs with this operation type */
208   public static final String INGEST_ATTACHMENT_FROM_URI = "uri-attachment";
209 
210   /** Methods that ingest catalogs directly create jobs with this operation type */
211   public static final String INGEST_CATALOG = "catalog";
212 
213   /** Methods that ingest catalogs from a URI create jobs with this operation type */
214   public static final String INGEST_CATALOG_FROM_URI = "uri-catalog";
215 
216   /** The approximate load placed on the system by ingesting a file */
217   public static final float DEFAULT_INGEST_FILE_JOB_LOAD = 0.2f;
218 
219   /** The approximate load placed on the system by ingesting a zip file */
220   public static final float DEFAULT_INGEST_ZIP_JOB_LOAD = 0.2f;
221 
222   /** The key to look for in the service configuration file to override the {@link DEFAULT_INGEST_FILE_JOB_LOAD} */
223   public static final String FILE_JOB_LOAD_KEY = "job.load.ingest.file";
224 
225   /** The key to look for in the service configuration file to override the {@link DEFAULT_INGEST_ZIP_JOB_LOAD} */
226   public static final String ZIP_JOB_LOAD_KEY = "job.load.ingest.zip";
227 
228   /** The source to download from  */
229   public static final String DOWNLOAD_SOURCE = "org.opencastproject.download.source";
230 
231   /** The user for download from external sources */
232   public static final String DOWNLOAD_USER = "org.opencastproject.download.user";
233 
234   /** The password for download from external sources */
235   public static final String DOWNLOAD_PASSWORD = "org.opencastproject.download.password";
236 
237   /** The authentication method for download from external sources */
238   public static final String DOWNLOAD_AUTH_METHOD = "org.opencastproject.download.auth.method";
239 
240   /** Force basic authentication even if download host does not ask for it */
241   public static final String DOWNLOAD_AUTH_FORCE_BASIC = "org.opencastproject.download.auth.force_basic";
242 
243   /** By default, do not allow event ingest to modify existing series metadata */
244   public static final boolean DEFAULT_ALLOW_SERIES_MODIFICATIONS = false;
245 
246   /** The default is to preserve existing Opencast flavors during ingest. */
247   public static final boolean DEFAULT_ALLOW_ONLY_NEW_FLAVORS = true;
248 
249   /** The default is not to automatically skip attachments and catalogs from capture agent */
250   public static final boolean DEFAULT_SKIP = false;
251 
252   /** The default for force basic authentication even if download host does not ask for it */
253   public static final boolean DEFAULT_DOWNLOAD_AUTH_FORCE_BASIC = false;
254 
255   /** The maximum length of filenames ingested by Opencast */
256   public static final int FILENAME_LENGTH_MAX = 75;
257 
258   /** Managed Property key to allow Opencast series modification during ingest
259    * Deprecated, the param potentially causes an update chain reaction for all
260    * events associated to that series, for each ingest */
261   @Deprecated
262   public static final String MODIFY_OPENCAST_SERIES_KEY = "org.opencastproject.series.overwrite";
263 
264   /** Managed Property key to allow new flavors of ingested attachments and catalogs
265    * to be added to the existing Opencast mediapackage. But, not catalogs and attachments
266    * that would overwrite existing ones in Opencast.
267    */
268   public static final String ADD_ONLY_NEW_FLAVORS_KEY = "add.only.new.catalogs.attachments.for.existing.events";
269 
270   /** Control if catalogs sent by capture agents for scheduled events are skipped. */
271   public static final String SKIP_CATALOGS_KEY = "skip.catalogs.for.existing.events";
272 
273   /** Control if attachments sent by capture agents for scheduled events are skipped. */
274   public static final String SKIP_ATTACHMENTS_KEY = "skip.attachments.for.existing.events";
275 
276   /** The appendix added to autogenerated CA series. CA series creation deactivated if not configured */
277   private static final String SERIES_APPENDIX = "add.series.to.event.appendix";
278 
279   /** The approximate load placed on the system by ingesting a file */
280   private float ingestFileJobLoad = DEFAULT_INGEST_FILE_JOB_LOAD;
281 
282   /** The approximate load placed on the system by ingesting a zip file */
283   private float ingestZipJobLoad = DEFAULT_INGEST_ZIP_JOB_LOAD;
284 
285   /** The user for download from external sources */
286   private static String downloadUser = DOWNLOAD_USER;
287 
288   /** The password for download from external sources */
289   private static String downloadPassword = DOWNLOAD_PASSWORD;
290 
291   /** The authentication method for download from external sources */
292   private static String downloadAuthMethod = DOWNLOAD_AUTH_METHOD;
293 
294   /** Force basic authentication even if download host does not ask for it */
295   private static boolean downloadAuthForceBasic = DEFAULT_DOWNLOAD_AUTH_FORCE_BASIC;
296 
297   /** The external source dns name */
298   private static String downloadSource = DOWNLOAD_SOURCE;
299 
300   /** The JMX business object for ingest statistics */
301   private IngestStatistics ingestStatistics = new IngestStatistics();
302 
303   /** The JMX bean object instance */
304   private ObjectInstance registerMXBean;
305 
306   /** The workflow service */
307   private WorkflowService workflowService;
308 
309   /** The working file repository */
310   private WorkingFileRepository workingFileRepository;
311 
312   /** The http client */
313   private TrustedHttpClient httpClient;
314 
315   /** The series service */
316   private SeriesService seriesService;
317 
318   /** The dublin core service */
319   private DublinCoreCatalogService dublinCoreService;
320 
321   /** The opencast service registry */
322   private ServiceRegistry serviceRegistry;
323 
324   /** The security service */
325   protected SecurityService securityService = null;
326 
327   /** The user directory service */
328   protected UserDirectoryService userDirectoryService = null;
329 
330   /** The organization directory service */
331   protected OrganizationDirectoryService organizationDirectoryService = null;
332 
333   /** The scheduler service */
334   private SchedulerService schedulerService = null;
335 
336   /** The media inspection service */
337   private MediaInspectionService mediaInspectionService = null;
338 
339   /** The search index. */
340 
341   /** The default workflow identifier, if one is configured */
342   protected String defaultWorkflowDefinionId;
343 
344   /** The partial track start time map */
345   private Cache<String, Long> partialTrackStartTimes = CacheBuilder.newBuilder().expireAfterWrite(1, TimeUnit.DAYS)
346           .build();
347 
348   /** Option to overwrite matching flavors (e.g. series and episode metadata) on ingest,
349    *  tracks are always taken on ingest */
350   protected boolean isAddOnlyNew = DEFAULT_ALLOW_ONLY_NEW_FLAVORS;
351   protected boolean isAllowModifySeries = DEFAULT_ALLOW_SERIES_MODIFICATIONS;
352 
353   private boolean skipCatalogs = DEFAULT_SKIP;
354   private boolean skipAttachments = DEFAULT_SKIP;
355 
356   /** Create name appendix for autogenerated CA series */
357   private String createSeriesAppendix = null;
358 
359   protected boolean testMode = false;
360 
361   /**
362    * Creates a new ingest service instance.
363    */
364   public IngestServiceImpl() {
365     super(JOB_TYPE);
366   }
367 
368   /**
369    * OSGI callback for activating this component
370    *
371    * @param cc
372    *          the osgi component context
373    */
374   @Override
375   @Activate
376   public void activate(ComponentContext cc) {
377     super.activate(cc);
378     logger.info("Ingest Service started.");
379     defaultWorkflowDefinionId = StringUtils.trimToNull(cc.getBundleContext().getProperty(WORKFLOW_DEFINITION_DEFAULT));
380     if (defaultWorkflowDefinionId == null) {
381       defaultWorkflowDefinionId = "schedule-and-upload";
382     }
383     registerMXBean = JmxUtil.registerMXBean(ingestStatistics, "IngestStatistics");
384   }
385 
386   /**
387    * Callback from OSGi on service deactivation.
388    */
389   @Deactivate
390   public void deactivate() {
391     JmxUtil.unregisterMXBean(registerMXBean);
392   }
393 
394   /**
395    * {@inheritDoc}
396    *
397    * @see org.osgi.service.cm.ManagedService#updated(java.util.Dictionary)
398    *      Retrieve ManagedService configuration, including option to overwrite series
399    */
400   @Override
401   public void updated(Dictionary<String, ?> properties) throws ConfigurationException {
402 
403     if (properties == null) {
404       logger.info("No configuration available, using defaults");
405       return;
406     }
407 
408     downloadAuthMethod = StringUtils.trimToEmpty((String)properties.get(DOWNLOAD_AUTH_METHOD));
409     if (!"Digest".equals(downloadAuthMethod) && !"Basic".equals(downloadAuthMethod)) {
410       logger.warn("Download authentication method is neither Digest nor Basic; setting to Digest");
411       downloadAuthMethod = "Digest";
412     }
413     downloadAuthForceBasic = BooleanUtils.toBoolean(Objects.toString(properties.get(DOWNLOAD_AUTH_FORCE_BASIC),
414         BooleanUtils.toStringTrueFalse(DEFAULT_DOWNLOAD_AUTH_FORCE_BASIC)));
415     downloadPassword = StringUtils.trimToEmpty((String)properties.get(DOWNLOAD_PASSWORD));
416     downloadUser = StringUtils.trimToEmpty(((String) properties.get(DOWNLOAD_USER)));
417     downloadSource = StringUtils.trimToEmpty(((String) properties.get(DOWNLOAD_SOURCE)));
418     if (!isBlank(downloadSource) && (isBlank(downloadUser) || isBlank(downloadPassword))) {
419       logger.warn("Configured ingest download source has no configured user or password; deactivating authenticated"
420           + "download");
421       downloadSource = "";
422     }
423 
424     skipAttachments = BooleanUtils.toBoolean(Objects.toString(properties.get(SKIP_ATTACHMENTS_KEY),
425             BooleanUtils.toStringTrueFalse(DEFAULT_SKIP)));
426     skipCatalogs = BooleanUtils.toBoolean(Objects.toString(properties.get(SKIP_CATALOGS_KEY),
427             BooleanUtils.toStringTrueFalse(DEFAULT_SKIP)));
428     logger.debug("Skip attachments sent by agents for scheduled events: {}", skipAttachments);
429     logger.debug("Skip metadata catalogs sent by agents for scheduled events: {}", skipCatalogs);
430 
431     ingestFileJobLoad = LoadUtil.getConfiguredLoadValue(properties, FILE_JOB_LOAD_KEY, DEFAULT_INGEST_FILE_JOB_LOAD,
432             serviceRegistry);
433     ingestZipJobLoad = LoadUtil.getConfiguredLoadValue(properties, ZIP_JOB_LOAD_KEY, DEFAULT_INGEST_ZIP_JOB_LOAD,
434             serviceRegistry);
435 
436     isAllowModifySeries = BooleanUtils.toBoolean(Objects.toString(properties.get(MODIFY_OPENCAST_SERIES_KEY),
437               BooleanUtils.toStringTrueFalse(DEFAULT_ALLOW_SERIES_MODIFICATIONS)));
438     isAddOnlyNew = BooleanUtils.toBoolean(Objects.toString(properties.get(ADD_ONLY_NEW_FLAVORS_KEY),
439             BooleanUtils.toStringTrueFalse(DEFAULT_ALLOW_ONLY_NEW_FLAVORS)));
440     logger.info("Only allow new flavored catalogs and attachments on ingest:'{}'", isAddOnlyNew);
441     logger.info("Allowing series modification:'{}'", isAllowModifySeries);
442     createSeriesAppendix = StringUtils.trimToNull(((String) properties.get(SERIES_APPENDIX)));
443   }
444 
445   /**
446    * Sets the trusted http client
447    *
448    * @param httpClient
449    *          the http client
450    */
451   @Reference
452   public void setHttpClient(TrustedHttpClient httpClient) {
453     this.httpClient = httpClient;
454   }
455 
456   /**
457    * Sets the service registry
458    *
459    * @param serviceRegistry
460    *          the serviceRegistry to set
461    */
462   @Reference
463   public void setServiceRegistry(ServiceRegistry serviceRegistry) {
464     this.serviceRegistry = serviceRegistry;
465   }
466 
467   /**
468    * Sets the media inspection service
469    *
470    * @param mediaInspectionService
471    *          the media inspection service to set
472    */
473   @Reference
474   public void setMediaInspectionService(MediaInspectionService mediaInspectionService) {
475     this.mediaInspectionService = mediaInspectionService;
476   }
477 
478   public WorkflowInstance addZippedMediaPackage(InputStream zipStream)
479           throws IngestException, IOException, MediaPackageException {
480     try {
481       return addZippedMediaPackage(zipStream, null, null);
482     } catch (NotFoundException e) {
483       throw new IllegalStateException("A not found exception was thrown without a lookup");
484     }
485   }
486 
487   @Override
488   public WorkflowInstance addZippedMediaPackage(InputStream zipStream, String wd, Map<String, String> workflowConfig)
489           throws MediaPackageException, IOException, IngestException, NotFoundException {
490     try {
491       return addZippedMediaPackage(zipStream, wd, workflowConfig, null);
492     } catch (UnauthorizedException e) {
493       throw new IllegalStateException(e);
494     }
495   }
496 
497   /**
498    * {@inheritDoc}
499    *
500    * @see org.opencastproject.ingest.api.IngestService#addZippedMediaPackage(java.io.InputStream, java.lang.String,
501    *      java.util.Map, java.lang.Long)
502    */
503   @Override
504   public WorkflowInstance addZippedMediaPackage(InputStream zipStream, String workflowDefinitionId,
505           Map<String, String> workflowConfig, Long workflowInstanceId)
506           throws MediaPackageException, IOException, IngestException, NotFoundException, UnauthorizedException {
507     // Start a job synchronously. We can't keep the open input stream waiting around.
508     Job job = null;
509 
510     if (StringUtils.isNotBlank(workflowDefinitionId)) {
511       try {
512         workflowService.getWorkflowDefinitionById(workflowDefinitionId);
513       } catch (WorkflowDatabaseException e) {
514         throw new IngestException(e);
515       } catch (NotFoundException nfe) {
516         logger.warn("Workflow definition {} not found, using default workflow {} instead", workflowDefinitionId,
517                 defaultWorkflowDefinionId);
518         workflowDefinitionId = defaultWorkflowDefinionId;
519       }
520     }
521 
522     if (workflowInstanceId != null) {
523       logger.warn("Deprecated method! Ingesting zipped mediapackage with workflow {}", workflowInstanceId);
524     } else {
525       logger.info("Ingesting zipped mediapackage");
526     }
527 
528     ZipArchiveInputStream zis = null;
529     Set<String> collectionFilenames = new HashSet<>();
530     try {
531       // We don't need anybody to do the dispatching for us. Therefore we need to make sure that the job is never in
532       // QUEUED state but set it to INSTANTIATED in the beginning and then manually switch it to RUNNING.
533       job = serviceRegistry.createJob(JOB_TYPE, INGEST_ZIP, null, null, false, ingestZipJobLoad);
534       job.setStatus(Status.RUNNING);
535       job = serviceRegistry.updateJob(job);
536 
537       // Create the working file target collection for this ingest operation
538       String wfrCollectionId = Long.toString(job.getId());
539 
540       zis = new ZipArchiveInputStream(zipStream);
541       ZipArchiveEntry entry;
542       MediaPackage mp = null;
543       Map<String, URI> uris = new HashMap<>();
544       // Sequential number to append to file names so that, if two files have the same
545       // name, one does not overwrite the other (see MH-9688)
546       int seq = 1;
547       // Folder name to compare with next one to figure out if there's a root folder
548       String folderName = null;
549       // Indicates if zip has a root folder or not, initialized as true
550       boolean hasRootFolder = true;
551       // While there are entries write them to a collection
552       while ((entry = zis.getNextZipEntry()) != null) {
553         try {
554           if (entry.isDirectory() || entry.getName().contains("__MACOSX"))
555             continue;
556 
557           if (entry.getName().endsWith("manifest.xml") || entry.getName().endsWith("index.xml")) {
558             // Build the media package
559             final InputStream is = new ZipEntryInputStream(zis, entry.getSize());
560             mp = MediaPackageParser.getFromXml(IOUtils.toString(is, StandardCharsets.UTF_8));
561           } else {
562             logger.info("Storing zip entry {}/{} in working file repository collection '{}'", job.getId(),
563                     entry.getName(), wfrCollectionId);
564             // Since the directory structure is not being mirrored, makes sure the file
565             // name is different than the previous one(s) by adding a sequential number
566             String fileName = FilenameUtils.getBaseName(entry.getName()) + "_" + seq++ + "."
567                     + FilenameUtils.getExtension(entry.getName());
568             URI contentUri = workingFileRepository.putInCollection(wfrCollectionId, fileName,
569                     new ZipEntryInputStream(zis, entry.getSize()));
570             collectionFilenames.add(fileName);
571             // Key is the zip entry name as it is
572             String key = entry.getName();
573             uris.put(key, contentUri);
574             ingestStatistics.add(entry.getSize());
575             logger.info("Zip entry {}/{} stored at {}", job.getId(), entry.getName(), contentUri);
576             // Figures out if there's a root folder. Does entry name starts with a folder?
577             int pos = entry.getName().indexOf('/');
578             if (pos == -1) {
579               // No, we can conclude there's no root folder
580               hasRootFolder = false;
581             } else if (hasRootFolder && folderName != null && !folderName.equals(entry.getName().substring(0, pos))) {
582               // Folder name different from previous so there's no root folder
583               hasRootFolder = false;
584             } else if (folderName == null) {
585               // Just initialize folder name
586               folderName = entry.getName().substring(0, pos);
587             }
588           }
589         } catch (IOException e) {
590           logger.warn("Unable to process zip entry {}", entry.getName(), e);
591           throw e;
592         }
593       }
594 
595       if (mp == null)
596         throw new MediaPackageException("No manifest found in this zip");
597 
598       // Determine the mediapackage identifier
599       if (mp.getIdentifier() == null || isBlank(mp.getIdentifier().toString()))
600         mp.setIdentifier(IdImpl.fromUUID());
601 
602       String mediaPackageId = mp.getIdentifier().toString();
603 
604       logger.info("Ingesting mediapackage {} is named '{}'", mediaPackageId, mp.getTitle());
605 
606       // Make sure there are tracks in the mediapackage
607       if (mp.getTracks().length == 0) {
608         logger.warn("Mediapackage {} has no media tracks", mediaPackageId);
609       }
610 
611       // Update the element uris to point to their working file repository location
612       for (MediaPackageElement element : mp.elements()) {
613         // Key has root folder name if there is one
614         URI uri = uris.get((hasRootFolder ? folderName + "/" : "") + element.getURI().toString());
615 
616         if (uri == null)
617           throw new MediaPackageException("Unable to map element name '" + element.getURI() + "' to workspace uri");
618         logger.info("Ingested mediapackage element {}/{} located at {}", mediaPackageId, element.getIdentifier(), uri);
619         URI dest = workingFileRepository.moveTo(wfrCollectionId, FilenameUtils.getName(uri.toString()), mediaPackageId,
620                 element.getIdentifier(), FilenameUtils.getName(element.getURI().toString()));
621         element.setURI(dest);
622       }
623 
624       // Now that all elements are in place, start with ingest
625       logger.info("Initiating processing of ingested mediapackage {}", mediaPackageId);
626       WorkflowInstance workflowInstance = ingest(mp, workflowDefinitionId, workflowConfig, workflowInstanceId);
627       logger.info("Ingest of mediapackage {} done", mediaPackageId);
628       job.setStatus(Job.Status.FINISHED);
629       return workflowInstance;
630     } catch (ServiceRegistryException e) {
631       throw new IngestException(e);
632     } catch (MediaPackageException e) {
633       job.setStatus(Job.Status.FAILED, Job.FailureReason.DATA);
634       throw e;
635     } catch (Exception e) {
636       if (e instanceof IngestException)
637         throw (IngestException) e;
638       throw new IngestException(e);
639     } finally {
640       IOUtils.closeQuietly(zis);
641       finallyUpdateJob(job);
642       for (String filename : collectionFilenames) {
643         workingFileRepository.deleteFromCollection(Long.toString(job.getId()), filename, true);
644       }
645     }
646   }
647 
648   /**
649    * {@inheritDoc}
650    *
651    * @see org.opencastproject.ingest.api.IngestService#createMediaPackage()
652    */
653   @Override
654   public MediaPackage createMediaPackage() throws MediaPackageException, ConfigurationException {
655     MediaPackage mediaPackage;
656     try {
657       mediaPackage = MediaPackageBuilderFactory.newInstance().newMediaPackageBuilder().createNew();
658     } catch (MediaPackageException e) {
659       logger.error("INGEST:Failed to create media package " + e.getLocalizedMessage());
660       throw e;
661     }
662     mediaPackage.setDate(new Date());
663     logger.info("Created mediapackage {}", mediaPackage);
664     return mediaPackage;
665   }
666 
667   /**
668    * {@inheritDoc}
669    *
670    * @see org.opencastproject.ingest.api.IngestService#createMediaPackage()
671    */
672   @Override
673   public MediaPackage createMediaPackage(String mediaPackageId)
674           throws MediaPackageException, ConfigurationException {
675     MediaPackage mediaPackage;
676     try {
677       mediaPackage = MediaPackageBuilderFactory.newInstance().newMediaPackageBuilder()
678               .createNew(new IdImpl(mediaPackageId));
679     } catch (MediaPackageException e) {
680       logger.error("INGEST:Failed to create media package " + e.getLocalizedMessage());
681       throw e;
682     }
683     mediaPackage.setDate(new Date());
684     logger.info("Created mediapackage {}", mediaPackage);
685     return mediaPackage;
686   }
687 
688   /**
689    * {@inheritDoc}
690    *
691    * @see org.opencastproject.ingest.api.IngestService#addTrack(java.net.URI,
692    *      org.opencastproject.mediapackage.MediaPackageElementFlavor, String[] ,
693    *      org.opencastproject.mediapackage.MediaPackage)
694    */
695   @Override
696   public MediaPackage addTrack(URI uri, MediaPackageElementFlavor flavor, String[] tags, MediaPackage mediaPackage)
697           throws IOException, IngestException {
698     Job job = null;
699     try {
700       job = serviceRegistry
701               .createJob(
702                       JOB_TYPE, INGEST_TRACK_FROM_URI, Arrays.asList(uri.toString(),
703                               flavor == null ? null : flavor.toString(), MediaPackageParser.getAsXml(mediaPackage)),
704                       null, false, ingestFileJobLoad);
705       job.setStatus(Status.RUNNING);
706       job = serviceRegistry.updateJob(job);
707       String elementId = UUID.randomUUID().toString();
708       logger.info("Start adding track {} from URL {} on mediapackage {}", elementId, uri, mediaPackage);
709       URI newUrl = addContentToRepo(mediaPackage, elementId, uri);
710       MediaPackage mp = addContentToMediaPackage(mediaPackage, elementId, newUrl, MediaPackageElement.Type.Track,
711               flavor);
712       if (tags != null && tags.length > 0) {
713         MediaPackageElement trackElement = mp.getTrack(elementId);
714         for (String tag : tags) {
715           logger.info("Adding tag: " + tag + " to Element: " + elementId);
716           trackElement.addTag(tag);
717         }
718       }
719 
720       job.setStatus(Job.Status.FINISHED);
721       logger.info("Successful added track {} on mediapackage {} at URL {}", elementId, mediaPackage, newUrl);
722       return mp;
723     } catch (IOException e) {
724       throw e;
725     } catch (ServiceRegistryException e) {
726       throw new IngestException(e);
727     } catch (NotFoundException e) {
728       throw new IngestException("Unable to update ingest job", e);
729     } finally {
730       finallyUpdateJob(job);
731     }
732   }
733 
734   /**
735    * {@inheritDoc}
736    *
737    * @see org.opencastproject.ingest.api.IngestService#addTrack(java.io.InputStream, java.lang.String,
738    *      org.opencastproject.mediapackage.MediaPackageElementFlavor, org.opencastproject.mediapackage.MediaPackage)
739    */
740   @Override
741   public MediaPackage addTrack(InputStream in, String fileName, MediaPackageElementFlavor flavor,
742           MediaPackage mediaPackage) throws IOException, IngestException {
743     String[] tags = null;
744     return this.addTrack(in, fileName, flavor, tags, mediaPackage);
745   }
746 
747   /**
748    * {@inheritDoc}
749    *
750    * @see org.opencastproject.ingest.api.IngestService#addTrack(java.io.InputStream, java.lang.String,
751    *      org.opencastproject.mediapackage.MediaPackageElementFlavor, org.opencastproject.mediapackage.MediaPackage)
752    */
753   @Override
754   public MediaPackage addTrack(InputStream in, String fileName, MediaPackageElementFlavor flavor, String[] tags,
755           MediaPackage mediaPackage) throws IOException, IngestException {
756     Job job = null;
757     try {
758       job = serviceRegistry.createJob(JOB_TYPE, INGEST_TRACK, null, null, false, ingestFileJobLoad);
759       job.setStatus(Status.RUNNING);
760       job = serviceRegistry.updateJob(job);
761       String elementId = UUID.randomUUID().toString();
762       logger.info("Start adding track {} from input stream on mediapackage {}", elementId, mediaPackage);
763       if (fileName.length() > FILENAME_LENGTH_MAX) {
764         final String extension = "." + FilenameUtils.getExtension(fileName);
765         final int length = Math.max(0, FILENAME_LENGTH_MAX - extension.length());
766         fileName = fileName.substring(0, length) + extension;
767       }
768       URI newUrl = addContentToRepo(mediaPackage, elementId, fileName, in);
769       MediaPackage mp = addContentToMediaPackage(mediaPackage, elementId, newUrl, MediaPackageElement.Type.Track,
770               flavor);
771       if (tags != null && tags.length > 0) {
772         MediaPackageElement trackElement = mp.getTrack(elementId);
773         for (String tag : tags) {
774           logger.debug("Adding tag `{}` to element {}", tag, elementId);
775           trackElement.addTag(tag);
776         }
777       }
778 
779       job.setStatus(Job.Status.FINISHED);
780       logger.info("Successful added track {} on mediapackage {} at URL {}", elementId, mediaPackage, newUrl);
781       return mp;
782     } catch (IOException e) {
783       throw e;
784     } catch (ServiceRegistryException e) {
785       throw new IngestException(e);
786     } catch (NotFoundException e) {
787       throw new IngestException("Unable to update ingest job", e);
788     } finally {
789       finallyUpdateJob(job);
790     }
791   }
792 
793   @Override
794   public MediaPackage addPartialTrack(URI uri, MediaPackageElementFlavor flavor, long startTime,
795           MediaPackage mediaPackage) throws IOException, IngestException {
796     Job job = null;
797     try {
798       job = serviceRegistry.createJob(
799               JOB_TYPE,
800               INGEST_TRACK_FROM_URI,
801               Arrays.asList(uri.toString(), flavor == null ? null : flavor.toString(),
802                       MediaPackageParser.getAsXml(mediaPackage)), null, false);
803       job.setStatus(Status.RUNNING);
804       job = serviceRegistry.updateJob(job);
805       String elementId = UUID.randomUUID().toString();
806       logger.info("Start adding partial track {} from URL {} on mediapackage {}", elementId, uri, mediaPackage);
807       URI newUrl = addContentToRepo(mediaPackage, elementId, uri);
808       MediaPackage mp = addContentToMediaPackage(mediaPackage, elementId, newUrl, MediaPackageElement.Type.Track,
809               flavor);
810       job.setStatus(Job.Status.FINISHED);
811       // store startTime
812       partialTrackStartTimes.put(elementId, startTime);
813       logger.debug("Added start time {} for track {}", startTime, elementId);
814       logger.info("Successful added partial track {} on mediapackage {} at URL {}", elementId, mediaPackage, newUrl);
815       return mp;
816     } catch (ServiceRegistryException e) {
817       throw new IngestException(e);
818     } catch (NotFoundException e) {
819       throw new IngestException("Unable to update ingest job", e);
820     } finally {
821       finallyUpdateJob(job);
822     }
823   }
824 
825   @Override
826   public MediaPackage addPartialTrack(InputStream in, String fileName, MediaPackageElementFlavor flavor, long startTime,
827           MediaPackage mediaPackage) throws IOException, IngestException {
828     Job job = null;
829     try {
830       job = serviceRegistry.createJob(JOB_TYPE, INGEST_TRACK, null, null, false);
831       job.setStatus(Status.RUNNING);
832       job = serviceRegistry.updateJob(job);
833       String elementId = UUID.randomUUID().toString();
834       logger.info("Start adding partial track {} from input stream on mediapackage {}", elementId, mediaPackage);
835       URI newUrl = addContentToRepo(mediaPackage, elementId, fileName, in);
836       MediaPackage mp = addContentToMediaPackage(mediaPackage, elementId, newUrl, MediaPackageElement.Type.Track,
837               flavor);
838       job.setStatus(Job.Status.FINISHED);
839       // store startTime
840       partialTrackStartTimes.put(elementId, startTime);
841       logger.debug("Added start time {} for track {}", startTime, elementId);
842       logger.info("Successful added partial track {} on mediapackage {} at URL {}", elementId, mediaPackage, newUrl);
843       return mp;
844     } catch (ServiceRegistryException e) {
845       throw new IngestException(e);
846     } catch (NotFoundException e) {
847       throw new IngestException("Unable to update ingest job", e);
848     } finally {
849       finallyUpdateJob(job);
850     }
851   }
852 
853   /**
854    * {@inheritDoc}
855    *
856    * @see org.opencastproject.ingest.api.IngestService#addCatalog(java.net.URI,
857    *      org.opencastproject.mediapackage.MediaPackageElementFlavor, String[], org.opencastproject.mediapackage.MediaPackage)
858    */
859   @Override
860   public MediaPackage addCatalog(URI uri, MediaPackageElementFlavor flavor, String[] tags, MediaPackage mediaPackage)
861           throws IOException, IngestException {
862     Job job = null;
863     try {
864       job = serviceRegistry.createJob(JOB_TYPE, INGEST_CATALOG_FROM_URI,
865               Arrays.asList(uri.toString(), flavor.toString(), MediaPackageParser.getAsXml(mediaPackage)), null, false,
866               ingestFileJobLoad);
867       job.setStatus(Status.RUNNING);
868       job = serviceRegistry.updateJob(job);
869       String elementId = UUID.randomUUID().toString();
870       logger.info("Start adding catalog {} from URL {} on mediapackage {}", elementId, uri, mediaPackage);
871       URI newUrl = addContentToRepo(mediaPackage, elementId, uri);
872       MediaPackage mp = addContentToMediaPackage(mediaPackage, elementId, newUrl, MediaPackageElement.Type.Catalog,
873               flavor);
874       if (tags != null && tags.length > 0) {
875         MediaPackageElement catalogElement = mp.getCatalog(elementId);
876         for (String tag : tags) {
877           logger.info("Adding tag: " + tag + " to Element: " + elementId);
878           catalogElement.addTag(tag);
879         }
880       }
881       job.setStatus(Job.Status.FINISHED);
882       logger.info("Successful added catalog {} on mediapackage {} at URL {}", elementId, mediaPackage, newUrl);
883       return mp;
884     } catch (ServiceRegistryException e) {
885       throw new IngestException(e);
886     } catch (NotFoundException e) {
887       throw new IngestException("Unable to update ingest job", e);
888     } finally {
889       finallyUpdateJob(job);
890     }
891   }
892 
893   /**
894    * Updates the persistent representation of a series based on a potentially modified dublin core document.
895    *
896    * @param mediaPackage
897    *         the media package containing series metadata and ACLs.
898    * @return
899    *         true, if the series is created or overwritten, false if the existing series remains intact.
900    * @throws IOException if the series catalog was not found
901    * @throws IngestException if any other exception was encountered
902    */
903   protected boolean updateSeries(MediaPackage mediaPackage) throws IOException, IngestException {
904     Catalog[] seriesCatalogs = mediaPackage.getCatalogs(MediaPackageElements.SERIES);
905     if (seriesCatalogs.length == 0) {
906       return false;
907     } else if (seriesCatalogs.length > 1) {
908       logger.warn("Mediapackage {} has more than one series dublincore catalogs. Using catalog {} with ID {}.",
909           mediaPackage.getIdentifier(), seriesCatalogs[0].getURI(), seriesCatalogs[0].getIdentifier());
910     }
911     // Parse series dublincore
912     HttpResponse response = null;
913     InputStream in = null;
914     boolean isUpdated = false;
915     boolean isNew = false;
916     String seriesId = null;
917     try {
918       HttpGet getDc = new HttpGet(seriesCatalogs[0].getURI());
919       response = httpClient.execute(getDc);
920       in = response.getEntity().getContent();
921       DublinCoreCatalog dc = dublinCoreService.load(in);
922       seriesId = dc.getFirst(DublinCore.PROPERTY_IDENTIFIER);
923       if (seriesId == null) {
924         logger.warn("Series dublin core document contains no identifier, "
925             + "rejecting ingested series catalog for mediapackage {}.", mediaPackage.getIdentifier());
926       } else {
927         try {
928           try {
929             seriesService.getSeries(seriesId);
930             if (isAllowModifySeries) {
931               // Update existing series
932               seriesService.updateSeries(dc);
933               isUpdated = true;
934               logger.debug("Ingest is overwriting the existing series {} with the ingested series", seriesId);
935             } else {
936               logger.debug("Series {} already exists. Ignoring series catalog from ingest.", seriesId);
937             }
938           } catch (NotFoundException e) {
939             logger.info("Creating new series {} with default ACL.", seriesId);
940             seriesService.updateSeries(dc);
941             isUpdated = true;
942             isNew = true;
943           }
944 
945         } catch (Exception e) {
946           throw new IngestException(e);
947         }
948       }
949       in.close();
950     } catch (IOException e) {
951       logger.error("Error updating series from DublinCoreCatalog.}", e);
952     } finally {
953       IOUtils.closeQuietly(in);
954       httpClient.close(response);
955     }
956     if (!isUpdated) {
957       return isUpdated;
958     }
959     // Apply series extended metadata
960     for (MediaPackageElement seriesElement : mediaPackage.getElementsByFlavor(
961         MediaPackageElementFlavor.parseFlavor("*/series"))) {
962       if (MediaPackageElement.Type.Catalog == seriesElement.getElementType()
963           && !MediaPackageElements.SERIES.equals(seriesElement.getFlavor())) {
964         String catalogType = seriesElement.getFlavor().getType();
965         logger.info("Apply series {} metadata catalog from mediapackage {} to newly created series {}.",
966             catalogType, mediaPackage.getIdentifier(), seriesId);
967         byte[] data;
968         try {
969           HttpGet getExtendedMetadata = new HttpGet(seriesElement.getURI());
970           response = httpClient.execute(getExtendedMetadata);
971           in = response.getEntity().getContent();
972           data = IOUtils.readFully(in, (int) response.getEntity().getContentLength());
973         } catch (Exception e) {
974           throw new IngestException("Unable to read series " + catalogType + " metadata catalog for series "
975               + seriesId + ".", e);
976         } finally {
977           IOUtils.closeQuietly(in);
978           httpClient.close(response);
979         }
980         try {
981           seriesService.updateSeriesElement(seriesId, catalogType, data);
982         } catch (SeriesException e) {
983           throw new IngestException(
984               "Unable to update series " + catalogType + " catalog on newly created series " + seriesId + ".", e);
985         }
986       }
987     }
988     if (isNew) {
989       logger.info("Apply series ACL from mediapackage {} to newly created series {}.",
990           mediaPackage.getIdentifier(), seriesId);
991       Attachment[] seriesXacmls = mediaPackage.getAttachments(MediaPackageElements.XACML_POLICY_SERIES);
992       if (seriesXacmls.length > 0) {
993         if (seriesXacmls.length > 1) {
994           logger.warn("Mediapackage {} has more than one series xacml attachments. Using {}.",
995               mediaPackage.getIdentifier(), seriesXacmls[0].getURI());
996         }
997         AccessControlList seriesAcl = null;
998         try {
999           HttpGet getXacml = new HttpGet(seriesXacmls[0].getURI());
1000           response = httpClient.execute(getXacml);
1001           in = response.getEntity().getContent();
1002           seriesAcl = XACMLUtils.parseXacml(in);
1003         } catch (XACMLParsingException ex) {
1004           throw new IngestException("Unable to parse series xacml from mediapackage "
1005               + mediaPackage.getIdentifier() + ".", ex);
1006         } catch (IOException e) {
1007           logger.error("Error updating series {} ACL from mediapackage {}.",
1008               seriesId, mediaPackage.getIdentifier(), e);
1009           throw e;
1010         } finally {
1011           IOUtils.closeQuietly(in);
1012           httpClient.close(response);
1013         }
1014         try {
1015           seriesService.updateAccessControl(seriesId, seriesAcl);
1016         } catch (Exception e) {
1017           throw new IngestException("Unable to update series ACL on newly created series " + seriesId + ".", e);
1018         }
1019       }
1020     }
1021     return isUpdated;
1022   }
1023 
1024   /**
1025    * {@inheritDoc}
1026    *
1027    * @see org.opencastproject.ingest.api.IngestService#addCatalog(java.io.InputStream, java.lang.String,
1028    *      org.opencastproject.mediapackage.MediaPackageElementFlavor, org.opencastproject.mediapackage.MediaPackage)
1029    */
1030   @Override
1031   public MediaPackage addCatalog(InputStream in, String fileName, MediaPackageElementFlavor flavor,
1032           MediaPackage mediaPackage) throws IOException, IngestException {
1033     return addCatalog(in, fileName, flavor, null, mediaPackage);
1034   }
1035 
1036   /**
1037    * {@inheritDoc}
1038    *
1039    * @see org.opencastproject.ingest.api.IngestService#addCatalog(java.io.InputStream, java.lang.String,
1040    *      org.opencastproject.mediapackage.MediaPackageElementFlavor, org.opencastproject.mediapackage.MediaPackage)
1041    */
1042   @Override
1043   public MediaPackage addCatalog(InputStream in, String fileName, MediaPackageElementFlavor flavor, String[] tags,
1044           MediaPackage mediaPackage) throws IOException, IngestException, IllegalArgumentException {
1045     Job job = null;
1046     try {
1047       job = serviceRegistry.createJob(JOB_TYPE, INGEST_CATALOG, null, null, false, ingestFileJobLoad);
1048       job.setStatus(Status.RUNNING);
1049       job = serviceRegistry.updateJob(job);
1050       final String elementId = UUID.randomUUID().toString();
1051       final String mediaPackageId = mediaPackage.getIdentifier().toString();
1052       logger.info("Start adding catalog {} from input stream on mediapackage {}", elementId, mediaPackageId);
1053       final URI newUrl = addContentToRepo(mediaPackage, elementId, fileName, in);
1054 
1055       final boolean isJSON;
1056       try (InputStream inputStream = workingFileRepository.get(mediaPackageId, elementId)) {
1057         try (BufferedReader reader  = new BufferedReader(new InputStreamReader(inputStream, StandardCharsets.UTF_8))) {
1058           // Exception for current BBB integration and Extron SMP351 which is ingesting a JSON array/object as catalog
1059           int firstChar = reader.read();
1060           isJSON = firstChar == '[' || firstChar == '{';
1061         }
1062       }
1063 
1064       if (isJSON) {
1065         logger.warn("Input catalog seems to be JSON. This is a mistake and will fail in future Opencast versions."
1066             + "You will likely want to ingest this as a media package attachment instead.");
1067       } else {
1068         // Verify XML is not corrupted
1069         try {
1070           XmlSafeParser.parse(workingFileRepository.get(mediaPackageId, elementId));
1071         } catch (SAXException e) {
1072           workingFileRepository.delete(mediaPackageId, elementId);
1073           throw new IllegalArgumentException("Catalog XML is invalid", e);
1074         }
1075       }
1076       MediaPackage mp = addContentToMediaPackage(mediaPackage, elementId, newUrl, MediaPackageElement.Type.Catalog,
1077               flavor);
1078       if (tags != null && tags.length > 0) {
1079         MediaPackageElement trackElement = mp.getCatalog(elementId);
1080         for (String tag : tags) {
1081           logger.info("Adding tag {} to element {}", tag, elementId);
1082           trackElement.addTag(tag);
1083         }
1084       }
1085 
1086       job.setStatus(Job.Status.FINISHED);
1087       logger.info("Successful added catalog {} on mediapackage {} at URL {}", elementId, mediaPackage, newUrl);
1088       return mp;
1089     } catch (ServiceRegistryException e) {
1090       throw new IngestException(e);
1091     } catch (NotFoundException e) {
1092       throw new IngestException("Unable to update ingest job", e);
1093     } finally {
1094       finallyUpdateJob(job);
1095     }
1096   }
1097 
1098   /**
1099    * {@inheritDoc}
1100    *
1101    * @see org.opencastproject.ingest.api.IngestService#addAttachment(java.net.URI,
1102    *      org.opencastproject.mediapackage.MediaPackageElementFlavor, String[], org.opencastproject.mediapackage.MediaPackage)
1103    */
1104   @Override
1105   public MediaPackage addAttachment(URI uri, MediaPackageElementFlavor flavor, String[] tags, MediaPackage mediaPackage)
1106           throws IOException, IngestException {
1107     Job job = null;
1108     try {
1109       job = serviceRegistry.createJob(JOB_TYPE, INGEST_ATTACHMENT_FROM_URI,
1110               Arrays.asList(uri.toString(), flavor.toString(), MediaPackageParser.getAsXml(mediaPackage)), null, false,
1111               ingestFileJobLoad);
1112       job.setStatus(Status.RUNNING);
1113       job = serviceRegistry.updateJob(job);
1114       String elementId = UUID.randomUUID().toString();
1115       logger.info("Start adding attachment {} from URL {} on mediapackage {}", elementId, uri, mediaPackage);
1116       URI newUrl = addContentToRepo(mediaPackage, elementId, uri);
1117       MediaPackage mp = addContentToMediaPackage(mediaPackage, elementId, newUrl, MediaPackageElement.Type.Attachment,
1118               flavor);
1119       if (tags != null && tags.length > 0) {
1120         MediaPackageElement attachmentElement = mp.getAttachment(elementId);
1121         for (String tag : tags) {
1122           logger.debug("Adding tag: " + tag + " to Element: " + elementId);
1123           attachmentElement.addTag(tag);
1124         }
1125       }
1126       job.setStatus(Job.Status.FINISHED);
1127       logger.info("Successful added attachment {} on mediapackage {} at URL {}", elementId, mediaPackage, newUrl);
1128       return mp;
1129     } catch (ServiceRegistryException e) {
1130       throw new IngestException(e);
1131     } catch (NotFoundException e) {
1132       throw new IngestException("Unable to update ingest job", e);
1133     } finally {
1134       finallyUpdateJob(job);
1135     }
1136   }
1137 
1138   /**
1139    * {@inheritDoc}
1140    *
1141    * @see org.opencastproject.ingest.api.IngestService#addAttachment(java.io.InputStream, java.lang.String,
1142    *      org.opencastproject.mediapackage.MediaPackageElementFlavor, org.opencastproject.mediapackage.MediaPackage)
1143    */
1144   @Override
1145   public MediaPackage addAttachment(InputStream in, String fileName, MediaPackageElementFlavor flavor, String[] tags,
1146           MediaPackage mediaPackage) throws IOException, IngestException {
1147     Job job = null;
1148     try {
1149       job = serviceRegistry.createJob(JOB_TYPE, INGEST_ATTACHMENT, null, null, false, ingestFileJobLoad);
1150       job.setStatus(Status.RUNNING);
1151       job = serviceRegistry.updateJob(job);
1152       String elementId = UUID.randomUUID().toString();
1153       logger.info("Start adding attachment {} from input stream on mediapackage {}", elementId, mediaPackage);
1154       URI newUrl = addContentToRepo(mediaPackage, elementId, fileName, in);
1155       MediaPackage mp = addContentToMediaPackage(mediaPackage, elementId, newUrl, MediaPackageElement.Type.Attachment,
1156               flavor);
1157       if (tags != null && tags.length > 0) {
1158         MediaPackageElement trackElement = mp.getAttachment(elementId);
1159         for (String tag : tags) {
1160           logger.info("Adding tag: " + tag + " to Element: " + elementId);
1161           trackElement.addTag(tag);
1162         }
1163       }
1164       job.setStatus(Job.Status.FINISHED);
1165       logger.info("Successful added attachment {} on mediapackage {} at URL {}", elementId, mediaPackage, newUrl);
1166       return mp;
1167     } catch (ServiceRegistryException e) {
1168       throw new IngestException(e);
1169     } catch (NotFoundException e) {
1170       throw new IngestException("Unable to update ingest job", e);
1171     } finally {
1172       finallyUpdateJob(job);
1173     }
1174 
1175   }
1176 
1177   /**
1178    * {@inheritDoc}
1179    *
1180    * @see org.opencastproject.ingest.api.IngestService#addAttachment(java.io.InputStream, java.lang.String,
1181    *      org.opencastproject.mediapackage.MediaPackageElementFlavor, org.opencastproject.mediapackage.MediaPackage)
1182    */
1183   @Override
1184   public MediaPackage addAttachment(InputStream in, String fileName, MediaPackageElementFlavor flavor,
1185           MediaPackage mediaPackage) throws IOException, IngestException {
1186     String[] tags = null;
1187     return addAttachment(in, fileName, flavor, tags, mediaPackage);
1188   }
1189 
1190   /**
1191    *
1192    * {@inheritDoc}
1193    *
1194    * @see org.opencastproject.ingest.api.IngestService#ingest(org.opencastproject.mediapackage.MediaPackage)
1195    */
1196   @Override
1197   public WorkflowInstance ingest(MediaPackage mp) throws IngestException {
1198     try {
1199       return ingest(mp, null, null, null);
1200     } catch (NotFoundException e) {
1201       throw new IngestException(e);
1202     } catch (UnauthorizedException e) {
1203       throw new IllegalStateException(e);
1204     }
1205   }
1206 
1207   /**
1208    * {@inheritDoc}
1209    *
1210    * @see org.opencastproject.ingest.api.IngestService#ingest(org.opencastproject.mediapackage.MediaPackage,
1211    *      java.lang.String, java.util.Map)
1212    */
1213   @Override
1214   public WorkflowInstance ingest(MediaPackage mp, String wd, Map<String, String> properties)
1215           throws IngestException, NotFoundException {
1216     try {
1217       return ingest(mp, wd, properties, null);
1218     } catch (UnauthorizedException e) {
1219       throw new IllegalStateException(e);
1220     }
1221   }
1222 
1223   /**
1224    * {@inheritDoc}
1225    *
1226    * @see org.opencastproject.ingest.api.IngestService#ingest(org.opencastproject.mediapackage.MediaPackage,
1227    *      java.lang.String, java.util.Map, java.lang.Long)
1228    */
1229   @Override
1230   public WorkflowInstance ingest(MediaPackage mp, String workflowDefinitionId, Map<String, String> properties,
1231           Long workflowInstanceId) throws IngestException, NotFoundException, UnauthorizedException {
1232     // Check for legacy media package id
1233     mp = checkForLegacyMediaPackageId(mp, properties);
1234 
1235     try {
1236       mp = createSmil(mp);
1237     } catch (IOException e) {
1238       throw new IngestException("Unable to add SMIL Catalog", e);
1239     }
1240 
1241     try {
1242       updateSeries(mp);
1243     } catch (IOException e) {
1244       throw new IngestException("Unable to create or update series from mediapackage " + mp.getIdentifier() + ".", e);
1245     }
1246 
1247     // Done, update the job status and return the created workflow instance
1248     if (workflowInstanceId != null) {
1249       logger.warn(
1250               "Resuming workflow {} with ingested mediapackage {} is deprecated, skip resuming and start new workflow",
1251               workflowInstanceId, mp);
1252     }
1253 
1254     if (workflowDefinitionId == null) {
1255       logger.info("Starting a new workflow with ingested mediapackage {} based on the default workflow definition '{}'",
1256               mp, defaultWorkflowDefinionId);
1257     } else {
1258       logger.info("Starting a new workflow with ingested mediapackage {} based on workflow definition '{}'", mp,
1259               workflowDefinitionId);
1260     }
1261 
1262     try {
1263       // Determine the workflow definition
1264       WorkflowDefinition workflowDef = getWorkflowDefinition(workflowDefinitionId, mp);
1265 
1266       // Get the final set of workflow properties
1267       properties = mergeWorkflowConfiguration(properties, mp.getIdentifier().toString());
1268 
1269       // Remove potential workflow configuration prefixes from the workflow properties
1270       properties = removePrefixFromProperties(properties);
1271 
1272       // Merge scheduled mediapackage with ingested
1273       mp = mergeScheduledMediaPackage(mp);
1274       if (mp.getSeries() == null) {
1275         mp = checkForCASeries(mp, createSeriesAppendix);
1276       }
1277 
1278 
1279       ingestStatistics.successful();
1280       if (workflowDef != null) {
1281         logger.info("Starting new workflow with ingested mediapackage '{}' using the specified template '{}'",
1282                 mp.getIdentifier().toString(), workflowDefinitionId);
1283       } else {
1284         logger.info("Starting new workflow with ingested mediapackage '{}' using the default template '{}'",
1285                 mp.getIdentifier().toString(), defaultWorkflowDefinionId);
1286       }
1287       return workflowService.start(workflowDef, mp, properties);
1288     } catch (WorkflowException e) {
1289       ingestStatistics.failed();
1290       throw new IngestException(e);
1291     }
1292   }
1293 
1294   @Override
1295   public void schedule(MediaPackage mediaPackage, String workflowDefinitionID, Map<String, String> properties)
1296           throws IllegalStateException, IngestException, NotFoundException, UnauthorizedException, SchedulerException {
1297     MediaPackageElement[] mediaPackageElements = mediaPackage.getElementsByFlavor(MediaPackageElements.EPISODE);
1298     if (mediaPackageElements.length != 1) {
1299       logger.debug("There can be only one (and exactly one) episode dublin core catalog: https://youtu.be/_J3VeogFUOs");
1300       throw new IngestException("There can be only one (and exactly one) episode dublin core catalog");
1301     }
1302     InputStream inputStream;
1303     DublinCoreCatalog dublinCoreCatalog;
1304     try {
1305       inputStream = workingFileRepository.get(mediaPackage.getIdentifier().toString(),
1306               mediaPackageElements[0].getIdentifier());
1307       dublinCoreCatalog = dublinCoreService.load(inputStream);
1308     } catch (IOException e) {
1309       throw new IngestException(e);
1310     }
1311 
1312     EName temporal = new EName(DublinCore.TERMS_NS_URI, "temporal");
1313     List<DublinCoreValue> periods = dublinCoreCatalog.get(temporal);
1314     if (periods.size() != 1) {
1315       logger.debug("There can be only one (and exactly one) period");
1316       throw new IngestException("There can be only one (and exactly one) period");
1317     }
1318     DCMIPeriod period = EncodingSchemeUtils.decodeMandatoryPeriod(periods.get(0));
1319     if (!period.hasStart() || !period.hasEnd()) {
1320       logger.debug("A scheduled recording needs to have a start and end.");
1321       throw new IngestException("A scheduled recording needs to have a start and end.");
1322     }
1323     EName createdEName = new EName(DublinCore.TERMS_NS_URI, "created");
1324     List<DublinCoreValue> created = dublinCoreCatalog.get(createdEName);
1325     if (created.size() == 0) {
1326       logger.debug("Created not set");
1327     } else if (created.size() == 1) {
1328       Date date = EncodingSchemeUtils.decodeMandatoryDate(created.get(0));
1329       if (date.getTime() != period.getStart().getTime()) {
1330         logger.debug("start and created date differ ({} vs {})", date.getTime(), period.getStart().getTime());
1331         throw new IngestException("Temporal start and created date differ");
1332       }
1333     } else {
1334       logger.debug("There can be only one created date");
1335       throw new IngestException("There can be only one created date");
1336     }
1337     String captureAgent = getCaptureAgent(dublinCoreCatalog);
1338 
1339     // Go through properties
1340     Map<String, String> agentProperties = new HashMap<>();
1341     Map<String, String> workflowProperties = new HashMap<>();
1342     for (String key : properties.keySet()) {
1343       if (key.startsWith("org.opencastproject.workflow.config.")) {
1344         workflowProperties.put(key, properties.get(key));
1345       } else {
1346         agentProperties.put(key, properties.get(key));
1347       }
1348     }
1349 
1350     // Remove workflow configuration prefixes from the workflow properties
1351     workflowProperties = removePrefixFromProperties(workflowProperties);
1352 
1353     try {
1354       schedulerService.addEvent(period.getStart(), period.getEnd(), captureAgent, new HashSet<>(), mediaPackage,
1355               workflowProperties, agentProperties, Optional.empty());
1356     } finally {
1357       for (MediaPackageElement mediaPackageElement : mediaPackage.getElements()) {
1358         try {
1359           workingFileRepository.delete(mediaPackage.getIdentifier().toString(), mediaPackageElement.getIdentifier());
1360         } catch (IOException e) {
1361           logger.warn("Failed to delete media package element", e);
1362         }
1363       }
1364     }
1365   }
1366 
1367     private String getCaptureAgent(DublinCoreCatalog dublinCoreCatalog) throws IngestException {
1368     // spatial
1369     EName spatial = new EName(DublinCore.TERMS_NS_URI, "spatial");
1370     List<DublinCoreValue> captureAgents = dublinCoreCatalog.get(spatial);
1371     if (captureAgents.size() != 1) {
1372       logger.debug("Exactly one capture agent needs to be set");
1373       throw new IngestException("Exactly one capture agent needs to be set");
1374     }
1375     return captureAgents.get(0).getValue();
1376   }
1377 
1378   /**
1379    * Check whether the mediapackage id is set via the legacy workflow identifier and change the id if existing.
1380    *
1381    * @param mp
1382    *          the mediapackage
1383    * @param properties
1384    *          the workflow properties
1385    * @return the mediapackage
1386    */
1387   private MediaPackage checkForLegacyMediaPackageId(MediaPackage mp, Map<String, String> properties)
1388           throws IngestException {
1389     if (properties == null || properties.isEmpty())
1390       return mp;
1391 
1392     try {
1393       String mediaPackageId = properties.get(LEGACY_MEDIAPACKAGE_ID_KEY);
1394       if (StringUtils.isNotBlank(mediaPackageId) && schedulerService != null) {
1395         logger.debug("Check ingested mediapackage {} for legacy mediapackage identifier {}",
1396                 mp.getIdentifier().toString(), mediaPackageId);
1397         try {
1398           schedulerService.getMediaPackage(mp.getIdentifier().toString());
1399           return mp;
1400         } catch (NotFoundException e) {
1401           logger.info("No scheduler mediapackage found with ingested id {}, try legacy mediapackage id {}",
1402                   mp.getIdentifier().toString(), mediaPackageId);
1403           try {
1404             schedulerService.getMediaPackage(mediaPackageId);
1405             logger.info("Legacy mediapackage id {} exists, change ingested mediapackage id {} to legacy id",
1406                     mediaPackageId, mp.getIdentifier().toString());
1407             mp.setIdentifier(new IdImpl(mediaPackageId));
1408             return mp;
1409           } catch (NotFoundException e1) {
1410             logger.info("No scheduler mediapackage found with legacy mediapackage id {}, skip merging", mediaPackageId);
1411           } catch (Exception e1) {
1412             logger.error("Unable to get event mediapackage from scheduler event {}", mediaPackageId, e);
1413             throw new IngestException(e);
1414           }
1415         } catch (Exception e) {
1416           logger.error("Unable to get event mediapackage from scheduler event {}", mp.getIdentifier().toString(), e);
1417           throw new IngestException(e);
1418         }
1419       }
1420       return mp;
1421     } finally {
1422       properties.remove(LEGACY_MEDIAPACKAGE_ID_KEY);
1423     }
1424   }
1425 
1426   private Map<String, String> mergeWorkflowConfiguration(Map<String, String> properties, String mediaPackageId) {
1427     if (isBlank(mediaPackageId) || schedulerService == null)
1428       return properties;
1429 
1430     HashMap<String, String> mergedProperties = new HashMap<>();
1431 
1432     try {
1433       Map<String, String> recordingProperties = schedulerService.getCaptureAgentConfiguration(mediaPackageId);
1434       logger.debug("Restoring workflow properties from scheduler event {}", mediaPackageId);
1435       mergedProperties.putAll(recordingProperties);
1436     } catch (SchedulerException e) {
1437       logger.warn("Unable to get workflow properties from scheduler event {}", mediaPackageId, e);
1438     } catch (NotFoundException e) {
1439       logger.info("No capture event found for id {}", mediaPackageId);
1440     } catch (UnauthorizedException e) {
1441       throw new IllegalStateException(e);
1442     }
1443 
1444     if (properties != null) {
1445       // Merge the properties, this must be after adding the recording properties
1446       logger.debug("Merge workflow properties with the one from the scheduler event {}", mediaPackageId);
1447       mergedProperties.putAll(properties);
1448     }
1449 
1450     return mergedProperties;
1451   }
1452 
1453   /**
1454    * Merges the ingested mediapackage with the scheduled mediapackage. The ingested mediapackage takes precedence over
1455    * the scheduled mediapackage.
1456    *
1457    * @param mp
1458    *          the ingested mediapackage
1459    * @return the merged mediapackage
1460    */
1461   private MediaPackage mergeScheduledMediaPackage(MediaPackage mp) throws IngestException {
1462     if (schedulerService == null) {
1463       logger.warn("No scheduler service available to merge mediapackage!");
1464       return mp;
1465     }
1466 
1467     try {
1468       MediaPackage scheduledMp = schedulerService.getMediaPackage(mp.getIdentifier().toString());
1469       logger.info("Found matching scheduled event for id '{}', merging mediapackage...", mp.getIdentifier().toString());
1470       mergeMediaPackageElements(mp, scheduledMp);
1471       mergeMediaPackageMetadata(mp, scheduledMp);
1472       return mp;
1473     } catch (NotFoundException e) {
1474       logger.debug("No scheduler mediapackage found with id {}, skip merging", mp.getIdentifier());
1475       return mp;
1476     } catch (Exception e) {
1477       throw new IngestException(String.format("Unable to get event media package from scheduler event %s",
1478               mp.getIdentifier()), e);
1479     }
1480   }
1481 
1482   /**
1483    * Merge different elements from capture agent ingesting mp and Asset manager. Overwrite or replace same flavored
1484    * elements depending on the Ingest Service overwrite configuration. Ignore publications (i.e. live publication
1485    * channel from Asset Manager) Always keep tracks from the capture agent.
1486    *
1487    * @param mp
1488    *          the medipackage being ingested from the Capture Agent
1489    * @param scheduledMp
1490    *          the mediapckage that was schedule and managed by the Asset Manager
1491    */
1492   private void mergeMediaPackageElements(MediaPackage mp, MediaPackage scheduledMp) {
1493     // drop catalogs sent by the capture agent in favor of Opencast's own metadata
1494     if (skipCatalogs) {
1495       for (MediaPackageElement element : mp.getCatalogs()) {
1496         if (!element.getFlavor().equals(MediaPackageElements.SMIL)) {
1497           mp.remove(element);
1498         }
1499       }
1500     }
1501 
1502     // drop attachments the capture agent sent us in favor of Opencast's attachments
1503     // e.g. prevent capture agents from modifying security rules of schedules events
1504     if (skipAttachments) {
1505       for (MediaPackageElement element : mp.getAttachments()) {
1506         mp.remove(element);
1507       }
1508     }
1509 
1510     for (MediaPackageElement element : scheduledMp.getElements()) {
1511       if (MediaPackageElement.Type.Publication.equals(element.getElementType())) {
1512         // The Asset managed media package may have a publication element for a live event, if retract live has not run yet.
1513         // Publications do not have flavors and are never part of the mediapackage from the capture agent.
1514         // Therefore, ignore publication element because it is removed when the recorded media is published and causes complications (on short media) if added.
1515         logger.debug("Ignoring {}, not adding to ingested mediapackage {}", MediaPackageElement.Type.Publication, mp);
1516         continue;
1517       } else if (mp.getElementsByFlavor(element.getFlavor()).length > 0) {
1518         // The default is to overwrite matching flavored elements in the Asset managed mediapackage (e.g. catalogs)
1519         // If isOverwrite is true, changes made from the CA overwrite (update/revert) changes made from the Admin UI.
1520         // If isOverwrite is false, changes made from the CA do not overwrite (update/revert) changes made from the Admin UI.
1521         // regardless of overwrite, always keep new ingested tracks.
1522         if (!isAddOnlyNew || MediaPackageElement.Type.Track.equals(element.getElementType())) {
1523           // Allow updates made from the Capture Agent to overwrite existing metadata in Opencast
1524           logger.info(
1525                   "Omitting Opencast (Asset Managed) element '{}', replacing with ingested element of same flavor '{}'",
1526                   element,
1527                   element.getFlavor());
1528           continue;
1529         }
1530         // Remove flavored element from ingested mp and replaced it with maching element from Asset Managed mediapackage.
1531         // This protects updates made from the admin UI during an event capture from being reverted by artifacts from the ingested CA.
1532         for (MediaPackageElement el : mp.getElementsByFlavor(element.getFlavor())) {
1533           logger.info("Omitting ingested element '{}' {}, keeping existing (Asset Managed) element of same flavor '{}'", el, el.getURI(),
1534                   element.getFlavor());
1535           mp.remove(el);
1536         }
1537       }
1538       logger.info("Adding element {} from scheduled (Asset Managed) event '{}' into ingested mediapackage", element, mp);
1539       mp.add(element);
1540     }
1541   }
1542 
1543   /**
1544    *
1545    * The previous OC behaviour is for metadata in the ingested mediapackage to be updated by the
1546    * Asset Managed metadata *only* when the field is blank on the ingested mediapackage.
1547    * However, that field may have been intentionally emptied by
1548    * removing its value from the Capture Agent UI (e.g. Galicaster)
1549    *
1550    * If isOverwrite is true, metadata values in the ingest mediapackage overwrite Asset Managed metadata.
1551    * If isOverwrite is false, Asset Managed metadata is preserved.
1552    *
1553    * @param mp,
1554    *          the inbound ingested mp
1555    * @param scheduledMp,
1556    *          the existing scheduled mp
1557    */
1558   private void mergeMediaPackageMetadata(MediaPackage mp, MediaPackage scheduledMp) {
1559     // Merge media package fields depending on overwrite setting
1560     boolean noOverwrite = (isAddOnlyNew && !skipCatalogs) || skipCatalogs;
1561     if ((mp.getDate() == null) || noOverwrite)
1562       mp.setDate(scheduledMp.getDate());
1563     if (isBlank(mp.getLicense()) || noOverwrite)
1564       mp.setLicense(scheduledMp.getLicense());
1565     if (isBlank(mp.getSeries()) || noOverwrite)
1566       mp.setSeries(scheduledMp.getSeries());
1567     if (isBlank(mp.getSeriesTitle()) || noOverwrite)
1568       mp.setSeriesTitle(scheduledMp.getSeriesTitle());
1569     if (isBlank(mp.getTitle()) || noOverwrite)
1570       mp.setTitle(scheduledMp.getTitle());
1571 
1572     if (mp.getSubjects().length <= 0 || noOverwrite) {
1573       Arrays.stream(mp.getSubjects()).forEach(mp::removeSubject);
1574       for (String subject : scheduledMp.getSubjects()) {
1575         mp.addSubject(subject);
1576       }
1577     }
1578     if (noOverwrite || mp.getContributors().length == 0) {
1579       Arrays.stream(mp.getContributors()).forEach(mp::removeContributor);
1580       for (String contributor : scheduledMp.getContributors()) {
1581         mp.addContributor(contributor);
1582       }
1583     }
1584     if (noOverwrite || mp.getCreators().length == 0) {
1585       Arrays.stream(mp.getCreators()).forEach(mp::removeCreator);
1586       for (String creator : scheduledMp.getCreators()) {
1587         mp.addCreator(creator);
1588       }
1589     }
1590   }
1591 
1592   /**
1593    * Removes the workflow configuration file prefix from all properties in a map.
1594    *
1595    * @param properties
1596    *          The properties to remove the prefixes from
1597    * @return A Map with the same collection of properties without the prefix
1598    */
1599   private Map<String, String> removePrefixFromProperties(Map<String, String> properties) {
1600     Map<String, String> fixedProperties = new HashMap<>();
1601     if (properties != null) {
1602       for (Entry<String, String> entry : properties.entrySet()) {
1603         if (entry.getKey().startsWith(WORKFLOW_CONFIGURATION_PREFIX)) {
1604           logger.debug("Removing prefix from key '" + entry.getKey() + " with value '" + entry.getValue() + "'");
1605           fixedProperties.put(entry.getKey().replace(WORKFLOW_CONFIGURATION_PREFIX, ""), entry.getValue());
1606         } else {
1607           fixedProperties.put(entry.getKey(), entry.getValue());
1608         }
1609       }
1610     }
1611     return fixedProperties;
1612   }
1613 
  /**
   * Determines the workflow definition used to process an ingested media package.
   * <p>
   * Resolution order:
   * <ol>
   * <li>the given {@code workflowDefinitionID};</li>
   * <li>if it is blank and a scheduler service is bound, the {@code CaptureParameters.INGEST_WORKFLOW_DEFINITION}
   * value of the capture agent configuration of the scheduled event with the media package's id;</li>
   * <li>{@code defaultWorkflowDefinionId}.</li>
   * </ol>
   * If a default definition is configured and the resolved definition cannot be found, the default is used instead.
   *
   * @param workflowDefinitionID
   *          the workflow definition id specified during ingest; may be blank
   * @param mediapackage
   *          the ingested media package, whose id is used to look up the scheduled event
   * @return the resolved workflow definition
   * @throws NotFoundException
   *           if the finally resolved workflow definition does not exist
   * @throws WorkflowDatabaseException
   *           if loading the finally resolved workflow definition fails
   * @throws IngestException
   *           if the scheduled event yields no definition and there is no default, if reading the scheduled event
   *           fails, or if validating the definition against the workflow service fails
   * @throws IllegalStateException
   *           if no workflow definition could be determined at all, or if access to the scheduled event is denied
   */
  private WorkflowDefinition getWorkflowDefinition(String workflowDefinitionID, MediaPackage mediapackage)
          throws NotFoundException, WorkflowDatabaseException, IngestException {
    // No workflow definition was given during ingest: try the capture agent configuration of the scheduled event,
    // falling back to the default definition
    if (isBlank(workflowDefinitionID)) {
      String mediaPackageId = mediapackage.getIdentifier().toString();
      if (schedulerService != null) {
        logger.info("Determining workflow template for ingested mediapckage {} from capture event {}", mediapackage,
                mediaPackageId);
        try {
          Map<String, String> recordingProperties = schedulerService.getCaptureAgentConfiguration(mediaPackageId);
          workflowDefinitionID = recordingProperties.get(CaptureParameters.INGEST_WORKFLOW_DEFINITION);
          if (isBlank(workflowDefinitionID)) {
            workflowDefinitionID = defaultWorkflowDefinionId;
            logger.debug("No workflow set. Falling back to default.");
          }
          // Neither the scheduled event nor the configuration provides a definition
          if (isBlank(workflowDefinitionID)) {
            throw new IngestException("No value found for key '" + CaptureParameters.INGEST_WORKFLOW_DEFINITION
                    + "' from capture event configuration of scheduler event '" + mediaPackageId + "'");
          }
          logger.info("Ingested event {} will be processed using workflow '{}'", mediapackage, workflowDefinitionID);
        } catch (NotFoundException e) {
          // No scheduled event with this id: workflowDefinitionID stays blank and the default (if any) is used below
          logger.warn("Specified capture event {} was not found", mediaPackageId);
        } catch (UnauthorizedException e) {
          throw new IllegalStateException(e);
        } catch (SchedulerException e) {
          logger.warn("Unable to get the workflow definition id from scheduler event {}", mediaPackageId, e);
          throw new IngestException(e);
        }
      } else {
        logger.warn(
                "Scheduler service not bound, unable to determine the workflow template to use for ingested mediapckage {}",
                mediapackage);
      }

    } else {
      logger.info("Ingested mediapackage {} is processed using workflow template '{}', specified during ingest",
              mediapackage, workflowDefinitionID);
    }

    // Use the default workflow definition if nothing was determined
    if (isBlank(workflowDefinitionID) && defaultWorkflowDefinionId != null) {
      logger.info("Using default workflow definition '{}' to process ingested mediapackage {}",
              defaultWorkflowDefinionId, mediapackage);
      workflowDefinitionID = defaultWorkflowDefinionId;
    }

    // Check if the workflow definition is valid. This is only done when a default definition exists to fall back to;
    // otherwise a missing definition surfaces as NotFoundException from the final lookup below.
    if (StringUtils.isNotBlank(workflowDefinitionID) && StringUtils.isNotBlank(defaultWorkflowDefinionId)) {
      try {
        workflowService.getWorkflowDefinitionById(workflowDefinitionID);
      } catch (WorkflowDatabaseException e) {
        throw new IngestException(e);
      } catch (NotFoundException nfe) {
        logger.warn("Workflow definition {} not found, using default workflow {} instead", workflowDefinitionID,
                defaultWorkflowDefinionId);
        workflowDefinitionID = defaultWorkflowDefinionId;
      }
    }

    // Have we been able to find a workflow definition id?
    if (isBlank(workflowDefinitionID)) {
      ingestStatistics.failed();
      throw new IllegalStateException(
              "Can not ingest a workflow without a workflow definition or an existing instance. No default definition is specified");
    }

    // Let's make sure the workflow definition exists
    return workflowService.getWorkflowDefinitionById(workflowDefinitionID);
  }
1683 
1684   /**
1685    *
1686    * {@inheritDoc}
1687    *
1688    * @see org.opencastproject.ingest.api.IngestService#discardMediaPackage(org.opencastproject.mediapackage.MediaPackage)
1689    */
1690   @Override
1691   public void discardMediaPackage(MediaPackage mp) throws IOException {
1692     String mediaPackageId = mp.getIdentifier().toString();
1693     for (MediaPackageElement element : mp.getElements()) {
1694       if (!workingFileRepository.delete(mediaPackageId, element.getIdentifier()))
1695         logger.warn("Unable to find (and hence, delete), this mediapackage element");
1696     }
1697     logger.info("Successfully discarded media package {}", mp);
1698   }
1699 
1700   protected URI addContentToRepo(MediaPackage mp, String elementId, URI uri) throws IOException {
1701     InputStream in = null;
1702     HttpResponse response = null;
1703     CloseableHttpClient externalHttpClient = null;
1704     try {
1705       if (uri.toString().startsWith("http")) {
1706         HttpGet get = new HttpGet(uri);
1707 
1708         if (!isBlank(downloadSource) && uri.toString().matches(downloadSource)) {
1709           // NB: We're creating a new client here with *different* auth than the system auth creds
1710           externalHttpClient = getAuthedHttpClient();
1711           get.setHeader("X-Requested-Auth", downloadAuthMethod);
1712           if ("Basic".equals(downloadAuthMethod) && downloadAuthForceBasic) {
1713             String auth = downloadUser + ":" + downloadPassword;
1714             byte[] encodedAuth = Base64.getEncoder().encode(auth.getBytes(StandardCharsets.ISO_8859_1));
1715             String authHeader = "Basic " + new String(encodedAuth);
1716             get.setHeader(HttpHeaders.AUTHORIZATION, authHeader);
1717           }
1718           response = externalHttpClient.execute(get);
1719         } else {
1720           // httpClient checks internally to see if it should be sending the default auth, or not.
1721           response = httpClient.execute(get);
1722         }
1723 
1724         if (null == response) {
1725           // If you get here then chances are you're using a mock httpClient which does not have appropriate
1726           // mocking to respond to the URL you are feeding it.  Try adding that URL to the mock and see if that works.
1727           throw new IOException("Null response object from the http client, refer to code for explanation");
1728         }
1729 
1730         int httpStatusCode = response.getStatusLine().getStatusCode();
1731         if (httpStatusCode != 200) {
1732           throw new IOException(uri + " returns http " + httpStatusCode);
1733         }
1734         in = response.getEntity().getContent();
1735         //If it does not start with file, or we're in test mode (ie, to allow arbitrary file:// access)
1736       } else if (!uri.toString().startsWith("file") || testMode) {
1737         in = uri.toURL().openStream();
1738       } else {
1739         throw new IOException("Refusing to fetch files from the local filesystem");
1740       }
1741       String fileName = FilenameUtils.getName(uri.getPath());
1742       if (isBlank(FilenameUtils.getExtension(fileName)))
1743         fileName = getContentDispositionFileName(response);
1744 
1745       if (isBlank(FilenameUtils.getExtension(fileName)))
1746         throw new IOException("No filename extension found: " + fileName);
1747       return addContentToRepo(mp, elementId, fileName, in);
1748     } finally {
1749       if (in != null) {
1750         in.close();
1751       }
1752       if (externalHttpClient != null) {
1753         externalHttpClient.close();
1754       }
1755       httpClient.close(response);
1756     }
1757   }
1758 
1759   private String getContentDispositionFileName(HttpResponse response) {
1760     if (response == null)
1761       return null;
1762 
1763     Header header = response.getFirstHeader("Content-Disposition");
1764     ContentDisposition contentDisposition = new ContentDisposition(header.getValue());
1765     return contentDisposition.getParameter("filename");
1766   }
1767 
1768   private URI addContentToRepo(MediaPackage mp, String elementId, String filename, InputStream file)
1769           throws IOException {
1770     ProgressInputStream progressInputStream = new ProgressInputStream(file);
1771     progressInputStream.addPropertyChangeListener(new PropertyChangeListener() {
1772       @Override
1773       public void propertyChange(PropertyChangeEvent evt) {
1774         long totalNumBytesRead = (Long) evt.getNewValue();
1775         long oldTotalNumBytesRead = (Long) evt.getOldValue();
1776         ingestStatistics.add(totalNumBytesRead - oldTotalNumBytesRead);
1777       }
1778     });
1779     return workingFileRepository.put(mp.getIdentifier().toString(), elementId, filename, progressInputStream);
1780   }
1781 
1782   private MediaPackage addContentToMediaPackage(MediaPackage mp, String elementId, URI uri,
1783           MediaPackageElement.Type type, MediaPackageElementFlavor flavor) {
1784     logger.info("Adding element of type {} to mediapackage {}", type, mp);
1785     MediaPackageElement mpe = mp.add(uri, type, flavor);
1786     mpe.setIdentifier(elementId);
1787     return mp;
1788   }
1789 
  // ---------------------------------------------
  // --------- bind and unbind bundles ---------
  // ---------------------------------------------

  /**
   * Sets the workflow service used to look up workflow definitions; injected via {@code @Reference}.
   *
   * @param workflowService
   *          the workflow service
   */
  @Reference
  public void setWorkflowService(WorkflowService workflowService) {
    this.workflowService = workflowService;
  }

  /**
   * Sets the working file repository in which ingested media package elements are stored; injected via
   * {@code @Reference}.
   *
   * @param workingFileRepository
   *          the working file repository
   */
  @Reference
  public void setWorkingFileRepository(WorkingFileRepository workingFileRepository) {
    this.workingFileRepository = workingFileRepository;
  }

  /**
   * Sets the series service; injected via {@code @Reference}.
   *
   * @param seriesService
   *          the series service
   */
  @Reference
  public void setSeriesService(SeriesService seriesService) {
    this.seriesService = seriesService;
  }

  /**
   * Sets the service used to parse DublinCore catalogs; injected via {@code @Reference}.
   *
   * @param dublinCoreService
   *          the DublinCore catalog service
   */
  @Reference
  public void setDublinCoreService(DublinCoreCatalogService dublinCoreService) {
    this.dublinCoreService = dublinCoreService;
  }
1812 
  /**
   * {@inheritDoc}
   *
   * @return the bound service registry
   * @see org.opencastproject.job.api.AbstractJobProducer#getServiceRegistry()
   */
  @Override
  protected ServiceRegistry getServiceRegistry() {
    return serviceRegistry;
  }

  /**
   * {@inheritDoc}
   * <p>
   * Ingest jobs are not expected to be dispatched to this service, so this method always fails.
   *
   * @throws IllegalStateException
   *           always
   * @see org.opencastproject.job.api.AbstractJobProducer#process(org.opencastproject.job.api.Job)
   */
  @Override
  protected String process(Job job) throws Exception {
    throw new IllegalStateException("Ingest jobs are not expected to be dispatched");
  }
1832 
  /**
   * Callback for setting the security service.
   *
   * @param securityService
   *          the securityService to set
   */
  @Reference
  public void setSecurityService(SecurityService securityService) {
    this.securityService = securityService;
  }

  /**
   * Callback for setting the user directory service.
   *
   * @param userDirectoryService
   *          the userDirectoryService to set
   */
  @Reference
  public void setUserDirectoryService(UserDirectoryService userDirectoryService) {
    this.userDirectoryService = userDirectoryService;
  }

  /**
   * Callback for setting the scheduler service.
   * <p>
   * The reference is optional and dynamic, so {@code schedulerService} may be {@code null} or change at runtime. The
   * scheduler-dependent code paths of this class check it for {@code null} before use.
   *
   * @param schedulerService
   *          the scheduler service to set
   */
  @Reference(
    policy = ReferencePolicy.DYNAMIC,
    cardinality = ReferenceCardinality.OPTIONAL,
    unbind = "unsetSchedulerService"
  )
  public void setSchedulerService(SchedulerService schedulerService) {
    this.schedulerService = schedulerService;
  }

  /**
   * Unbind callback for the scheduler service, as named by the {@code unbind} attribute of the {@code @Reference} on
   * {@code setSchedulerService}.
   * <p>
   * The field is only cleared if the given instance is the currently bound one; unbinding any other instance leaves
   * the field unchanged.
   *
   * @param schedulerService
   *          the scheduler service being unbound
   */
  public void unsetSchedulerService(SchedulerService schedulerService) {
    if (this.schedulerService == schedulerService) {
      this.schedulerService = null;
    }
  }
1875 
  /**
   * Sets a reference to the organization directory service.
   *
   * @param organizationDirectory
   *          the organization directory
   */
  @Reference
  public void setOrganizationDirectoryService(OrganizationDirectoryService organizationDirectory) {
    organizationDirectoryService = organizationDirectory;
  }

  /**
   * {@inheritDoc}
   *
   * @return the bound security service
   * @see org.opencastproject.job.api.AbstractJobProducer#getSecurityService()
   */
  @Override
  protected SecurityService getSecurityService() {
    return securityService;
  }

  /**
   * {@inheritDoc}
   *
   * @return the bound user directory service
   * @see org.opencastproject.job.api.AbstractJobProducer#getUserDirectoryService()
   */
  @Override
  protected UserDirectoryService getUserDirectoryService() {
    return userDirectoryService;
  }

  /**
   * {@inheritDoc}
   *
   * @return the bound organization directory service
   * @see org.opencastproject.job.api.AbstractJobProducer#getOrganizationDirectoryService()
   */
  @Override
  protected OrganizationDirectoryService getOrganizationDirectoryService() {
    return organizationDirectoryService;
  }
1916 
1917   protected CloseableHttpClient getAuthedHttpClient() {
1918     HttpClientBuilder cb = HttpClientBuilder.create();
1919     CredentialsProvider provider = new BasicCredentialsProvider();
1920     String schema = AuthSchemes.DIGEST;
1921     if ("Basic".equals(downloadAuthMethod)) {
1922       schema = AuthSchemes.BASIC;
1923     }
1924     provider.setCredentials(
1925       new AuthScope(AuthScope.ANY_HOST, AuthScope.ANY_PORT, AuthScope.ANY_REALM, schema),
1926       new UsernamePasswordCredentials(downloadUser, downloadPassword));
1927     return cb.setDefaultCredentialsProvider(provider).build();
1928   }
1929 
1930   private MediaPackage createSmil(MediaPackage mediaPackage) throws IOException, IngestException {
1931     List<Track> partialTracks = new ArrayList<>();
1932     for (Track track : mediaPackage.getTracks()) {
1933       Long startTime = partialTrackStartTimes.getIfPresent(track.getIdentifier());
1934       if (startTime != null) {
1935         partialTracks.add(track);
1936       }
1937     }
1938 
1939     // No partial track available return without adding SMIL catalog
1940     if (partialTracks.isEmpty()) {
1941       return mediaPackage;
1942     }
1943 
1944     // Inspect the partial tracks
1945     List<Track> tracks = partialTracks.stream()
1946         .map(track -> {
1947           try {
1948             // Create a media inspection job for a mediapackage element.
1949             return mediaInspectionService.enrich(track, true);
1950           } catch (Exception e) {
1951             throw new RuntimeException("Error enriching track", e);
1952           }
1953         })
1954         .map(job -> {
1955           try {
1956             // Interpret the payload of a completed Job as a MediaPackageElement.
1957             // Wait for the job to complete if necessary
1958             waitForJob(getServiceRegistry(), none(0L), job);
1959             return (Track) MediaPackageElementParser.getFromXml(job.getPayload());
1960           } catch (Exception e) {
1961             throw new RuntimeException("Error parsing job payload as track", e);
1962           }
1963         })
1964         .peek(MediaPackageSupport.updateElement(mediaPackage))
1965         .collect(Collectors.toList());
1966 
1967     // Create the SMIL document
1968     org.w3c.dom.Document smilDocument = SmilUtil.createSmil();
1969     for (Track track : tracks) {
1970       Long startTime = partialTrackStartTimes.getIfPresent(track.getIdentifier());
1971       if (startTime == null) {
1972         logger.error("No start time found for track {}", track);
1973         throw new IngestException("No start time found for track " + track.getIdentifier());
1974       }
1975       smilDocument = addSmilTrack(smilDocument, track, startTime);
1976       partialTrackStartTimes.invalidate(track.getIdentifier());
1977     }
1978 
1979     // Store the SMIL document in the mediapackage
1980     return addSmilCatalog(smilDocument, mediaPackage);
1981   }
1982 
1983   /**
1984    * Adds a SMIL catalog to a mediapackage if it's not already existing.
1985    *
1986    * @param smilDocument
1987    *          the smil document
1988    * @param mediaPackage
1989    *          the mediapackage to extend with the SMIL catalog
1990    * @return the augmented mediapcakge
1991    * @throws IOException
1992    *           if reading or writing of the SMIL catalog fails
1993    * @throws IngestException
1994    *           if the SMIL catalog already exists
1995    */
1996   private MediaPackage addSmilCatalog(org.w3c.dom.Document smilDocument, MediaPackage mediaPackage)
1997           throws IOException, IngestException {
1998     Option<org.w3c.dom.Document> optSmilDocument = loadSmilDocument(workingFileRepository, mediaPackage);
1999     if (optSmilDocument.isSome())
2000       throw new IngestException("SMIL already exists!");
2001 
2002     InputStream in = null;
2003     try {
2004       in = XmlUtil.serializeDocument(smilDocument);
2005       String elementId = UUID.randomUUID().toString();
2006       URI uri = workingFileRepository.put(mediaPackage.getIdentifier().toString(), elementId, PARTIAL_SMIL_NAME, in);
2007       MediaPackageElement mpe = mediaPackage.add(uri, MediaPackageElement.Type.Catalog, MediaPackageElements.SMIL);
2008       mpe.setIdentifier(elementId);
2009       // Reset the checksum since it changed
2010       mpe.setChecksum(null);
2011       mpe.setMimeType(MimeTypes.SMIL);
2012       return mediaPackage;
2013     } finally {
2014       IoSupport.closeQuietly(in);
2015     }
2016   }
2017 
2018   /**
2019    * Load a SMIL document of a media package.
2020    *
2021    * @return the document or none if no media package element found.
2022    */
2023   private Option<org.w3c.dom.Document> loadSmilDocument(final WorkingFileRepository workingFileRepository,
2024           MediaPackage mp) {
2025     return mlist(mp.getElements()).filter(MediaPackageSupport.Filters.isSmilCatalog).headOpt()
2026             .map(new Function<MediaPackageElement, org.w3c.dom.Document>() {
2027               @Override
2028               public org.w3c.dom.Document apply(MediaPackageElement mpe) {
2029                 InputStream in = null;
2030                 try {
2031                   in = workingFileRepository.get(mpe.getMediaPackage().getIdentifier().toString(), mpe.getIdentifier());
2032                   return SmilUtil.loadSmilDocument(in, mpe);
2033                 } catch (Exception e) {
2034                   logger.warn("Unable to load smil document from catalog '{}'", mpe, e);
2035                   return Misc.chuck(e);
2036                 } finally {
2037                   IOUtils.closeQuietly(in);
2038                 }
2039               }
2040             });
2041   }
2042 
2043   /**
2044    * Adds a SMIL track by a mediapackage track to a SMIL document
2045    *
2046    * @param smilDocument
2047    *          the SMIL document to extend
2048    * @param track
2049    *          the mediapackage track
2050    * @param startTime
2051    *          the start time
2052    * @return the augmented SMIL document
2053    * @throws IngestException
2054    *           if the partial flavor type is not valid
2055    */
2056   private org.w3c.dom.Document addSmilTrack(org.w3c.dom.Document smilDocument, Track track, long startTime)
2057           throws IngestException {
2058     if (MediaPackageElements.PRESENTER_SOURCE.getType().equals(track.getFlavor().getType())) {
2059       return SmilUtil.addTrack(smilDocument, SmilUtil.TrackType.PRESENTER, track.hasVideo(), startTime,
2060               track.getDuration(), track.getURI(), track.getIdentifier());
2061     } else if (MediaPackageElements.PRESENTATION_SOURCE.getType().equals(track.getFlavor().getType())) {
2062       return SmilUtil.addTrack(smilDocument, SmilUtil.TrackType.PRESENTATION, track.hasVideo(), startTime,
2063               track.getDuration(), track.getURI(), track.getIdentifier());
2064     } else {
2065       logger.warn("Invalid partial flavor type {} of track {}", track.getFlavor(), track);
2066       throw new IngestException(
2067               "Invalid partial flavor type " + track.getFlavor().getType() + " of track " + track.getURI().toString());
2068     }
2069   }
2070 
2071   private MediaPackage checkForCASeries(MediaPackage mp, String seriesAppendName) {
2072     //Check for media package id and CA series appendix set
2073     if (mp == null || seriesAppendName == null) {
2074       logger.debug("No series name provided");
2075       return mp;
2076     }
2077 
2078     // Verify user is a CA by checking roles and captureAgentId
2079     User user = securityService.getUser();
2080     if (!user.hasRole(GLOBAL_ADMIN_ROLE) && !user.hasRole(GLOBAL_CAPTURE_AGENT_ROLE)) {
2081       logger.info("User '{}' is missing capture agent roles, won't apply CASeries", user.getUsername());
2082       return mp;
2083     }
2084     //Get capture agent name
2085     String captureAgentId = null;
2086     Catalog[] catalog = mp.getCatalogs(MediaPackageElementFlavor.flavor("dublincore", "episode"));
2087     if (catalog.length == 1) {
2088       try (InputStream catalogInputStream = workingFileRepository.get(mp.getIdentifier().toString(), catalog[0].getIdentifier())) {
2089         DublinCoreCatalog dc = dublinCoreService.load(catalogInputStream);
2090         captureAgentId = getCaptureAgent(dc);
2091       } catch (Exception e) {
2092         logger.info("Unable to determine capture agent name");
2093       }
2094     }
2095     if (captureAgentId == null) {
2096       logger.info("No Capture Agent ID defined for MediaPackage {}, won't apply CASeries", mp.getIdentifier());
2097       return mp;
2098     }
2099     logger.info("Applying CASeries to MediaPackage {} for capture agent '{}'", mp.getIdentifier(), captureAgentId);
2100 
2101     // Find or create CA series
2102     String seriesId = captureAgentId.replaceAll("[^\\w-_.:;()]+", "_");
2103     String seriesName = captureAgentId + seriesAppendName;
2104 
2105     try {
2106       seriesService.getSeries(seriesId);
2107     } catch (NotFoundException nfe) {
2108       try {
2109         List<String> roleNames = new ArrayList<>();
2110         String roleName = SecurityUtil.getCaptureAgentRole(captureAgentId);
2111         roleNames.add(roleName);
2112         logger.debug("Capture agent role name: {}", roleName);
2113 
2114         String username = user.getUsername();
2115         roleNames.add(UserIdRoleProvider.getUserIdRole(username));
2116 
2117         logger.info("Creating new series for capture agent '{}' and user '{}'", captureAgentId, username);
2118         createSeries(seriesId, seriesName, roleNames);
2119       } catch (Exception e) {
2120         logger.error("Unable to create series {} for event {}", seriesName, mp, e);
2121         return mp;
2122       }
2123     } catch (SeriesException | UnauthorizedException e) {
2124       logger.error("Exception while searching for series {}", seriesName, e);
2125       return mp;
2126     }
2127 
2128     // Add the event to CA series
2129     mp.setSeries(seriesId);
2130     mp.setSeriesTitle(seriesName);
2131 
2132     return mp;
2133   }
2134 
2135   private DublinCoreCatalog createSeries(String seriesId, String seriesName, List<String> roleNames)
2136       throws SeriesException, UnauthorizedException, NotFoundException {
2137     DublinCoreCatalog dc = DublinCores.mkOpencastSeries().getCatalog();
2138     dc.set(PROPERTY_IDENTIFIER, seriesId);
2139     dc.set(PROPERTY_TITLE, seriesName);
2140     dc.set(DublinCore.PROPERTY_CREATED, EncodingSchemeUtils.encodeDate(new Date(), Precision.Second));
2141     // set series name
2142     DublinCoreCatalog createdSeries = seriesService.updateSeries(dc);
2143 
2144     // fill acl
2145     List<AccessControlEntry> aces = new ArrayList();
2146     for (String roleName : roleNames) {
2147       AccessControlEntry aceRead = new AccessControlEntry(roleName, Permissions.Action.READ.toString(), true);
2148       AccessControlEntry aceWrite = new AccessControlEntry(roleName, Permissions.Action.WRITE.toString(), true);
2149       aces.add(aceRead);
2150       aces.add(aceWrite);
2151     }
2152     AccessControlList acl = new AccessControlList(aces);
2153     seriesService.updateAccessControl(seriesId, acl);
2154     logger.info("Created capture agent series with name {} and id {}", seriesName, seriesId);
2155 
2156     return dc;
2157   }
2158 }