View Javadoc
1   /*
2    * Licensed to The Apereo Foundation under one or more contributor license
3    * agreements. See the NOTICE file distributed with this work for additional
4    * information regarding copyright ownership.
5    *
6    *
7    * The Apereo Foundation licenses this file to you under the Educational
8    * Community License, Version 2.0 (the "License"); you may not use this file
9    * except in compliance with the License. You may obtain a copy of the License
10   * at:
11   *
12   *   http://opensource.org/licenses/ecl2.txt
13   *
14   * Unless required by applicable law or agreed to in writing, software
15   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
16   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the
17   * License for the specific language governing permissions and limitations under
18   * the License.
19   *
20   */
21  
22  package org.opencastproject.ingest.impl;
23  
24  import static org.apache.commons.lang3.StringUtils.isBlank;
25  import static org.opencastproject.metadata.dublincore.DublinCore.PROPERTY_IDENTIFIER;
26  import static org.opencastproject.metadata.dublincore.DublinCore.PROPERTY_TITLE;
27  import static org.opencastproject.security.api.SecurityConstants.GLOBAL_ADMIN_ROLE;
28  import static org.opencastproject.security.api.SecurityConstants.GLOBAL_CAPTURE_AGENT_ROLE;
29  import static org.opencastproject.util.JobUtil.waitForJob;
30  
31  import org.opencastproject.authorization.xacml.XACMLParsingException;
32  import org.opencastproject.authorization.xacml.XACMLUtils;
33  import org.opencastproject.capture.CaptureParameters;
34  import org.opencastproject.ingest.api.IngestException;
35  import org.opencastproject.ingest.api.IngestService;
36  import org.opencastproject.ingest.impl.jmx.IngestStatistics;
37  import org.opencastproject.inspection.api.MediaInspectionService;
38  import org.opencastproject.job.api.AbstractJobProducer;
39  import org.opencastproject.job.api.Job;
40  import org.opencastproject.job.api.Job.Status;
41  import org.opencastproject.mediapackage.Attachment;
42  import org.opencastproject.mediapackage.Catalog;
43  import org.opencastproject.mediapackage.EName;
44  import org.opencastproject.mediapackage.MediaPackage;
45  import org.opencastproject.mediapackage.MediaPackageBuilderFactory;
46  import org.opencastproject.mediapackage.MediaPackageElement;
47  import org.opencastproject.mediapackage.MediaPackageElementFlavor;
48  import org.opencastproject.mediapackage.MediaPackageElementParser;
49  import org.opencastproject.mediapackage.MediaPackageElements;
50  import org.opencastproject.mediapackage.MediaPackageException;
51  import org.opencastproject.mediapackage.MediaPackageParser;
52  import org.opencastproject.mediapackage.MediaPackageSupport;
53  import org.opencastproject.mediapackage.Track;
54  import org.opencastproject.mediapackage.identifier.IdImpl;
55  import org.opencastproject.metadata.dublincore.DCMIPeriod;
56  import org.opencastproject.metadata.dublincore.DublinCore;
57  import org.opencastproject.metadata.dublincore.DublinCoreCatalog;
58  import org.opencastproject.metadata.dublincore.DublinCoreCatalogService;
59  import org.opencastproject.metadata.dublincore.DublinCoreValue;
60  import org.opencastproject.metadata.dublincore.DublinCores;
61  import org.opencastproject.metadata.dublincore.EncodingSchemeUtils;
62  import org.opencastproject.metadata.dublincore.Precision;
63  import org.opencastproject.scheduler.api.SchedulerException;
64  import org.opencastproject.scheduler.api.SchedulerService;
65  import org.opencastproject.security.api.AccessControlEntry;
66  import org.opencastproject.security.api.AccessControlList;
67  import org.opencastproject.security.api.OrganizationDirectoryService;
68  import org.opencastproject.security.api.Permissions;
69  import org.opencastproject.security.api.SecurityService;
70  import org.opencastproject.security.api.TrustedHttpClient;
71  import org.opencastproject.security.api.UnauthorizedException;
72  import org.opencastproject.security.api.User;
73  import org.opencastproject.security.api.UserDirectoryService;
74  import org.opencastproject.security.util.SecurityUtil;
75  import org.opencastproject.series.api.SeriesException;
76  import org.opencastproject.series.api.SeriesService;
77  import org.opencastproject.serviceregistry.api.ServiceRegistry;
78  import org.opencastproject.serviceregistry.api.ServiceRegistryException;
79  import org.opencastproject.smil.api.util.SmilUtil;
80  import org.opencastproject.userdirectory.UserIdRoleProvider;
81  import org.opencastproject.util.ConfigurationException;
82  import org.opencastproject.util.IoSupport;
83  import org.opencastproject.util.LoadUtil;
84  import org.opencastproject.util.MimeTypes;
85  import org.opencastproject.util.NotFoundException;
86  import org.opencastproject.util.ProgressInputStream;
87  import org.opencastproject.util.XmlSafeParser;
88  import org.opencastproject.util.XmlUtil;
89  import org.opencastproject.util.data.functions.Misc;
90  import org.opencastproject.util.jmx.JmxUtil;
91  import org.opencastproject.workflow.api.WorkflowDatabaseException;
92  import org.opencastproject.workflow.api.WorkflowDefinition;
93  import org.opencastproject.workflow.api.WorkflowException;
94  import org.opencastproject.workflow.api.WorkflowInstance;
95  import org.opencastproject.workflow.api.WorkflowService;
96  import org.opencastproject.workingfilerepository.api.WorkingFileRepository;
97  
98  import com.google.common.cache.Cache;
99  import com.google.common.cache.CacheBuilder;
100 
101 import org.apache.commons.compress.archivers.zip.ZipArchiveEntry;
102 import org.apache.commons.compress.archivers.zip.ZipArchiveInputStream;
103 import org.apache.commons.io.FilenameUtils;
104 import org.apache.commons.io.IOUtils;
105 import org.apache.commons.lang3.BooleanUtils;
106 import org.apache.commons.lang3.StringUtils;
107 import org.apache.cxf.jaxrs.ext.multipart.ContentDisposition;
108 import org.apache.http.Header;
109 import org.apache.http.HttpHeaders;
110 import org.apache.http.HttpResponse;
111 import org.apache.http.auth.AuthScope;
112 import org.apache.http.auth.UsernamePasswordCredentials;
113 import org.apache.http.client.CredentialsProvider;
114 import org.apache.http.client.config.AuthSchemes;
115 import org.apache.http.client.methods.HttpGet;
116 import org.apache.http.impl.client.BasicCredentialsProvider;
117 import org.apache.http.impl.client.CloseableHttpClient;
118 import org.apache.http.impl.client.HttpClientBuilder;
119 import org.osgi.service.cm.ManagedService;
120 import org.osgi.service.component.ComponentContext;
121 import org.osgi.service.component.annotations.Activate;
122 import org.osgi.service.component.annotations.Component;
123 import org.osgi.service.component.annotations.Deactivate;
124 import org.osgi.service.component.annotations.Reference;
125 import org.osgi.service.component.annotations.ReferenceCardinality;
126 import org.osgi.service.component.annotations.ReferencePolicy;
127 import org.slf4j.Logger;
128 import org.slf4j.LoggerFactory;
129 import org.xml.sax.SAXException;
130 
131 import java.beans.PropertyChangeEvent;
132 import java.beans.PropertyChangeListener;
133 import java.io.BufferedReader;
134 import java.io.IOException;
135 import java.io.InputStream;
136 import java.io.InputStreamReader;
137 import java.net.URI;
138 import java.nio.charset.StandardCharsets;
139 import java.util.ArrayList;
140 import java.util.Arrays;
141 import java.util.Base64;
142 import java.util.Date;
143 import java.util.Dictionary;
144 import java.util.HashMap;
145 import java.util.HashSet;
146 import java.util.List;
147 import java.util.Map;
148 import java.util.Map.Entry;
149 import java.util.Objects;
150 import java.util.Optional;
151 import java.util.Set;
152 import java.util.UUID;
153 import java.util.concurrent.TimeUnit;
154 import java.util.stream.Collectors;
155 
156 import javax.management.ObjectInstance;
157 
158 /**
159  * Creates and augments Opencast MediaPackages. Stores media into the Working File Repository.
160  */
161 @Component(
162   immediate = true,
163   service = {
164     IngestService.class,
165     ManagedService.class
166   },
167   property = {
168     "service.description=Ingest Service",
169     "service.pid=org.opencastproject.ingest.impl.IngestServiceImpl"
170   }
171 )
172 public class IngestServiceImpl extends AbstractJobProducer implements IngestService, ManagedService {
173 
174   /** The logger */
175   private static final Logger logger = LoggerFactory.getLogger(IngestServiceImpl.class);
176 
177   /** The source SMIL name */
178   private static final String PARTIAL_SMIL_NAME = "source_partial.smil";
179 
180   /** The configuration key that defines the default workflow definition */
181   protected static final String WORKFLOW_DEFINITION_DEFAULT = "org.opencastproject.workflow.default.definition";
182 
183   /** The workflow configuration property prefix **/
184   protected static final String WORKFLOW_CONFIGURATION_PREFIX = "org.opencastproject.workflow.config.";
185 
186   /** The key for the legacy mediapackage identifier */
187   public static final String LEGACY_MEDIAPACKAGE_ID_KEY = "org.opencastproject.ingest.legacy.mediapackage.id";
188 
189   public static final String JOB_TYPE = "org.opencastproject.ingest";
190 
191   /** Methods that ingest zips create jobs with this operation type */
192   public static final String INGEST_ZIP = "zip";
193 
194   /** Methods that ingest tracks directly create jobs with this operation type */
195   public static final String INGEST_TRACK = "track";
196 
197   /** Methods that ingest tracks from a URI create jobs with this operation type */
198   public static final String INGEST_TRACK_FROM_URI = "uri-track";
199 
200   /** Methods that ingest attachments directly create jobs with this operation type */
201   public static final String INGEST_ATTACHMENT = "attachment";
202 
203   /** Methods that ingest attachments from a URI create jobs with this operation type */
204   public static final String INGEST_ATTACHMENT_FROM_URI = "uri-attachment";
205 
206   /** Methods that ingest catalogs directly create jobs with this operation type */
207   public static final String INGEST_CATALOG = "catalog";
208 
209   /** Methods that ingest catalogs from a URI create jobs with this operation type */
210   public static final String INGEST_CATALOG_FROM_URI = "uri-catalog";
211 
212   /** The approximate load placed on the system by ingesting a file */
213   public static final float DEFAULT_INGEST_FILE_JOB_LOAD = 0.2f;
214 
215   /** The approximate load placed on the system by ingesting a zip file */
216   public static final float DEFAULT_INGEST_ZIP_JOB_LOAD = 0.2f;
217 
218   /** The key to look for in the service configuration file to override the {@link #DEFAULT_INGEST_FILE_JOB_LOAD} */
219   public static final String FILE_JOB_LOAD_KEY = "job.load.ingest.file";
220 
221   /** The key to look for in the service configuration file to override the {@link #DEFAULT_INGEST_ZIP_JOB_LOAD} */
222   public static final String ZIP_JOB_LOAD_KEY = "job.load.ingest.zip";
223 
224   /** The source to download from  */
225   public static final String DOWNLOAD_SOURCE = "org.opencastproject.download.source";
226 
227   /** The user for download from external sources */
228   public static final String DOWNLOAD_USER = "org.opencastproject.download.user";
229 
230   /** The password for download from external sources */
231   public static final String DOWNLOAD_PASSWORD = "org.opencastproject.download.password";
232 
233   /** The authentication method for download from external sources */
234   public static final String DOWNLOAD_AUTH_METHOD = "org.opencastproject.download.auth.method";
235 
236   /** Force basic authentication even if download host does not ask for it */
237   public static final String DOWNLOAD_AUTH_FORCE_BASIC = "org.opencastproject.download.auth.force_basic";
238 
239   /** By default, do not allow event ingest to modify existing series metadata */
240   public static final boolean DEFAULT_ALLOW_SERIES_MODIFICATIONS = false;
241 
242   /** The default is to preserve existing Opencast flavors during ingest. */
243   public static final boolean DEFAULT_ALLOW_ONLY_NEW_FLAVORS = true;
244 
245   /** The default is not to automatically skip attachments and catalogs from capture agent */
246   public static final boolean DEFAULT_SKIP = false;
247 
248   /** The default for force basic authentication even if download host does not ask for it */
249   public static final boolean DEFAULT_DOWNLOAD_AUTH_FORCE_BASIC = false;
250 
251   /** The maximum length of filenames ingested by Opencast */
252   public static final int FILENAME_LENGTH_MAX = 75;
253 
254   /** Managed Property key to allow Opencast series modification during ingest
255    * Deprecated, the param potentially causes an update chain reaction for all
256    * events associated to that series, for each ingest */
257   @Deprecated
258   public static final String MODIFY_OPENCAST_SERIES_KEY = "org.opencastproject.series.overwrite";
259 
260   /** Managed Property key to allow new flavors of ingested attachments and catalogs
261    * to be added to the existing Opencast mediapackage. But, not catalogs and attachments
262    * that would overwrite existing ones in Opencast.
263    */
264   public static final String ADD_ONLY_NEW_FLAVORS_KEY = "add.only.new.catalogs.attachments.for.existing.events";
265 
266   /** Control if catalogs sent by capture agents for scheduled events are skipped. */
267   public static final String SKIP_CATALOGS_KEY = "skip.catalogs.for.existing.events";
268 
269   /** Control if attachments sent by capture agents for scheduled events are skipped. */
270   public static final String SKIP_ATTACHMENTS_KEY = "skip.attachments.for.existing.events";
271 
272   /** The appendix added to autogenerated CA series. CA series creation deactivated if not configured */
273   private static final String SERIES_APPENDIX = "add.series.to.event.appendix";
274 
275   /** The approximate load placed on the system by ingesting a file */
276   private float ingestFileJobLoad = DEFAULT_INGEST_FILE_JOB_LOAD;
277 
278   /** The approximate load placed on the system by ingesting a zip file */
279   private float ingestZipJobLoad = DEFAULT_INGEST_ZIP_JOB_LOAD;
280 
281   /** The user for download from external sources */
282   private static String downloadUser = DOWNLOAD_USER;
283 
284   /** The password for download from external sources */
285   private static String downloadPassword = DOWNLOAD_PASSWORD;
286 
287   /** The authentication method for download from external sources */
288   private static String downloadAuthMethod = DOWNLOAD_AUTH_METHOD;
289 
290   /** Force basic authentication even if download host does not ask for it */
291   private static boolean downloadAuthForceBasic = DEFAULT_DOWNLOAD_AUTH_FORCE_BASIC;
292 
293   /** The external source dns name */
294   private static String downloadSource = DOWNLOAD_SOURCE;
295 
296   /** The JMX business object for ingest statistics */
297   private IngestStatistics ingestStatistics = new IngestStatistics();
298 
299   /** The JMX bean object instance */
300   private ObjectInstance registerMXBean;
301 
302   /** The workflow service */
303   private WorkflowService workflowService;
304 
305   /** The working file repository */
306   private WorkingFileRepository workingFileRepository;
307 
308   /** The http client */
309   private TrustedHttpClient httpClient;
310 
311   /** The series service */
312   private SeriesService seriesService;
313 
314   /** The dublin core service */
315   private DublinCoreCatalogService dublinCoreService;
316 
317   /** The opencast service registry */
318   private ServiceRegistry serviceRegistry;
319 
320   /** The security service */
321   protected SecurityService securityService = null;
322 
323   /** The user directory service */
324   protected UserDirectoryService userDirectoryService = null;
325 
326   /** The organization directory service */
327   protected OrganizationDirectoryService organizationDirectoryService = null;
328 
329   /** The scheduler service */
330   private SchedulerService schedulerService = null;
331 
332   /** The media inspection service */
333   private MediaInspectionService mediaInspectionService = null;
334 
335   /** The search index. */
336 
337   /** The default workflow identifier, if one is configured */
338   protected String defaultWorkflowDefinionId;
339 
340   /** The partial track start time map */
341   private Cache<String, Long> partialTrackStartTimes = CacheBuilder.newBuilder().expireAfterWrite(1, TimeUnit.DAYS)
342           .build();
343 
344   /** Option to overwrite matching flavors (e.g. series and episode metadata) on ingest,
345    *  tracks are always taken on ingest */
346   protected boolean isAddOnlyNew = DEFAULT_ALLOW_ONLY_NEW_FLAVORS;
347   protected boolean isAllowModifySeries = DEFAULT_ALLOW_SERIES_MODIFICATIONS;
348 
349   private boolean skipCatalogs = DEFAULT_SKIP;
350   private boolean skipAttachments = DEFAULT_SKIP;
351 
352   /** Create name appendix for autogenerated CA series */
353   private String createSeriesAppendix = null;
354 
355   protected boolean testMode = false;
356 
357   /**
358    * Creates a new ingest service instance.
359    */
360   public IngestServiceImpl() {
361     super(JOB_TYPE);
362   }
363 
364   /**
365    * OSGI callback for activating this component
366    *
367    * @param cc
368    *          the osgi component context
369    */
370   @Override
371   @Activate
372   public void activate(ComponentContext cc) {
373     super.activate(cc);
374     logger.info("Ingest Service started.");
375     defaultWorkflowDefinionId = StringUtils.trimToNull(cc.getBundleContext().getProperty(WORKFLOW_DEFINITION_DEFAULT));
376     if (defaultWorkflowDefinionId == null) {
377       defaultWorkflowDefinionId = "schedule-and-upload";
378     }
379     registerMXBean = JmxUtil.registerMXBean(ingestStatistics, "IngestStatistics");
380   }
381 
382   /**
383    * Callback from OSGi on service deactivation.
384    */
385   @Deactivate
386   public void deactivate() {
387     JmxUtil.unregisterMXBean(registerMXBean);
388   }
389 
390   /**
391    * {@inheritDoc}
392    *
393    * @see org.osgi.service.cm.ManagedService#updated(java.util.Dictionary)
394    *      Retrieve ManagedService configuration, including option to overwrite series
395    */
396   @Override
397   public void updated(Dictionary<String, ?> properties) throws ConfigurationException {
398 
399     if (properties == null) {
400       logger.info("No configuration available, using defaults");
401       return;
402     }
403 
404     downloadAuthMethod = StringUtils.trimToEmpty((String)properties.get(DOWNLOAD_AUTH_METHOD));
405     if (!"Digest".equals(downloadAuthMethod) && !"Basic".equals(downloadAuthMethod)) {
406       logger.warn("Download authentication method is neither Digest nor Basic; setting to Digest");
407       downloadAuthMethod = "Digest";
408     }
409     downloadAuthForceBasic = BooleanUtils.toBoolean(Objects.toString(properties.get(DOWNLOAD_AUTH_FORCE_BASIC),
410         BooleanUtils.toStringTrueFalse(DEFAULT_DOWNLOAD_AUTH_FORCE_BASIC)));
411     downloadPassword = StringUtils.trimToEmpty((String)properties.get(DOWNLOAD_PASSWORD));
412     downloadUser = StringUtils.trimToEmpty(((String) properties.get(DOWNLOAD_USER)));
413     downloadSource = StringUtils.trimToEmpty(((String) properties.get(DOWNLOAD_SOURCE)));
414     if (!isBlank(downloadSource) && (isBlank(downloadUser) || isBlank(downloadPassword))) {
415       logger.warn("Configured ingest download source has no configured user or password; deactivating authenticated"
416           + "download");
417       downloadSource = "";
418     }
419 
420     skipAttachments = BooleanUtils.toBoolean(Objects.toString(properties.get(SKIP_ATTACHMENTS_KEY),
421             BooleanUtils.toStringTrueFalse(DEFAULT_SKIP)));
422     skipCatalogs = BooleanUtils.toBoolean(Objects.toString(properties.get(SKIP_CATALOGS_KEY),
423             BooleanUtils.toStringTrueFalse(DEFAULT_SKIP)));
424     logger.debug("Skip attachments sent by agents for scheduled events: {}", skipAttachments);
425     logger.debug("Skip metadata catalogs sent by agents for scheduled events: {}", skipCatalogs);
426 
427     ingestFileJobLoad = LoadUtil.getConfiguredLoadValue(properties, FILE_JOB_LOAD_KEY, DEFAULT_INGEST_FILE_JOB_LOAD,
428             serviceRegistry);
429     ingestZipJobLoad = LoadUtil.getConfiguredLoadValue(properties, ZIP_JOB_LOAD_KEY, DEFAULT_INGEST_ZIP_JOB_LOAD,
430             serviceRegistry);
431 
432     isAllowModifySeries = BooleanUtils.toBoolean(Objects.toString(properties.get(MODIFY_OPENCAST_SERIES_KEY),
433               BooleanUtils.toStringTrueFalse(DEFAULT_ALLOW_SERIES_MODIFICATIONS)));
434     isAddOnlyNew = BooleanUtils.toBoolean(Objects.toString(properties.get(ADD_ONLY_NEW_FLAVORS_KEY),
435             BooleanUtils.toStringTrueFalse(DEFAULT_ALLOW_ONLY_NEW_FLAVORS)));
436     logger.info("Only allow new flavored catalogs and attachments on ingest:'{}'", isAddOnlyNew);
437     logger.info("Allowing series modification:'{}'", isAllowModifySeries);
438     createSeriesAppendix = StringUtils.trimToNull(((String) properties.get(SERIES_APPENDIX)));
439   }
440 
441   /**
442    * Sets the trusted http client
443    *
444    * @param httpClient
445    *          the http client
446    */
447   @Reference
448   public void setHttpClient(TrustedHttpClient httpClient) {
449     this.httpClient = httpClient;
450   }
451 
452   /**
453    * Sets the service registry
454    *
455    * @param serviceRegistry
456    *          the serviceRegistry to set
457    */
458   @Reference
459   public void setServiceRegistry(ServiceRegistry serviceRegistry) {
460     this.serviceRegistry = serviceRegistry;
461   }
462 
463   /**
464    * Sets the media inspection service
465    *
466    * @param mediaInspectionService
467    *          the media inspection service to set
468    */
469   @Reference
470   public void setMediaInspectionService(MediaInspectionService mediaInspectionService) {
471     this.mediaInspectionService = mediaInspectionService;
472   }
473 
474   public WorkflowInstance addZippedMediaPackage(InputStream zipStream)
475           throws IngestException, IOException, MediaPackageException {
476     try {
477       return addZippedMediaPackage(zipStream, null, null);
478     } catch (NotFoundException e) {
479       throw new IllegalStateException("A not found exception was thrown without a lookup");
480     }
481   }
482 
483   @Override
484   public WorkflowInstance addZippedMediaPackage(InputStream zipStream, String wd, Map<String, String> workflowConfig)
485           throws MediaPackageException, IOException, IngestException, NotFoundException {
486     try {
487       return addZippedMediaPackage(zipStream, wd, workflowConfig, null);
488     } catch (UnauthorizedException e) {
489       throw new IllegalStateException(e);
490     }
491   }
492 
493   /**
494    * {@inheritDoc}
495    *
496    * @see org.opencastproject.ingest.api.IngestService#addZippedMediaPackage(java.io.InputStream, java.lang.String,
497    *      java.util.Map, java.lang.Long)
498    */
499   @Override
500   public WorkflowInstance addZippedMediaPackage(InputStream zipStream, String workflowDefinitionId,
501           Map<String, String> workflowConfig, Long workflowInstanceId)
502           throws MediaPackageException, IOException, IngestException, NotFoundException, UnauthorizedException {
503     // Start a job synchronously. We can't keep the open input stream waiting around.
504     Job job = null;
505 
506     if (StringUtils.isNotBlank(workflowDefinitionId)) {
507       try {
508         workflowService.getWorkflowDefinitionById(workflowDefinitionId);
509       } catch (WorkflowDatabaseException e) {
510         throw new IngestException(e);
511       } catch (NotFoundException nfe) {
512         logger.warn("Workflow definition {} not found, using default workflow {} instead", workflowDefinitionId,
513                 defaultWorkflowDefinionId);
514         workflowDefinitionId = defaultWorkflowDefinionId;
515       }
516     }
517 
518     if (workflowInstanceId != null) {
519       logger.warn("Deprecated method! Ingesting zipped mediapackage with workflow {}", workflowInstanceId);
520     } else {
521       logger.info("Ingesting zipped mediapackage");
522     }
523 
524     ZipArchiveInputStream zis = null;
525     Set<String> collectionFilenames = new HashSet<>();
526     try {
527       // We don't need anybody to do the dispatching for us. Therefore we need to make sure that the job is never in
528       // QUEUED state but set it to INSTANTIATED in the beginning and then manually switch it to RUNNING.
529       job = serviceRegistry.createJob(JOB_TYPE, INGEST_ZIP, null, null, false, ingestZipJobLoad);
530       job.setStatus(Status.RUNNING);
531       job = serviceRegistry.updateJob(job);
532 
533       // Create the working file target collection for this ingest operation
534       String wfrCollectionId = Long.toString(job.getId());
535 
536       zis = new ZipArchiveInputStream(zipStream);
537       ZipArchiveEntry entry;
538       MediaPackage mp = null;
539       Map<String, URI> uris = new HashMap<>();
540       // Sequential number to append to file names so that, if two files have the same
541       // name, one does not overwrite the other (see MH-9688)
542       int seq = 1;
543       // Folder name to compare with next one to figure out if there's a root folder
544       String folderName = null;
545       // Indicates if zip has a root folder or not, initialized as true
546       boolean hasRootFolder = true;
547       // While there are entries write them to a collection
548       while ((entry = zis.getNextZipEntry()) != null) {
549         try {
550           if (entry.isDirectory() || entry.getName().contains("__MACOSX"))
551             continue;
552 
553           if (entry.getName().endsWith("manifest.xml") || entry.getName().endsWith("index.xml")) {
554             // Build the media package
555             final InputStream is = new ZipEntryInputStream(zis, entry.getSize());
556             mp = MediaPackageParser.getFromXml(IOUtils.toString(is, StandardCharsets.UTF_8));
557           } else {
558             logger.info("Storing zip entry {}/{} in working file repository collection '{}'", job.getId(),
559                     entry.getName(), wfrCollectionId);
560             // Since the directory structure is not being mirrored, makes sure the file
561             // name is different than the previous one(s) by adding a sequential number
562             String fileName = FilenameUtils.getBaseName(entry.getName()) + "_" + seq++ + "."
563                     + FilenameUtils.getExtension(entry.getName());
564             URI contentUri = workingFileRepository.putInCollection(wfrCollectionId, fileName,
565                     new ZipEntryInputStream(zis, entry.getSize()));
566             collectionFilenames.add(fileName);
567             // Key is the zip entry name as it is
568             String key = entry.getName();
569             uris.put(key, contentUri);
570             ingestStatistics.add(entry.getSize());
571             logger.info("Zip entry {}/{} stored at {}", job.getId(), entry.getName(), contentUri);
572             // Figures out if there's a root folder. Does entry name starts with a folder?
573             int pos = entry.getName().indexOf('/');
574             if (pos == -1) {
575               // No, we can conclude there's no root folder
576               hasRootFolder = false;
577             } else if (hasRootFolder && folderName != null && !folderName.equals(entry.getName().substring(0, pos))) {
578               // Folder name different from previous so there's no root folder
579               hasRootFolder = false;
580             } else if (folderName == null) {
581               // Just initialize folder name
582               folderName = entry.getName().substring(0, pos);
583             }
584           }
585         } catch (IOException e) {
586           logger.warn("Unable to process zip entry {}", entry.getName(), e);
587           throw e;
588         }
589       }
590 
591       if (mp == null)
592         throw new MediaPackageException("No manifest found in this zip");
593 
594       // Determine the mediapackage identifier
595       if (mp.getIdentifier() == null || isBlank(mp.getIdentifier().toString()))
596         mp.setIdentifier(IdImpl.fromUUID());
597 
598       String mediaPackageId = mp.getIdentifier().toString();
599 
600       logger.info("Ingesting mediapackage {} is named '{}'", mediaPackageId, mp.getTitle());
601 
602       // Make sure there are tracks in the mediapackage
603       if (mp.getTracks().length == 0) {
604         logger.warn("Mediapackage {} has no media tracks", mediaPackageId);
605       }
606 
607       // Update the element uris to point to their working file repository location
608       for (MediaPackageElement element : mp.elements()) {
609         // Key has root folder name if there is one
610         URI uri = uris.get((hasRootFolder ? folderName + "/" : "") + element.getURI().toString());
611 
612         if (uri == null)
613           throw new MediaPackageException("Unable to map element name '" + element.getURI() + "' to workspace uri");
614         logger.info("Ingested mediapackage element {}/{} located at {}", mediaPackageId, element.getIdentifier(), uri);
615         URI dest = workingFileRepository.moveTo(wfrCollectionId, FilenameUtils.getName(uri.toString()), mediaPackageId,
616                 element.getIdentifier(), FilenameUtils.getName(element.getURI().toString()));
617         element.setURI(dest);
618       }
619 
620       // Now that all elements are in place, start with ingest
621       logger.info("Initiating processing of ingested mediapackage {}", mediaPackageId);
622       WorkflowInstance workflowInstance = ingest(mp, workflowDefinitionId, workflowConfig, workflowInstanceId);
623       logger.info("Ingest of mediapackage {} done", mediaPackageId);
624       job.setStatus(Job.Status.FINISHED);
625       return workflowInstance;
626     } catch (ServiceRegistryException e) {
627       throw new IngestException(e);
628     } catch (MediaPackageException e) {
629       job.setStatus(Job.Status.FAILED, Job.FailureReason.DATA);
630       throw e;
631     } catch (Exception e) {
632       if (e instanceof IngestException)
633         throw (IngestException) e;
634       throw new IngestException(e);
635     } finally {
636       IOUtils.closeQuietly(zis);
637       finallyUpdateJob(job);
638       for (String filename : collectionFilenames) {
639         workingFileRepository.deleteFromCollection(Long.toString(job.getId()), filename, true);
640       }
641     }
642   }
643 
644   /**
645    * {@inheritDoc}
646    *
647    * @see org.opencastproject.ingest.api.IngestService#createMediaPackage()
648    */
649   @Override
650   public MediaPackage createMediaPackage() throws MediaPackageException, ConfigurationException {
651     MediaPackage mediaPackage;
652     try {
653       mediaPackage = MediaPackageBuilderFactory.newInstance().newMediaPackageBuilder().createNew();
654     } catch (MediaPackageException e) {
655       logger.error("INGEST:Failed to create media package " + e.getLocalizedMessage());
656       throw e;
657     }
658     mediaPackage.setDate(new Date());
659     logger.info("Created mediapackage {}", mediaPackage);
660     return mediaPackage;
661   }
662 
663   /**
664    * {@inheritDoc}
665    *
666    * @see org.opencastproject.ingest.api.IngestService#createMediaPackage()
667    */
668   @Override
669   public MediaPackage createMediaPackage(String mediaPackageId)
670           throws MediaPackageException, ConfigurationException {
671     MediaPackage mediaPackage;
672     try {
673       mediaPackage = MediaPackageBuilderFactory.newInstance().newMediaPackageBuilder()
674               .createNew(new IdImpl(mediaPackageId));
675     } catch (MediaPackageException e) {
676       logger.error("INGEST:Failed to create media package " + e.getLocalizedMessage());
677       throw e;
678     }
679     mediaPackage.setDate(new Date());
680     logger.info("Created mediapackage {}", mediaPackage);
681     return mediaPackage;
682   }
683 
684   /**
685    * {@inheritDoc}
686    *
687    * @see org.opencastproject.ingest.api.IngestService#addTrack(java.net.URI,
688    *      org.opencastproject.mediapackage.MediaPackageElementFlavor, String[] ,
689    *      org.opencastproject.mediapackage.MediaPackage)
690    */
691   @Override
692   public MediaPackage addTrack(URI uri, MediaPackageElementFlavor flavor, String[] tags, MediaPackage mediaPackage)
693           throws IOException, IngestException {
694     Job job = null;
695     try {
696       job = serviceRegistry
697               .createJob(
698                       JOB_TYPE, INGEST_TRACK_FROM_URI, Arrays.asList(uri.toString(),
699                               flavor == null ? null : flavor.toString(), MediaPackageParser.getAsXml(mediaPackage)),
700                       null, false, ingestFileJobLoad);
701       job.setStatus(Status.RUNNING);
702       job = serviceRegistry.updateJob(job);
703       String elementId = UUID.randomUUID().toString();
704       logger.info("Start adding track {} from URL {} on mediapackage {}", elementId, uri, mediaPackage);
705       URI newUrl = addContentToRepo(mediaPackage, elementId, uri);
706       MediaPackage mp = addContentToMediaPackage(mediaPackage, elementId, newUrl, MediaPackageElement.Type.Track,
707               flavor);
708       if (tags != null && tags.length > 0) {
709         MediaPackageElement trackElement = mp.getTrack(elementId);
710         for (String tag : tags) {
711           logger.info("Adding tag: " + tag + " to Element: " + elementId);
712           trackElement.addTag(tag);
713         }
714       }
715 
716       job.setStatus(Job.Status.FINISHED);
717       logger.info("Successful added track {} on mediapackage {} at URL {}", elementId, mediaPackage, newUrl);
718       return mp;
719     } catch (IOException e) {
720       throw e;
721     } catch (ServiceRegistryException e) {
722       throw new IngestException(e);
723     } catch (NotFoundException e) {
724       throw new IngestException("Unable to update ingest job", e);
725     } finally {
726       finallyUpdateJob(job);
727     }
728   }
729 
730   /**
731    * {@inheritDoc}
732    *
733    * @see org.opencastproject.ingest.api.IngestService#addTrack(java.io.InputStream, java.lang.String,
734    *      org.opencastproject.mediapackage.MediaPackageElementFlavor, org.opencastproject.mediapackage.MediaPackage)
735    */
736   @Override
737   public MediaPackage addTrack(InputStream in, String fileName, MediaPackageElementFlavor flavor,
738           MediaPackage mediaPackage) throws IOException, IngestException {
739     String[] tags = null;
740     return this.addTrack(in, fileName, flavor, tags, mediaPackage);
741   }
742 
743   /**
744    * {@inheritDoc}
745    *
746    * @see org.opencastproject.ingest.api.IngestService#addTrack(java.io.InputStream, java.lang.String,
747    *      org.opencastproject.mediapackage.MediaPackageElementFlavor, org.opencastproject.mediapackage.MediaPackage)
748    */
749   @Override
750   public MediaPackage addTrack(InputStream in, String fileName, MediaPackageElementFlavor flavor, String[] tags,
751           MediaPackage mediaPackage) throws IOException, IngestException {
752     Job job = null;
753     try {
754       job = serviceRegistry.createJob(JOB_TYPE, INGEST_TRACK, null, null, false, ingestFileJobLoad);
755       job.setStatus(Status.RUNNING);
756       job = serviceRegistry.updateJob(job);
757       String elementId = UUID.randomUUID().toString();
758       logger.info("Start adding track {} from input stream on mediapackage {}", elementId, mediaPackage);
759       if (fileName.length() > FILENAME_LENGTH_MAX) {
760         final String extension = "." + FilenameUtils.getExtension(fileName);
761         final int length = Math.max(0, FILENAME_LENGTH_MAX - extension.length());
762         fileName = fileName.substring(0, length) + extension;
763       }
764       URI newUrl = addContentToRepo(mediaPackage, elementId, fileName, in);
765       MediaPackage mp = addContentToMediaPackage(mediaPackage, elementId, newUrl, MediaPackageElement.Type.Track,
766               flavor);
767       if (tags != null && tags.length > 0) {
768         MediaPackageElement trackElement = mp.getTrack(elementId);
769         for (String tag : tags) {
770           logger.debug("Adding tag `{}` to element {}", tag, elementId);
771           trackElement.addTag(tag);
772         }
773       }
774 
775       job.setStatus(Job.Status.FINISHED);
776       logger.info("Successful added track {} on mediapackage {} at URL {}", elementId, mediaPackage, newUrl);
777       return mp;
778     } catch (IOException e) {
779       throw e;
780     } catch (ServiceRegistryException e) {
781       throw new IngestException(e);
782     } catch (NotFoundException e) {
783       throw new IngestException("Unable to update ingest job", e);
784     } finally {
785       finallyUpdateJob(job);
786     }
787   }
788 
789   @Override
790   public MediaPackage addPartialTrack(URI uri, MediaPackageElementFlavor flavor, long startTime,
791           MediaPackage mediaPackage) throws IOException, IngestException {
792     Job job = null;
793     try {
794       job = serviceRegistry.createJob(
795               JOB_TYPE,
796               INGEST_TRACK_FROM_URI,
797               Arrays.asList(uri.toString(), flavor == null ? null : flavor.toString(),
798                       MediaPackageParser.getAsXml(mediaPackage)), null, false);
799       job.setStatus(Status.RUNNING);
800       job = serviceRegistry.updateJob(job);
801       String elementId = UUID.randomUUID().toString();
802       logger.info("Start adding partial track {} from URL {} on mediapackage {}", elementId, uri, mediaPackage);
803       URI newUrl = addContentToRepo(mediaPackage, elementId, uri);
804       MediaPackage mp = addContentToMediaPackage(mediaPackage, elementId, newUrl, MediaPackageElement.Type.Track,
805               flavor);
806       job.setStatus(Job.Status.FINISHED);
807       // store startTime
808       partialTrackStartTimes.put(elementId, startTime);
809       logger.debug("Added start time {} for track {}", startTime, elementId);
810       logger.info("Successful added partial track {} on mediapackage {} at URL {}", elementId, mediaPackage, newUrl);
811       return mp;
812     } catch (ServiceRegistryException e) {
813       throw new IngestException(e);
814     } catch (NotFoundException e) {
815       throw new IngestException("Unable to update ingest job", e);
816     } finally {
817       finallyUpdateJob(job);
818     }
819   }
820 
821   @Override
822   public MediaPackage addPartialTrack(InputStream in, String fileName, MediaPackageElementFlavor flavor, long startTime,
823           MediaPackage mediaPackage) throws IOException, IngestException {
824     Job job = null;
825     try {
826       job = serviceRegistry.createJob(JOB_TYPE, INGEST_TRACK, null, null, false);
827       job.setStatus(Status.RUNNING);
828       job = serviceRegistry.updateJob(job);
829       String elementId = UUID.randomUUID().toString();
830       logger.info("Start adding partial track {} from input stream on mediapackage {}", elementId, mediaPackage);
831       URI newUrl = addContentToRepo(mediaPackage, elementId, fileName, in);
832       MediaPackage mp = addContentToMediaPackage(mediaPackage, elementId, newUrl, MediaPackageElement.Type.Track,
833               flavor);
834       job.setStatus(Job.Status.FINISHED);
835       // store startTime
836       partialTrackStartTimes.put(elementId, startTime);
837       logger.debug("Added start time {} for track {}", startTime, elementId);
838       logger.info("Successful added partial track {} on mediapackage {} at URL {}", elementId, mediaPackage, newUrl);
839       return mp;
840     } catch (ServiceRegistryException e) {
841       throw new IngestException(e);
842     } catch (NotFoundException e) {
843       throw new IngestException("Unable to update ingest job", e);
844     } finally {
845       finallyUpdateJob(job);
846     }
847   }
848 
849   /**
850    * {@inheritDoc}
851    *
852    * @see org.opencastproject.ingest.api.IngestService#addCatalog(java.net.URI,
853    *      org.opencastproject.mediapackage.MediaPackageElementFlavor, String[], org.opencastproject.mediapackage.MediaPackage)
854    */
855   @Override
856   public MediaPackage addCatalog(URI uri, MediaPackageElementFlavor flavor, String[] tags, MediaPackage mediaPackage)
857           throws IOException, IngestException {
858     Job job = null;
859     try {
860       job = serviceRegistry.createJob(JOB_TYPE, INGEST_CATALOG_FROM_URI,
861               Arrays.asList(uri.toString(), flavor.toString(), MediaPackageParser.getAsXml(mediaPackage)), null, false,
862               ingestFileJobLoad);
863       job.setStatus(Status.RUNNING);
864       job = serviceRegistry.updateJob(job);
865       String elementId = UUID.randomUUID().toString();
866       logger.info("Start adding catalog {} from URL {} on mediapackage {}", elementId, uri, mediaPackage);
867       URI newUrl = addContentToRepo(mediaPackage, elementId, uri);
868       MediaPackage mp = addContentToMediaPackage(mediaPackage, elementId, newUrl, MediaPackageElement.Type.Catalog,
869               flavor);
870       if (tags != null && tags.length > 0) {
871         MediaPackageElement catalogElement = mp.getCatalog(elementId);
872         for (String tag : tags) {
873           logger.info("Adding tag: " + tag + " to Element: " + elementId);
874           catalogElement.addTag(tag);
875         }
876       }
877       job.setStatus(Job.Status.FINISHED);
878       logger.info("Successful added catalog {} on mediapackage {} at URL {}", elementId, mediaPackage, newUrl);
879       return mp;
880     } catch (ServiceRegistryException e) {
881       throw new IngestException(e);
882     } catch (NotFoundException e) {
883       throw new IngestException("Unable to update ingest job", e);
884     } finally {
885       finallyUpdateJob(job);
886     }
887   }
888 
889   /**
890    * Updates the persistent representation of a series based on a potentially modified dublin core document.
891    *
892    * @param mediaPackage
893    *         the media package containing series metadata and ACLs.
894    * @return
895    *         true, if the series is created or overwritten, false if the existing series remains intact.
896    * @throws IOException if the series catalog was not found
897    * @throws IngestException if any other exception was encountered
898    */
899   protected boolean updateSeries(MediaPackage mediaPackage) throws IOException, IngestException {
900     Catalog[] seriesCatalogs = mediaPackage.getCatalogs(MediaPackageElements.SERIES);
901     if (seriesCatalogs.length == 0) {
902       return false;
903     } else if (seriesCatalogs.length > 1) {
904       logger.warn("Mediapackage {} has more than one series dublincore catalogs. Using catalog {} with ID {}.",
905           mediaPackage.getIdentifier(), seriesCatalogs[0].getURI(), seriesCatalogs[0].getIdentifier());
906     }
907     // Parse series dublincore
908     HttpResponse response = null;
909     InputStream in = null;
910     boolean isUpdated = false;
911     boolean isNew = false;
912     String seriesId = null;
913     try {
914       HttpGet getDc = new HttpGet(seriesCatalogs[0].getURI());
915       response = httpClient.execute(getDc);
916       in = response.getEntity().getContent();
917       DublinCoreCatalog dc = dublinCoreService.load(in);
918       seriesId = dc.getFirst(DublinCore.PROPERTY_IDENTIFIER);
919       if (seriesId == null) {
920         logger.warn("Series dublin core document contains no identifier, "
921             + "rejecting ingested series catalog for mediapackage {}.", mediaPackage.getIdentifier());
922       } else {
923         try {
924           try {
925             seriesService.getSeries(seriesId);
926             if (isAllowModifySeries) {
927               // Update existing series
928               seriesService.updateSeries(dc);
929               isUpdated = true;
930               logger.debug("Ingest is overwriting the existing series {} with the ingested series", seriesId);
931             } else {
932               logger.debug("Series {} already exists. Ignoring series catalog from ingest.", seriesId);
933             }
934           } catch (NotFoundException e) {
935             logger.info("Creating new series {} with default ACL.", seriesId);
936             seriesService.updateSeries(dc);
937             isUpdated = true;
938             isNew = true;
939           }
940 
941         } catch (Exception e) {
942           throw new IngestException(e);
943         }
944       }
945       in.close();
946     } catch (IOException e) {
947       logger.error("Error updating series from DublinCoreCatalog.}", e);
948     } finally {
949       IOUtils.closeQuietly(in);
950       httpClient.close(response);
951     }
952     if (!isUpdated) {
953       return isUpdated;
954     }
955     // Apply series extended metadata
956     for (MediaPackageElement seriesElement : mediaPackage.getElementsByFlavor(
957         MediaPackageElementFlavor.parseFlavor("*/series"))) {
958       if (MediaPackageElement.Type.Catalog == seriesElement.getElementType()
959           && !MediaPackageElements.SERIES.equals(seriesElement.getFlavor())) {
960         String catalogType = seriesElement.getFlavor().getType();
961         logger.info("Apply series {} metadata catalog from mediapackage {} to newly created series {}.",
962             catalogType, mediaPackage.getIdentifier(), seriesId);
963         byte[] data;
964         try {
965           HttpGet getExtendedMetadata = new HttpGet(seriesElement.getURI());
966           response = httpClient.execute(getExtendedMetadata);
967           in = response.getEntity().getContent();
968           data = IOUtils.readFully(in, (int) response.getEntity().getContentLength());
969         } catch (Exception e) {
970           throw new IngestException("Unable to read series " + catalogType + " metadata catalog for series "
971               + seriesId + ".", e);
972         } finally {
973           IOUtils.closeQuietly(in);
974           httpClient.close(response);
975         }
976         try {
977           seriesService.updateSeriesElement(seriesId, catalogType, data);
978         } catch (SeriesException e) {
979           throw new IngestException(
980               "Unable to update series " + catalogType + " catalog on newly created series " + seriesId + ".", e);
981         }
982       }
983     }
984     if (isNew) {
985       logger.info("Apply series ACL from mediapackage {} to newly created series {}.",
986           mediaPackage.getIdentifier(), seriesId);
987       Attachment[] seriesXacmls = mediaPackage.getAttachments(MediaPackageElements.XACML_POLICY_SERIES);
988       if (seriesXacmls.length > 0) {
989         if (seriesXacmls.length > 1) {
990           logger.warn("Mediapackage {} has more than one series xacml attachments. Using {}.",
991               mediaPackage.getIdentifier(), seriesXacmls[0].getURI());
992         }
993         AccessControlList seriesAcl = null;
994         try {
995           HttpGet getXacml = new HttpGet(seriesXacmls[0].getURI());
996           response = httpClient.execute(getXacml);
997           in = response.getEntity().getContent();
998           seriesAcl = XACMLUtils.parseXacml(in);
999         } catch (XACMLParsingException ex) {
1000           throw new IngestException("Unable to parse series xacml from mediapackage "
1001               + mediaPackage.getIdentifier() + ".", ex);
1002         } catch (IOException e) {
1003           logger.error("Error updating series {} ACL from mediapackage {}.",
1004               seriesId, mediaPackage.getIdentifier(), e);
1005           throw e;
1006         } finally {
1007           IOUtils.closeQuietly(in);
1008           httpClient.close(response);
1009         }
1010         try {
1011           seriesService.updateAccessControl(seriesId, seriesAcl);
1012         } catch (Exception e) {
1013           throw new IngestException("Unable to update series ACL on newly created series " + seriesId + ".", e);
1014         }
1015       }
1016     }
1017     return isUpdated;
1018   }
1019 
1020   /**
1021    * {@inheritDoc}
1022    *
1023    * @see org.opencastproject.ingest.api.IngestService#addCatalog(java.io.InputStream, java.lang.String,
1024    *      org.opencastproject.mediapackage.MediaPackageElementFlavor, org.opencastproject.mediapackage.MediaPackage)
1025    */
1026   @Override
1027   public MediaPackage addCatalog(InputStream in, String fileName, MediaPackageElementFlavor flavor,
1028           MediaPackage mediaPackage) throws IOException, IngestException {
1029     return addCatalog(in, fileName, flavor, null, mediaPackage);
1030   }
1031 
1032   /**
1033    * {@inheritDoc}
1034    *
1035    * @see org.opencastproject.ingest.api.IngestService#addCatalog(java.io.InputStream, java.lang.String,
1036    *      org.opencastproject.mediapackage.MediaPackageElementFlavor, org.opencastproject.mediapackage.MediaPackage)
1037    */
1038   @Override
1039   public MediaPackage addCatalog(InputStream in, String fileName, MediaPackageElementFlavor flavor, String[] tags,
1040           MediaPackage mediaPackage) throws IOException, IngestException, IllegalArgumentException {
1041     Job job = null;
1042     try {
1043       job = serviceRegistry.createJob(JOB_TYPE, INGEST_CATALOG, null, null, false, ingestFileJobLoad);
1044       job.setStatus(Status.RUNNING);
1045       job = serviceRegistry.updateJob(job);
1046       final String elementId = UUID.randomUUID().toString();
1047       final String mediaPackageId = mediaPackage.getIdentifier().toString();
1048       logger.info("Start adding catalog {} from input stream on mediapackage {}", elementId, mediaPackageId);
1049       final URI newUrl = addContentToRepo(mediaPackage, elementId, fileName, in);
1050 
1051       final boolean isJSON;
1052       try (InputStream inputStream = workingFileRepository.get(mediaPackageId, elementId)) {
1053         try (BufferedReader reader  = new BufferedReader(new InputStreamReader(inputStream, StandardCharsets.UTF_8))) {
1054           // Exception for current BBB integration and Extron SMP351 which is ingesting a JSON array/object as catalog
1055           int firstChar = reader.read();
1056           isJSON = firstChar == '[' || firstChar == '{';
1057         }
1058       }
1059 
1060       if (isJSON) {
1061         logger.warn("Input catalog seems to be JSON. This is a mistake and will fail in future Opencast versions."
1062             + "You will likely want to ingest this as a media package attachment instead.");
1063       } else {
1064         // Verify XML is not corrupted
1065         try {
1066           XmlSafeParser.parse(workingFileRepository.get(mediaPackageId, elementId));
1067         } catch (SAXException e) {
1068           workingFileRepository.delete(mediaPackageId, elementId);
1069           throw new IllegalArgumentException("Catalog XML is invalid", e);
1070         }
1071       }
1072       MediaPackage mp = addContentToMediaPackage(mediaPackage, elementId, newUrl, MediaPackageElement.Type.Catalog,
1073               flavor);
1074       if (tags != null && tags.length > 0) {
1075         MediaPackageElement trackElement = mp.getCatalog(elementId);
1076         for (String tag : tags) {
1077           logger.info("Adding tag {} to element {}", tag, elementId);
1078           trackElement.addTag(tag);
1079         }
1080       }
1081 
1082       job.setStatus(Job.Status.FINISHED);
1083       logger.info("Successful added catalog {} on mediapackage {} at URL {}", elementId, mediaPackage, newUrl);
1084       return mp;
1085     } catch (ServiceRegistryException e) {
1086       throw new IngestException(e);
1087     } catch (NotFoundException e) {
1088       throw new IngestException("Unable to update ingest job", e);
1089     } finally {
1090       finallyUpdateJob(job);
1091     }
1092   }
1093 
1094   /**
1095    * {@inheritDoc}
1096    *
1097    * @see org.opencastproject.ingest.api.IngestService#addAttachment(java.net.URI,
1098    *      org.opencastproject.mediapackage.MediaPackageElementFlavor, String[], org.opencastproject.mediapackage.MediaPackage)
1099    */
1100   @Override
1101   public MediaPackage addAttachment(URI uri, MediaPackageElementFlavor flavor, String[] tags, MediaPackage mediaPackage)
1102           throws IOException, IngestException {
1103     Job job = null;
1104     try {
1105       job = serviceRegistry.createJob(JOB_TYPE, INGEST_ATTACHMENT_FROM_URI,
1106               Arrays.asList(uri.toString(), flavor.toString(), MediaPackageParser.getAsXml(mediaPackage)), null, false,
1107               ingestFileJobLoad);
1108       job.setStatus(Status.RUNNING);
1109       job = serviceRegistry.updateJob(job);
1110       String elementId = UUID.randomUUID().toString();
1111       logger.info("Start adding attachment {} from URL {} on mediapackage {}", elementId, uri, mediaPackage);
1112       URI newUrl = addContentToRepo(mediaPackage, elementId, uri);
1113       MediaPackage mp = addContentToMediaPackage(mediaPackage, elementId, newUrl, MediaPackageElement.Type.Attachment,
1114               flavor);
1115       if (tags != null && tags.length > 0) {
1116         MediaPackageElement attachmentElement = mp.getAttachment(elementId);
1117         for (String tag : tags) {
1118           logger.debug("Adding tag: " + tag + " to Element: " + elementId);
1119           attachmentElement.addTag(tag);
1120         }
1121       }
1122       job.setStatus(Job.Status.FINISHED);
1123       logger.info("Successful added attachment {} on mediapackage {} at URL {}", elementId, mediaPackage, newUrl);
1124       return mp;
1125     } catch (ServiceRegistryException e) {
1126       throw new IngestException(e);
1127     } catch (NotFoundException e) {
1128       throw new IngestException("Unable to update ingest job", e);
1129     } finally {
1130       finallyUpdateJob(job);
1131     }
1132   }
1133 
1134   /**
1135    * {@inheritDoc}
1136    *
1137    * @see org.opencastproject.ingest.api.IngestService#addAttachment(java.io.InputStream, java.lang.String,
1138    *      org.opencastproject.mediapackage.MediaPackageElementFlavor, org.opencastproject.mediapackage.MediaPackage)
1139    */
1140   @Override
1141   public MediaPackage addAttachment(InputStream in, String fileName, MediaPackageElementFlavor flavor, String[] tags,
1142           MediaPackage mediaPackage) throws IOException, IngestException {
1143     Job job = null;
1144     try {
1145       job = serviceRegistry.createJob(JOB_TYPE, INGEST_ATTACHMENT, null, null, false, ingestFileJobLoad);
1146       job.setStatus(Status.RUNNING);
1147       job = serviceRegistry.updateJob(job);
1148       String elementId = UUID.randomUUID().toString();
1149       logger.info("Start adding attachment {} from input stream on mediapackage {}", elementId, mediaPackage);
1150       URI newUrl = addContentToRepo(mediaPackage, elementId, fileName, in);
1151       MediaPackage mp = addContentToMediaPackage(mediaPackage, elementId, newUrl, MediaPackageElement.Type.Attachment,
1152               flavor);
1153       if (tags != null && tags.length > 0) {
1154         MediaPackageElement trackElement = mp.getAttachment(elementId);
1155         for (String tag : tags) {
1156           logger.info("Adding tag: " + tag + " to Element: " + elementId);
1157           trackElement.addTag(tag);
1158         }
1159       }
1160       job.setStatus(Job.Status.FINISHED);
1161       logger.info("Successful added attachment {} on mediapackage {} at URL {}", elementId, mediaPackage, newUrl);
1162       return mp;
1163     } catch (ServiceRegistryException e) {
1164       throw new IngestException(e);
1165     } catch (NotFoundException e) {
1166       throw new IngestException("Unable to update ingest job", e);
1167     } finally {
1168       finallyUpdateJob(job);
1169     }
1170 
1171   }
1172 
1173   /**
1174    * {@inheritDoc}
1175    *
1176    * @see org.opencastproject.ingest.api.IngestService#addAttachment(java.io.InputStream, java.lang.String,
1177    *      org.opencastproject.mediapackage.MediaPackageElementFlavor, org.opencastproject.mediapackage.MediaPackage)
1178    */
1179   @Override
1180   public MediaPackage addAttachment(InputStream in, String fileName, MediaPackageElementFlavor flavor,
1181           MediaPackage mediaPackage) throws IOException, IngestException {
1182     String[] tags = null;
1183     return addAttachment(in, fileName, flavor, tags, mediaPackage);
1184   }
1185 
1186   /**
1187    *
1188    * {@inheritDoc}
1189    *
1190    * @see org.opencastproject.ingest.api.IngestService#ingest(org.opencastproject.mediapackage.MediaPackage)
1191    */
1192   @Override
1193   public WorkflowInstance ingest(MediaPackage mp) throws IngestException {
1194     try {
1195       return ingest(mp, null, null, null);
1196     } catch (NotFoundException e) {
1197       throw new IngestException(e);
1198     } catch (UnauthorizedException e) {
1199       throw new IllegalStateException(e);
1200     }
1201   }
1202 
1203   /**
1204    * {@inheritDoc}
1205    *
1206    * @see org.opencastproject.ingest.api.IngestService#ingest(org.opencastproject.mediapackage.MediaPackage,
1207    *      java.lang.String, java.util.Map)
1208    */
1209   @Override
1210   public WorkflowInstance ingest(MediaPackage mp, String wd, Map<String, String> properties)
1211           throws IngestException, NotFoundException {
1212     try {
1213       return ingest(mp, wd, properties, null);
1214     } catch (UnauthorizedException e) {
1215       throw new IllegalStateException(e);
1216     }
1217   }
1218 
1219   /**
1220    * {@inheritDoc}
1221    *
1222    * @see org.opencastproject.ingest.api.IngestService#ingest(org.opencastproject.mediapackage.MediaPackage,
1223    *      java.lang.String, java.util.Map, java.lang.Long)
1224    */
1225   @Override
1226   public WorkflowInstance ingest(MediaPackage mp, String workflowDefinitionId, Map<String, String> properties,
1227           Long workflowInstanceId) throws IngestException, NotFoundException, UnauthorizedException {
1228     // Check for legacy media package id
1229     mp = checkForLegacyMediaPackageId(mp, properties);
1230 
1231     try {
1232       mp = createSmil(mp);
1233     } catch (IOException e) {
1234       throw new IngestException("Unable to add SMIL Catalog", e);
1235     }
1236 
1237     try {
1238       updateSeries(mp);
1239     } catch (IOException e) {
1240       throw new IngestException("Unable to create or update series from mediapackage " + mp.getIdentifier() + ".", e);
1241     }
1242 
1243     // Done, update the job status and return the created workflow instance
1244     if (workflowInstanceId != null) {
1245       logger.warn(
1246               "Resuming workflow {} with ingested mediapackage {} is deprecated, skip resuming and start new workflow",
1247               workflowInstanceId, mp);
1248     }
1249 
1250     if (workflowDefinitionId == null) {
1251       logger.info("Starting a new workflow with ingested mediapackage {} based on the default workflow definition '{}'",
1252               mp, defaultWorkflowDefinionId);
1253     } else {
1254       logger.info("Starting a new workflow with ingested mediapackage {} based on workflow definition '{}'", mp,
1255               workflowDefinitionId);
1256     }
1257 
1258     try {
1259       // Determine the workflow definition
1260       WorkflowDefinition workflowDef = getWorkflowDefinition(workflowDefinitionId, mp);
1261 
1262       // Get the final set of workflow properties
1263       properties = mergeWorkflowConfiguration(properties, mp.getIdentifier().toString());
1264 
1265       // Remove potential workflow configuration prefixes from the workflow properties
1266       properties = removePrefixFromProperties(properties);
1267 
1268       // Merge scheduled mediapackage with ingested
1269       mp = mergeScheduledMediaPackage(mp);
1270       if (mp.getSeries() == null) {
1271         mp = checkForCASeries(mp, createSeriesAppendix);
1272       }
1273 
1274 
1275       ingestStatistics.successful();
1276       if (workflowDef != null) {
1277         logger.info("Starting new workflow with ingested mediapackage '{}' using the specified template '{}'",
1278                 mp.getIdentifier().toString(), workflowDefinitionId);
1279       } else {
1280         logger.info("Starting new workflow with ingested mediapackage '{}' using the default template '{}'",
1281                 mp.getIdentifier().toString(), defaultWorkflowDefinionId);
1282       }
1283       return workflowService.start(workflowDef, mp, properties);
1284     } catch (WorkflowException e) {
1285       ingestStatistics.failed();
1286       throw new IngestException(e);
1287     }
1288   }
1289 
1290   @Override
1291   public void schedule(MediaPackage mediaPackage, String workflowDefinitionID, Map<String, String> properties)
1292           throws IllegalStateException, IngestException, NotFoundException, UnauthorizedException, SchedulerException {
1293     MediaPackageElement[] mediaPackageElements = mediaPackage.getElementsByFlavor(MediaPackageElements.EPISODE);
1294     if (mediaPackageElements.length != 1) {
1295       logger.debug("There can be only one (and exactly one) episode dublin core catalog: https://youtu.be/_J3VeogFUOs");
1296       throw new IngestException("There can be only one (and exactly one) episode dublin core catalog");
1297     }
1298     InputStream inputStream;
1299     DublinCoreCatalog dublinCoreCatalog;
1300     try {
1301       inputStream = workingFileRepository.get(mediaPackage.getIdentifier().toString(),
1302               mediaPackageElements[0].getIdentifier());
1303       dublinCoreCatalog = dublinCoreService.load(inputStream);
1304     } catch (IOException e) {
1305       throw new IngestException(e);
1306     }
1307 
1308     EName temporal = new EName(DublinCore.TERMS_NS_URI, "temporal");
1309     List<DublinCoreValue> periods = dublinCoreCatalog.get(temporal);
1310     if (periods.size() != 1) {
1311       logger.debug("There can be only one (and exactly one) period");
1312       throw new IngestException("There can be only one (and exactly one) period");
1313     }
1314     DCMIPeriod period = EncodingSchemeUtils.decodeMandatoryPeriod(periods.get(0));
1315     if (!period.hasStart() || !period.hasEnd()) {
1316       logger.debug("A scheduled recording needs to have a start and end.");
1317       throw new IngestException("A scheduled recording needs to have a start and end.");
1318     }
1319     EName createdEName = new EName(DublinCore.TERMS_NS_URI, "created");
1320     List<DublinCoreValue> created = dublinCoreCatalog.get(createdEName);
1321     if (created.size() == 0) {
1322       logger.debug("Created not set");
1323     } else if (created.size() == 1) {
1324       Date date = EncodingSchemeUtils.decodeMandatoryDate(created.get(0));
1325       if (date.getTime() != period.getStart().getTime()) {
1326         logger.debug("start and created date differ ({} vs {})", date.getTime(), period.getStart().getTime());
1327         throw new IngestException("Temporal start and created date differ");
1328       }
1329     } else {
1330       logger.debug("There can be only one created date");
1331       throw new IngestException("There can be only one created date");
1332     }
1333     String captureAgent = getCaptureAgent(dublinCoreCatalog);
1334 
1335     // Go through properties
1336     Map<String, String> agentProperties = new HashMap<>();
1337     Map<String, String> workflowProperties = new HashMap<>();
1338     for (String key : properties.keySet()) {
1339       if (key.startsWith("org.opencastproject.workflow.config.")) {
1340         workflowProperties.put(key, properties.get(key));
1341       } else {
1342         agentProperties.put(key, properties.get(key));
1343       }
1344     }
1345 
1346     // Remove workflow configuration prefixes from the workflow properties
1347     workflowProperties = removePrefixFromProperties(workflowProperties);
1348 
1349     try {
1350       schedulerService.addEvent(period.getStart(), period.getEnd(), captureAgent, new HashSet<>(), mediaPackage,
1351               workflowProperties, agentProperties, Optional.empty());
1352     } finally {
1353       for (MediaPackageElement mediaPackageElement : mediaPackage.getElements()) {
1354         try {
1355           workingFileRepository.delete(mediaPackage.getIdentifier().toString(), mediaPackageElement.getIdentifier());
1356         } catch (IOException e) {
1357           logger.warn("Failed to delete media package element", e);
1358         }
1359       }
1360     }
1361   }
1362 
1363     private String getCaptureAgent(DublinCoreCatalog dublinCoreCatalog) throws IngestException {
1364     // spatial
1365     EName spatial = new EName(DublinCore.TERMS_NS_URI, "spatial");
1366     List<DublinCoreValue> captureAgents = dublinCoreCatalog.get(spatial);
1367     if (captureAgents.size() != 1) {
1368       logger.debug("Exactly one capture agent needs to be set");
1369       throw new IngestException("Exactly one capture agent needs to be set");
1370     }
1371     return captureAgents.get(0).getValue();
1372   }
1373 
1374   /**
1375    * Check whether the mediapackage id is set via the legacy workflow identifier and change the id if existing.
1376    *
1377    * @param mp
1378    *          the mediapackage
1379    * @param properties
1380    *          the workflow properties
1381    * @return the mediapackage
1382    */
1383   private MediaPackage checkForLegacyMediaPackageId(MediaPackage mp, Map<String, String> properties)
1384           throws IngestException {
1385     if (properties == null || properties.isEmpty())
1386       return mp;
1387 
1388     try {
1389       String mediaPackageId = properties.get(LEGACY_MEDIAPACKAGE_ID_KEY);
1390       if (StringUtils.isNotBlank(mediaPackageId) && schedulerService != null) {
1391         logger.debug("Check ingested mediapackage {} for legacy mediapackage identifier {}",
1392                 mp.getIdentifier().toString(), mediaPackageId);
1393         try {
1394           schedulerService.getMediaPackage(mp.getIdentifier().toString());
1395           return mp;
1396         } catch (NotFoundException e) {
1397           logger.info("No scheduler mediapackage found with ingested id {}, try legacy mediapackage id {}",
1398                   mp.getIdentifier().toString(), mediaPackageId);
1399           try {
1400             schedulerService.getMediaPackage(mediaPackageId);
1401             logger.info("Legacy mediapackage id {} exists, change ingested mediapackage id {} to legacy id",
1402                     mediaPackageId, mp.getIdentifier().toString());
1403             mp.setIdentifier(new IdImpl(mediaPackageId));
1404             return mp;
1405           } catch (NotFoundException e1) {
1406             logger.info("No scheduler mediapackage found with legacy mediapackage id {}, skip merging", mediaPackageId);
1407           } catch (Exception e1) {
1408             logger.error("Unable to get event mediapackage from scheduler event {}", mediaPackageId, e);
1409             throw new IngestException(e);
1410           }
1411         } catch (Exception e) {
1412           logger.error("Unable to get event mediapackage from scheduler event {}", mp.getIdentifier().toString(), e);
1413           throw new IngestException(e);
1414         }
1415       }
1416       return mp;
1417     } finally {
1418       properties.remove(LEGACY_MEDIAPACKAGE_ID_KEY);
1419     }
1420   }
1421 
1422   private Map<String, String> mergeWorkflowConfiguration(Map<String, String> properties, String mediaPackageId) {
1423     if (isBlank(mediaPackageId) || schedulerService == null)
1424       return properties;
1425 
1426     HashMap<String, String> mergedProperties = new HashMap<>();
1427 
1428     try {
1429       Map<String, String> recordingProperties = schedulerService.getCaptureAgentConfiguration(mediaPackageId);
1430       logger.debug("Restoring workflow properties from scheduler event {}", mediaPackageId);
1431       mergedProperties.putAll(recordingProperties);
1432     } catch (SchedulerException e) {
1433       logger.warn("Unable to get workflow properties from scheduler event {}", mediaPackageId, e);
1434     } catch (NotFoundException e) {
1435       logger.info("No capture event found for id {}", mediaPackageId);
1436     } catch (UnauthorizedException e) {
1437       throw new IllegalStateException(e);
1438     }
1439 
1440     if (properties != null) {
1441       // Merge the properties, this must be after adding the recording properties
1442       logger.debug("Merge workflow properties with the one from the scheduler event {}", mediaPackageId);
1443       mergedProperties.putAll(properties);
1444     }
1445 
1446     return mergedProperties;
1447   }
1448 
1449   /**
1450    * Merges the ingested mediapackage with the scheduled mediapackage. The ingested mediapackage takes precedence over
1451    * the scheduled mediapackage.
1452    *
1453    * @param mp
1454    *          the ingested mediapackage
1455    * @return the merged mediapackage
1456    */
1457   private MediaPackage mergeScheduledMediaPackage(MediaPackage mp) throws IngestException {
1458     if (schedulerService == null) {
1459       logger.warn("No scheduler service available to merge mediapackage!");
1460       return mp;
1461     }
1462 
1463     try {
1464       MediaPackage scheduledMp = schedulerService.getMediaPackage(mp.getIdentifier().toString());
1465       logger.info("Found matching scheduled event for id '{}', merging mediapackage...", mp.getIdentifier().toString());
1466       mergeMediaPackageElements(mp, scheduledMp);
1467       mergeMediaPackageMetadata(mp, scheduledMp);
1468       return mp;
1469     } catch (NotFoundException e) {
1470       logger.debug("No scheduler mediapackage found with id {}, skip merging", mp.getIdentifier());
1471       return mp;
1472     } catch (Exception e) {
1473       throw new IngestException(String.format("Unable to get event media package from scheduler event %s",
1474               mp.getIdentifier()), e);
1475     }
1476   }
1477 
1478   /**
1479    * Merge different elements from capture agent ingesting mp and Asset manager. Overwrite or replace same flavored
1480    * elements depending on the Ingest Service overwrite configuration. Ignore publications (i.e. live publication
1481    * channel from Asset Manager) Always keep tracks from the capture agent.
1482    *
1483    * @param mp
1484    *          the medipackage being ingested from the Capture Agent
1485    * @param scheduledMp
1486    *          the mediapckage that was schedule and managed by the Asset Manager
1487    */
1488   private void mergeMediaPackageElements(MediaPackage mp, MediaPackage scheduledMp) {
1489     // drop catalogs sent by the capture agent in favor of Opencast's own metadata
1490     if (skipCatalogs) {
1491       for (MediaPackageElement element : mp.getCatalogs()) {
1492         if (!element.getFlavor().equals(MediaPackageElements.SMIL)) {
1493           mp.remove(element);
1494         }
1495       }
1496     }
1497 
1498     // drop attachments the capture agent sent us in favor of Opencast's attachments
1499     // e.g. prevent capture agents from modifying security rules of schedules events
1500     if (skipAttachments) {
1501       for (MediaPackageElement element : mp.getAttachments()) {
1502         mp.remove(element);
1503       }
1504     }
1505 
1506     for (MediaPackageElement element : scheduledMp.getElements()) {
1507       if (MediaPackageElement.Type.Publication.equals(element.getElementType())) {
1508         // The Asset managed media package may have a publication element for a live event, if retract live has not run yet.
1509         // Publications do not have flavors and are never part of the mediapackage from the capture agent.
1510         // Therefore, ignore publication element because it is removed when the recorded media is published and causes complications (on short media) if added.
1511         logger.debug("Ignoring {}, not adding to ingested mediapackage {}", MediaPackageElement.Type.Publication, mp);
1512         continue;
1513       } else if (mp.getElementsByFlavor(element.getFlavor()).length > 0) {
1514         // The default is to overwrite matching flavored elements in the Asset managed mediapackage (e.g. catalogs)
1515         // If isOverwrite is true, changes made from the CA overwrite (update/revert) changes made from the Admin UI.
1516         // If isOverwrite is false, changes made from the CA do not overwrite (update/revert) changes made from the Admin UI.
1517         // regardless of overwrite, always keep new ingested tracks.
1518         if (!isAddOnlyNew || MediaPackageElement.Type.Track.equals(element.getElementType())) {
1519           // Allow updates made from the Capture Agent to overwrite existing metadata in Opencast
1520           logger.info(
1521                   "Omitting Opencast (Asset Managed) element '{}', replacing with ingested element of same flavor '{}'",
1522                   element,
1523                   element.getFlavor());
1524           continue;
1525         }
1526         // Remove flavored element from ingested mp and replaced it with maching element from Asset Managed mediapackage.
1527         // This protects updates made from the admin UI during an event capture from being reverted by artifacts from the ingested CA.
1528         for (MediaPackageElement el : mp.getElementsByFlavor(element.getFlavor())) {
1529           logger.info("Omitting ingested element '{}' {}, keeping existing (Asset Managed) element of same flavor '{}'", el, el.getURI(),
1530                   element.getFlavor());
1531           mp.remove(el);
1532         }
1533       }
1534       logger.info("Adding element {} from scheduled (Asset Managed) event '{}' into ingested mediapackage", element, mp);
1535       mp.add(element);
1536     }
1537   }
1538 
1539   /**
1540    *
1541    * The previous OC behaviour is for metadata in the ingested mediapackage to be updated by the
1542    * Asset Managed metadata *only* when the field is blank on the ingested mediapackage.
1543    * However, that field may have been intentionally emptied by
1544    * removing its value from the Capture Agent UI (e.g. Galicaster)
1545    *
1546    * If isOverwrite is true, metadata values in the ingest mediapackage overwrite Asset Managed metadata.
1547    * If isOverwrite is false, Asset Managed metadata is preserved.
1548    *
1549    * @param mp,
1550    *          the inbound ingested mp
1551    * @param scheduledMp,
1552    *          the existing scheduled mp
1553    */
1554   private void mergeMediaPackageMetadata(MediaPackage mp, MediaPackage scheduledMp) {
1555     // Merge media package fields depending on overwrite setting
1556     boolean noOverwrite = (isAddOnlyNew && !skipCatalogs) || skipCatalogs;
1557     if ((mp.getDate() == null) || noOverwrite)
1558       mp.setDate(scheduledMp.getDate());
1559     if (isBlank(mp.getLicense()) || noOverwrite)
1560       mp.setLicense(scheduledMp.getLicense());
1561     if (isBlank(mp.getSeries()) || noOverwrite)
1562       mp.setSeries(scheduledMp.getSeries());
1563     if (isBlank(mp.getSeriesTitle()) || noOverwrite)
1564       mp.setSeriesTitle(scheduledMp.getSeriesTitle());
1565     if (isBlank(mp.getTitle()) || noOverwrite)
1566       mp.setTitle(scheduledMp.getTitle());
1567 
1568     if (mp.getSubjects().length <= 0 || noOverwrite) {
1569       Arrays.stream(mp.getSubjects()).forEach(mp::removeSubject);
1570       for (String subject : scheduledMp.getSubjects()) {
1571         mp.addSubject(subject);
1572       }
1573     }
1574     if (noOverwrite || mp.getContributors().length == 0) {
1575       Arrays.stream(mp.getContributors()).forEach(mp::removeContributor);
1576       for (String contributor : scheduledMp.getContributors()) {
1577         mp.addContributor(contributor);
1578       }
1579     }
1580     if (noOverwrite || mp.getCreators().length == 0) {
1581       Arrays.stream(mp.getCreators()).forEach(mp::removeCreator);
1582       for (String creator : scheduledMp.getCreators()) {
1583         mp.addCreator(creator);
1584       }
1585     }
1586   }
1587 
1588   /**
1589    * Removes the workflow configuration file prefix from all properties in a map.
1590    *
1591    * @param properties
1592    *          The properties to remove the prefixes from
1593    * @return A Map with the same collection of properties without the prefix
1594    */
1595   private Map<String, String> removePrefixFromProperties(Map<String, String> properties) {
1596     Map<String, String> fixedProperties = new HashMap<>();
1597     if (properties != null) {
1598       for (Entry<String, String> entry : properties.entrySet()) {
1599         if (entry.getKey().startsWith(WORKFLOW_CONFIGURATION_PREFIX)) {
1600           logger.debug("Removing prefix from key '" + entry.getKey() + " with value '" + entry.getValue() + "'");
1601           fixedProperties.put(entry.getKey().replace(WORKFLOW_CONFIGURATION_PREFIX, ""), entry.getValue());
1602         } else {
1603           fixedProperties.put(entry.getKey(), entry.getValue());
1604         }
1605       }
1606     }
1607     return fixedProperties;
1608   }
1609 
  /**
   * Determines the workflow definition to use for an ingested mediapackage.
   * <p>
   * If no workflow definition id is passed in, the capture agent configuration of the scheduler event with the
   * mediapackage's id is consulted (if a scheduler service is bound), falling back to the default workflow
   * definition id. If both the resulting id and the default id are set, the id is verified against the workflow
   * service and replaced by the default id when it is not found.
   *
   * @param workflowDefinitionID
   *          the workflow definition id specified during ingest, may be blank
   * @param mediapackage
   *          the ingested mediapackage
   * @return the workflow definition returned by the workflow service for the determined id
   * @throws NotFoundException
   *           if the final lookup of the determined workflow definition id finds nothing
   * @throws WorkflowDatabaseException
   *           if the final lookup of the workflow definition fails
   * @throws IngestException
   *           if the scheduler lookup fails, the scheduler event yields no workflow definition id and no default
   *           is set, or the validity check hits a workflow database error
   * @throws IllegalStateException
   *           if the scheduler lookup is not authorized or no workflow definition id could be determined at all
   */
  private WorkflowDefinition getWorkflowDefinition(String workflowDefinitionID, MediaPackage mediapackage)
          throws NotFoundException, WorkflowDatabaseException, IngestException {
    // If no workflow definition id was given, try the scheduler event's capture agent configuration,
    // then the default workflow definition id
    if (isBlank(workflowDefinitionID)) {
      String mediaPackageId = mediapackage.getIdentifier().toString();
      if (schedulerService != null) {
        logger.info("Determining workflow template for ingested mediapckage {} from capture event {}", mediapackage,
                mediaPackageId);
        try {
          Map<String, String> recordingProperties = schedulerService.getCaptureAgentConfiguration(mediaPackageId);
          workflowDefinitionID = recordingProperties.get(CaptureParameters.INGEST_WORKFLOW_DEFINITION);
          if (isBlank(workflowDefinitionID)) {
            workflowDefinitionID = defaultWorkflowDefinionId;
            logger.debug("No workflow set. Falling back to default.");
          }
          // Still blank means that neither the scheduler event nor the default provides an id
          if (isBlank(workflowDefinitionID)) {
            throw new IngestException("No value found for key '" + CaptureParameters.INGEST_WORKFLOW_DEFINITION
                    + "' from capture event configuration of scheduler event '" + mediaPackageId + "'");
          }
          logger.info("Ingested event {} will be processed using workflow '{}'", mediapackage, workflowDefinitionID);
        } catch (NotFoundException e) {
          // No scheduler event for this id: continue with the default handling below
          logger.warn("Specified capture event {} was not found", mediaPackageId);
        } catch (UnauthorizedException e) {
          throw new IllegalStateException(e);
        } catch (SchedulerException e) {
          logger.warn("Unable to get the workflow definition id from scheduler event {}", mediaPackageId, e);
          throw new IngestException(e);
        }
      } else {
        logger.warn(
                "Scheduler service not bound, unable to determine the workflow template to use for ingested mediapckage {}",
                mediapackage);
      }

    } else {
      logger.info("Ingested mediapackage {} is processed using workflow template '{}', specified during ingest",
              mediapackage, workflowDefinitionID);
    }

    // Use the default workflow definition if nothing was determined
    if (isBlank(workflowDefinitionID) && defaultWorkflowDefinionId != null) {
      logger.info("Using default workflow definition '{}' to process ingested mediapackage {}",
              defaultWorkflowDefinionId, mediapackage);
      workflowDefinitionID = defaultWorkflowDefinionId;
    }

    // Check if the workflow definition is valid; this is only done when a default exists to fall back to
    if (StringUtils.isNotBlank(workflowDefinitionID) && StringUtils.isNotBlank(defaultWorkflowDefinionId)) {
      try {
        workflowService.getWorkflowDefinitionById(workflowDefinitionID);
      } catch (WorkflowDatabaseException e) {
        throw new IngestException(e);
      } catch (NotFoundException nfe) {
        logger.warn("Workflow definition {} not found, using default workflow {} instead", workflowDefinitionID,
                defaultWorkflowDefinionId);
        workflowDefinitionID = defaultWorkflowDefinionId;
      }
    }

    // Have we been able to find a workflow definition id?
    if (isBlank(workflowDefinitionID)) {
      // Record the failed ingest in the statistics before giving up
      ingestStatistics.failed();
      throw new IllegalStateException(
              "Can not ingest a workflow without a workflow definition or an existing instance. No default definition is specified");
    }

    // Let's make sure the workflow definition exists
    return workflowService.getWorkflowDefinitionById(workflowDefinitionID);
  }
1679 
1680   /**
1681    *
1682    * {@inheritDoc}
1683    *
1684    * @see org.opencastproject.ingest.api.IngestService#discardMediaPackage(org.opencastproject.mediapackage.MediaPackage)
1685    */
1686   @Override
1687   public void discardMediaPackage(MediaPackage mp) throws IOException {
1688     String mediaPackageId = mp.getIdentifier().toString();
1689     for (MediaPackageElement element : mp.getElements()) {
1690       if (!workingFileRepository.delete(mediaPackageId, element.getIdentifier()))
1691         logger.warn("Unable to find (and hence, delete), this mediapackage element");
1692     }
1693     logger.info("Successfully discarded media package {}", mp);
1694   }
1695 
1696   protected URI addContentToRepo(MediaPackage mp, String elementId, URI uri) throws IOException {
1697     InputStream in = null;
1698     HttpResponse response = null;
1699     CloseableHttpClient externalHttpClient = null;
1700     try {
1701       if (uri.toString().startsWith("http")) {
1702         HttpGet get = new HttpGet(uri);
1703 
1704         if (!isBlank(downloadSource) && uri.toString().matches(downloadSource)) {
1705           // NB: We're creating a new client here with *different* auth than the system auth creds
1706           externalHttpClient = getAuthedHttpClient();
1707           get.setHeader("X-Requested-Auth", downloadAuthMethod);
1708           if ("Basic".equals(downloadAuthMethod) && downloadAuthForceBasic) {
1709             String auth = downloadUser + ":" + downloadPassword;
1710             byte[] encodedAuth = Base64.getEncoder().encode(auth.getBytes(StandardCharsets.ISO_8859_1));
1711             String authHeader = "Basic " + new String(encodedAuth);
1712             get.setHeader(HttpHeaders.AUTHORIZATION, authHeader);
1713           }
1714           response = externalHttpClient.execute(get);
1715         } else {
1716           // httpClient checks internally to see if it should be sending the default auth, or not.
1717           response = httpClient.execute(get);
1718         }
1719 
1720         if (null == response) {
1721           // If you get here then chances are you're using a mock httpClient which does not have appropriate
1722           // mocking to respond to the URL you are feeding it.  Try adding that URL to the mock and see if that works.
1723           throw new IOException("Null response object from the http client, refer to code for explanation");
1724         }
1725 
1726         int httpStatusCode = response.getStatusLine().getStatusCode();
1727         if (httpStatusCode != 200) {
1728           throw new IOException(uri + " returns http " + httpStatusCode);
1729         }
1730         in = response.getEntity().getContent();
1731         //If it does not start with file, or we're in test mode (ie, to allow arbitrary file:// access)
1732       } else if (!uri.toString().startsWith("file") || testMode) {
1733         in = uri.toURL().openStream();
1734       } else {
1735         throw new IOException("Refusing to fetch files from the local filesystem");
1736       }
1737       String fileName = FilenameUtils.getName(uri.getPath());
1738       if (isBlank(FilenameUtils.getExtension(fileName)))
1739         fileName = getContentDispositionFileName(response);
1740 
1741       if (isBlank(FilenameUtils.getExtension(fileName)))
1742         throw new IOException("No filename extension found: " + fileName);
1743       return addContentToRepo(mp, elementId, fileName, in);
1744     } finally {
1745       if (in != null) {
1746         in.close();
1747       }
1748       if (externalHttpClient != null) {
1749         externalHttpClient.close();
1750       }
1751       httpClient.close(response);
1752     }
1753   }
1754 
1755   private String getContentDispositionFileName(HttpResponse response) {
1756     if (response == null)
1757       return null;
1758 
1759     Header header = response.getFirstHeader("Content-Disposition");
1760     ContentDisposition contentDisposition = new ContentDisposition(header.getValue());
1761     return contentDisposition.getParameter("filename");
1762   }
1763 
1764   private URI addContentToRepo(MediaPackage mp, String elementId, String filename, InputStream file)
1765           throws IOException {
1766     ProgressInputStream progressInputStream = new ProgressInputStream(file);
1767     progressInputStream.addPropertyChangeListener(new PropertyChangeListener() {
1768       @Override
1769       public void propertyChange(PropertyChangeEvent evt) {
1770         long totalNumBytesRead = (Long) evt.getNewValue();
1771         long oldTotalNumBytesRead = (Long) evt.getOldValue();
1772         ingestStatistics.add(totalNumBytesRead - oldTotalNumBytesRead);
1773       }
1774     });
1775     return workingFileRepository.put(mp.getIdentifier().toString(), elementId, filename, progressInputStream);
1776   }
1777 
1778   private MediaPackage addContentToMediaPackage(MediaPackage mp, String elementId, URI uri,
1779           MediaPackageElement.Type type, MediaPackageElementFlavor flavor) {
1780     logger.info("Adding element of type {} to mediapackage {}", type, mp);
1781     MediaPackageElement mpe = mp.add(uri, type, flavor);
1782     mpe.setIdentifier(elementId);
1783     return mp;
1784   }
1785 
1786   // ---------------------------------------------
1787   // --------- bind and unbind bundles ---------
1788   // ---------------------------------------------
  /**
   * Callback for setting the workflow service.
   *
   * @param workflowService
   *          the workflow service to set
   */
  @Reference
  public void setWorkflowService(WorkflowService workflowService) {
    this.workflowService = workflowService;
  }
1793 
  /**
   * Callback for setting the working file repository.
   *
   * @param workingFileRepository
   *          the working file repository to set
   */
  @Reference
  public void setWorkingFileRepository(WorkingFileRepository workingFileRepository) {
    this.workingFileRepository = workingFileRepository;
  }
1798 
  /**
   * Callback for setting the series service.
   *
   * @param seriesService
   *          the series service to set
   */
  @Reference
  public void setSeriesService(SeriesService seriesService) {
    this.seriesService = seriesService;
  }
1803 
  /**
   * Callback for setting the dublin core catalog service.
   *
   * @param dublinCoreService
   *          the dublin core catalog service to set
   */
  @Reference
  public void setDublinCoreService(DublinCoreCatalogService dublinCoreService) {
    this.dublinCoreService = dublinCoreService;
  }
1808 
  /**
   * {@inheritDoc}
   * <p>
   * Returns the value of this service's {@code serviceRegistry} field.
   *
   * @see org.opencastproject.job.api.AbstractJobProducer#getServiceRegistry()
   */
  @Override
  protected ServiceRegistry getServiceRegistry() {
    return serviceRegistry;
  }
1818 
  /**
   * {@inheritDoc}
   * <p>
   * This implementation never processes a job and always throws.
   *
   * @throws IllegalStateException
   *           always, since ingest jobs are not expected to be dispatched
   * @see org.opencastproject.job.api.AbstractJobProducer#process(org.opencastproject.job.api.Job)
   */
  @Override
  protected String process(Job job) throws Exception {
    throw new IllegalStateException("Ingest jobs are not expected to be dispatched");
  }
1828 
  /**
   * Callback for setting the security service. The instance is the one handed out by
   * {@link #getSecurityService()}.
   *
   * @param securityService
   *          the securityService to set
   */
  @Reference
  public void setSecurityService(SecurityService securityService) {
    this.securityService = securityService;
  }
1839 
  /**
   * Callback for setting the user directory service. The instance is the one handed out by
   * {@link #getUserDirectoryService()}.
   *
   * @param userDirectoryService
   *          the userDirectoryService to set
   */
  @Reference
  public void setUserDirectoryService(UserDirectoryService userDirectoryService) {
    this.userDirectoryService = userDirectoryService;
  }
1850 
  /**
   * Callback for setting the scheduler service. The reference is declared optional and dynamic, so the field may
   * be {@code null}; {@link #unsetSchedulerService(SchedulerService)} is registered as the unbind method.
   *
   * @param schedulerService
   *          the scheduler service to set
   */
  @Reference(
    policy = ReferencePolicy.DYNAMIC,
    cardinality = ReferenceCardinality.OPTIONAL,
    unbind = "unsetSchedulerService"
  )
  public void setSchedulerService(SchedulerService schedulerService) {
    this.schedulerService = schedulerService;
  }
1865 
1866   public void unsetSchedulerService(SchedulerService schedulerService) {
1867     if (this.schedulerService == schedulerService) {
1868       this.schedulerService = null;
1869     }
1870   }
1871 
  /**
   * Sets a reference to the organization directory service. The instance is the one handed out by
   * {@link #getOrganizationDirectoryService()}.
   *
   * @param organizationDirectory
   *          the organization directory
   */
  @Reference
  public void setOrganizationDirectoryService(OrganizationDirectoryService organizationDirectory) {
    organizationDirectoryService = organizationDirectory;
  }
1882 
  /**
   * {@inheritDoc}
   * <p>
   * Returns the instance set through {@link #setSecurityService(SecurityService)}.
   *
   * @see org.opencastproject.job.api.AbstractJobProducer#getSecurityService()
   */
  @Override
  protected SecurityService getSecurityService() {
    return securityService;
  }
1892 
  /**
   * {@inheritDoc}
   * <p>
   * Returns the instance set through {@link #setUserDirectoryService(UserDirectoryService)}.
   *
   * @see org.opencastproject.job.api.AbstractJobProducer#getUserDirectoryService()
   */
  @Override
  protected UserDirectoryService getUserDirectoryService() {
    return userDirectoryService;
  }
1902 
  /**
   * {@inheritDoc}
   * <p>
   * Returns the instance set through {@link #setOrganizationDirectoryService(OrganizationDirectoryService)}.
   *
   * @see org.opencastproject.job.api.AbstractJobProducer#getOrganizationDirectoryService()
   */
  @Override
  protected OrganizationDirectoryService getOrganizationDirectoryService() {
    return organizationDirectoryService;
  }
1912 
1913   protected CloseableHttpClient getAuthedHttpClient() {
1914     HttpClientBuilder cb = HttpClientBuilder.create();
1915     CredentialsProvider provider = new BasicCredentialsProvider();
1916     String schema = AuthSchemes.DIGEST;
1917     if ("Basic".equals(downloadAuthMethod)) {
1918       schema = AuthSchemes.BASIC;
1919     }
1920     provider.setCredentials(
1921       new AuthScope(AuthScope.ANY_HOST, AuthScope.ANY_PORT, AuthScope.ANY_REALM, schema),
1922       new UsernamePasswordCredentials(downloadUser, downloadPassword));
1923     return cb.setDefaultCredentialsProvider(provider).build();
1924   }
1925 
1926   private MediaPackage createSmil(MediaPackage mediaPackage) throws IOException, IngestException {
1927     List<Track> partialTracks = new ArrayList<>();
1928     for (Track track : mediaPackage.getTracks()) {
1929       Long startTime = partialTrackStartTimes.getIfPresent(track.getIdentifier());
1930       if (startTime != null) {
1931         partialTracks.add(track);
1932       }
1933     }
1934 
1935     // No partial track available return without adding SMIL catalog
1936     if (partialTracks.isEmpty()) {
1937       return mediaPackage;
1938     }
1939 
1940     // Inspect the partial tracks
1941     List<Track> tracks = partialTracks.stream()
1942         .map(track -> {
1943           try {
1944             // Create a media inspection job for a mediapackage element.
1945             return mediaInspectionService.enrich(track, true);
1946           } catch (Exception e) {
1947             throw new RuntimeException("Error enriching track", e);
1948           }
1949         })
1950         .map(job -> {
1951           try {
1952             // Interpret the payload of a completed Job as a MediaPackageElement.
1953             // Wait for the job to complete if necessary
1954             waitForJob(getServiceRegistry(), Optional.empty(), job);
1955             return (Track) MediaPackageElementParser.getFromXml(job.getPayload());
1956           } catch (Exception e) {
1957             throw new RuntimeException("Error parsing job payload as track", e);
1958           }
1959         })
1960         .peek(MediaPackageSupport.updateElement(mediaPackage))
1961         .collect(Collectors.toList());
1962 
1963     // Create the SMIL document
1964     org.w3c.dom.Document smilDocument = SmilUtil.createSmil();
1965     for (Track track : tracks) {
1966       Long startTime = partialTrackStartTimes.getIfPresent(track.getIdentifier());
1967       if (startTime == null) {
1968         logger.error("No start time found for track {}", track);
1969         throw new IngestException("No start time found for track " + track.getIdentifier());
1970       }
1971       smilDocument = addSmilTrack(smilDocument, track, startTime);
1972       partialTrackStartTimes.invalidate(track.getIdentifier());
1973     }
1974 
1975     // Store the SMIL document in the mediapackage
1976     return addSmilCatalog(smilDocument, mediaPackage);
1977   }
1978 
1979   /**
1980    * Adds a SMIL catalog to a mediapackage if it's not already existing.
1981    *
1982    * @param smilDocument
1983    *          the smil document
1984    * @param mediaPackage
1985    *          the mediapackage to extend with the SMIL catalog
1986    * @return the augmented mediapcakge
1987    * @throws IOException
1988    *           if reading or writing of the SMIL catalog fails
1989    * @throws IngestException
1990    *           if the SMIL catalog already exists
1991    */
1992   private MediaPackage addSmilCatalog(org.w3c.dom.Document smilDocument, MediaPackage mediaPackage)
1993           throws IOException, IngestException {
1994     Optional<org.w3c.dom.Document> optSmilDocument = loadSmilDocument(workingFileRepository, mediaPackage);
1995     if (optSmilDocument.isPresent())
1996       throw new IngestException("SMIL already exists!");
1997 
1998     InputStream in = null;
1999     try {
2000       in = XmlUtil.serializeDocument(smilDocument);
2001       String elementId = UUID.randomUUID().toString();
2002       URI uri = workingFileRepository.put(mediaPackage.getIdentifier().toString(), elementId, PARTIAL_SMIL_NAME, in);
2003       MediaPackageElement mpe = mediaPackage.add(uri, MediaPackageElement.Type.Catalog, MediaPackageElements.SMIL);
2004       mpe.setIdentifier(elementId);
2005       // Reset the checksum since it changed
2006       mpe.setChecksum(null);
2007       mpe.setMimeType(MimeTypes.SMIL);
2008       return mediaPackage;
2009     } finally {
2010       IoSupport.closeQuietly(in);
2011     }
2012   }
2013 
2014   /**
2015    * Load a SMIL document of a media package.
2016    *
2017    * @return the document or none if no media package element found.
2018    */
2019   private Optional<org.w3c.dom.Document> loadSmilDocument(final WorkingFileRepository workingFileRepository,
2020           MediaPackage mp) {
2021     return Arrays.stream(mp.getElements())
2022         .filter(MediaPackageSupport.Filters::isSmilCatalog)
2023         .findFirst()
2024         .map(mpe -> {
2025           try (InputStream in = workingFileRepository.get(
2026               mpe.getMediaPackage().getIdentifier().toString(),
2027               mpe.getIdentifier())) {
2028             return SmilUtil.loadSmilDocument(in, mpe);
2029           } catch (Exception e) {
2030             logger.warn("Unable to load smil document from catalog '{}'", mpe, e);
2031             return Misc.chuck(e);
2032           }
2033         });
2034   }
2035 
2036   /**
2037    * Adds a SMIL track by a mediapackage track to a SMIL document
2038    *
2039    * @param smilDocument
2040    *          the SMIL document to extend
2041    * @param track
2042    *          the mediapackage track
2043    * @param startTime
2044    *          the start time
2045    * @return the augmented SMIL document
2046    * @throws IngestException
2047    *           if the partial flavor type is not valid
2048    */
2049   private org.w3c.dom.Document addSmilTrack(org.w3c.dom.Document smilDocument, Track track, long startTime)
2050           throws IngestException {
2051     if (MediaPackageElements.PRESENTER_SOURCE.getType().equals(track.getFlavor().getType())) {
2052       return SmilUtil.addTrack(smilDocument, SmilUtil.TrackType.PRESENTER, track.hasVideo(), startTime,
2053               track.getDuration(), track.getURI(), track.getIdentifier());
2054     } else if (MediaPackageElements.PRESENTATION_SOURCE.getType().equals(track.getFlavor().getType())) {
2055       return SmilUtil.addTrack(smilDocument, SmilUtil.TrackType.PRESENTATION, track.hasVideo(), startTime,
2056               track.getDuration(), track.getURI(), track.getIdentifier());
2057     } else {
2058       logger.warn("Invalid partial flavor type {} of track {}", track.getFlavor(), track);
2059       throw new IngestException(
2060               "Invalid partial flavor type " + track.getFlavor().getType() + " of track " + track.getURI().toString());
2061     }
2062   }
2063 
2064   private MediaPackage checkForCASeries(MediaPackage mp, String seriesAppendName) {
2065     //Check for media package id and CA series appendix set
2066     if (mp == null || seriesAppendName == null) {
2067       logger.debug("No series name provided");
2068       return mp;
2069     }
2070 
2071     // Verify user is a CA by checking roles and captureAgentId
2072     User user = securityService.getUser();
2073     if (!user.hasRole(GLOBAL_ADMIN_ROLE) && !user.hasRole(GLOBAL_CAPTURE_AGENT_ROLE)) {
2074       logger.info("User '{}' is missing capture agent roles, won't apply CASeries", user.getUsername());
2075       return mp;
2076     }
2077     //Get capture agent name
2078     String captureAgentId = null;
2079     Catalog[] catalog = mp.getCatalogs(MediaPackageElementFlavor.flavor("dublincore", "episode"));
2080     if (catalog.length == 1) {
2081       try (InputStream catalogInputStream = workingFileRepository.get(mp.getIdentifier().toString(), catalog[0].getIdentifier())) {
2082         DublinCoreCatalog dc = dublinCoreService.load(catalogInputStream);
2083         captureAgentId = getCaptureAgent(dc);
2084       } catch (Exception e) {
2085         logger.info("Unable to determine capture agent name");
2086       }
2087     }
2088     if (captureAgentId == null) {
2089       logger.info("No Capture Agent ID defined for MediaPackage {}, won't apply CASeries", mp.getIdentifier());
2090       return mp;
2091     }
2092     logger.info("Applying CASeries to MediaPackage {} for capture agent '{}'", mp.getIdentifier(), captureAgentId);
2093 
2094     // Find or create CA series
2095     String seriesId = captureAgentId.replaceAll("[^\\w-_.:;()]+", "_");
2096     String seriesName = captureAgentId + seriesAppendName;
2097 
2098     try {
2099       seriesService.getSeries(seriesId);
2100     } catch (NotFoundException nfe) {
2101       try {
2102         List<String> roleNames = new ArrayList<>();
2103         String roleName = SecurityUtil.getCaptureAgentRole(captureAgentId);
2104         roleNames.add(roleName);
2105         logger.debug("Capture agent role name: {}", roleName);
2106 
2107         String username = user.getUsername();
2108         roleNames.add(UserIdRoleProvider.getUserIdRole(username));
2109 
2110         logger.info("Creating new series for capture agent '{}' and user '{}'", captureAgentId, username);
2111         createSeries(seriesId, seriesName, roleNames);
2112       } catch (Exception e) {
2113         logger.error("Unable to create series {} for event {}", seriesName, mp, e);
2114         return mp;
2115       }
2116     } catch (SeriesException | UnauthorizedException e) {
2117       logger.error("Exception while searching for series {}", seriesName, e);
2118       return mp;
2119     }
2120 
2121     // Add the event to CA series
2122     mp.setSeries(seriesId);
2123     mp.setSeriesTitle(seriesName);
2124 
2125     return mp;
2126   }
2127 
2128   private DublinCoreCatalog createSeries(String seriesId, String seriesName, List<String> roleNames)
2129       throws SeriesException, UnauthorizedException, NotFoundException {
2130     DublinCoreCatalog dc = DublinCores.mkOpencastSeries().getCatalog();
2131     dc.set(PROPERTY_IDENTIFIER, seriesId);
2132     dc.set(PROPERTY_TITLE, seriesName);
2133     dc.set(DublinCore.PROPERTY_CREATED, EncodingSchemeUtils.encodeDate(new Date(), Precision.Second));
2134     // set series name
2135     DublinCoreCatalog createdSeries = seriesService.updateSeries(dc);
2136 
2137     // fill acl
2138     List<AccessControlEntry> aces = new ArrayList();
2139     for (String roleName : roleNames) {
2140       AccessControlEntry aceRead = new AccessControlEntry(roleName, Permissions.Action.READ.toString(), true);
2141       AccessControlEntry aceWrite = new AccessControlEntry(roleName, Permissions.Action.WRITE.toString(), true);
2142       aces.add(aceRead);
2143       aces.add(aceWrite);
2144     }
2145     AccessControlList acl = new AccessControlList(aces);
2146     seriesService.updateAccessControl(seriesId, acl);
2147     logger.info("Created capture agent series with name {} and id {}", seriesName, seriesId);
2148 
2149     return dc;
2150   }
2151 }