View Javadoc
1   /*
2    * Licensed to The Apereo Foundation under one or more contributor license
3    * agreements. See the NOTICE file distributed with this work for additional
4    * information regarding copyright ownership.
5    *
6    *
7    * The Apereo Foundation licenses this file to you under the Educational
8    * Community License, Version 2.0 (the "License"); you may not use this file
9    * except in compliance with the License. You may obtain a copy of the License
10   * at:
11   *
12   *   http://opensource.org/licenses/ecl2.txt
13   *
14   * Unless required by applicable law or agreed to in writing, software
15   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
16   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the
17   * License for the specific language governing permissions and limitations under
18   * the License.
19   *
20   */
21  package org.opencastproject.scheduler.impl;
22  
23  import static com.entwinemedia.fn.Stream.$;
24  import static com.entwinemedia.fn.data.Opt.some;
25  import static org.apache.commons.lang3.StringUtils.isBlank;
26  import static org.opencastproject.scheduler.impl.SchedulerUtil.calculateChecksum;
27  import static org.opencastproject.scheduler.impl.SchedulerUtil.eventOrganizationFilter;
28  import static org.opencastproject.scheduler.impl.SchedulerUtil.isNotEpisodeDublinCore;
29  import static org.opencastproject.scheduler.impl.SchedulerUtil.uiAdapterToFlavor;
30  import static org.opencastproject.security.api.SecurityConstants.GLOBAL_ADMIN_ROLE;
31  import static org.opencastproject.util.EqualsUtil.ne;
32  import static org.opencastproject.util.RequireUtil.notEmpty;
33  import static org.opencastproject.util.RequireUtil.notNull;
34  import static org.opencastproject.util.RequireUtil.requireTrue;
35  import static org.opencastproject.util.data.Monadics.mlist;
36  
37  import org.opencastproject.assetmanager.api.Asset;
38  import org.opencastproject.assetmanager.api.AssetManager;
39  import org.opencastproject.assetmanager.api.Availability;
40  import org.opencastproject.assetmanager.api.Snapshot;
41  import org.opencastproject.assetmanager.api.query.AQueryBuilder;
42  import org.opencastproject.assetmanager.api.query.ARecord;
43  import org.opencastproject.assetmanager.api.query.AResult;
44  import org.opencastproject.assetmanager.api.query.ASelectQuery;
45  import org.opencastproject.assetmanager.api.query.Predicate;
46  import org.opencastproject.elasticsearch.api.SearchIndexException;
47  import org.opencastproject.elasticsearch.index.ElasticsearchIndex;
48  import org.opencastproject.elasticsearch.index.objects.event.Event;
49  import org.opencastproject.elasticsearch.index.objects.event.EventIndexUtils;
50  import org.opencastproject.elasticsearch.index.rebuild.AbstractIndexProducer;
51  import org.opencastproject.elasticsearch.index.rebuild.IndexProducer;
52  import org.opencastproject.elasticsearch.index.rebuild.IndexRebuildException;
53  import org.opencastproject.elasticsearch.index.rebuild.IndexRebuildService;
54  import org.opencastproject.mediapackage.Catalog;
55  import org.opencastproject.mediapackage.MediaPackage;
56  import org.opencastproject.mediapackage.MediaPackageElement;
57  import org.opencastproject.mediapackage.MediaPackageElementFlavor;
58  import org.opencastproject.mediapackage.MediaPackageElements;
59  import org.opencastproject.mediapackage.MediaPackageException;
60  import org.opencastproject.mediapackage.MediaPackageSupport;
61  import org.opencastproject.mediapackage.identifier.Id;
62  import org.opencastproject.mediapackage.identifier.IdImpl;
63  import org.opencastproject.message.broker.api.scheduler.SchedulerItem;
64  import org.opencastproject.message.broker.api.scheduler.SchedulerItemList;
65  import org.opencastproject.message.broker.api.update.SchedulerUpdateHandler;
66  import org.opencastproject.metadata.dublincore.DCMIPeriod;
67  import org.opencastproject.metadata.dublincore.DublinCore;
68  import org.opencastproject.metadata.dublincore.DublinCoreCatalog;
69  import org.opencastproject.metadata.dublincore.DublinCoreUtil;
70  import org.opencastproject.metadata.dublincore.DublinCoreValue;
71  import org.opencastproject.metadata.dublincore.DublinCores;
72  import org.opencastproject.metadata.dublincore.EncodingSchemeUtils;
73  import org.opencastproject.metadata.dublincore.EventCatalogUIAdapter;
74  import org.opencastproject.metadata.dublincore.Precision;
75  import org.opencastproject.scheduler.api.Recording;
76  import org.opencastproject.scheduler.api.RecordingImpl;
77  import org.opencastproject.scheduler.api.RecordingState;
78  import org.opencastproject.scheduler.api.SchedulerConflictException;
79  import org.opencastproject.scheduler.api.SchedulerException;
80  import org.opencastproject.scheduler.api.SchedulerService;
81  import org.opencastproject.scheduler.api.TechnicalMetadata;
82  import org.opencastproject.scheduler.api.TechnicalMetadataImpl;
83  import org.opencastproject.scheduler.api.Util;
84  import org.opencastproject.scheduler.impl.persistence.ExtendedEventDto;
85  import org.opencastproject.security.api.AccessControlList;
86  import org.opencastproject.security.api.AccessControlParser;
87  import org.opencastproject.security.api.AccessControlUtil;
88  import org.opencastproject.security.api.AuthorizationService;
89  import org.opencastproject.security.api.Organization;
90  import org.opencastproject.security.api.OrganizationDirectoryService;
91  import org.opencastproject.security.api.SecurityService;
92  import org.opencastproject.security.api.UnauthorizedException;
93  import org.opencastproject.security.api.User;
94  import org.opencastproject.security.util.SecurityUtil;
95  import org.opencastproject.series.api.SeriesService;
96  import org.opencastproject.util.DateTimeSupport;
97  import org.opencastproject.util.NotFoundException;
98  import org.opencastproject.util.OsgiUtil;
99  import org.opencastproject.util.XmlNamespaceBinding;
100 import org.opencastproject.util.XmlNamespaceContext;
101 import org.opencastproject.util.data.Option;
102 import org.opencastproject.util.data.functions.Misc;
103 import org.opencastproject.util.data.functions.Strings;
104 import org.opencastproject.workspace.api.Workspace;
105 
106 import com.entwinemedia.fn.Fn;
107 import com.entwinemedia.fn.Stream;
108 import com.entwinemedia.fn.data.Opt;
109 import com.google.common.cache.Cache;
110 import com.google.common.cache.CacheBuilder;
111 import com.google.gson.Gson;
112 import com.google.gson.reflect.TypeToken;
113 
114 import net.fortuna.ical4j.model.Period;
115 import net.fortuna.ical4j.model.TimeZoneRegistry;
116 import net.fortuna.ical4j.model.TimeZoneRegistryFactory;
117 import net.fortuna.ical4j.model.property.RRule;
118 
119 import org.apache.commons.io.IOUtils;
120 import org.apache.commons.lang3.StringUtils;
121 import org.joda.time.DateTime;
122 import org.joda.time.DateTimeZone;
123 import org.osgi.service.cm.ConfigurationException;
124 import org.osgi.service.cm.ManagedService;
125 import org.osgi.service.component.ComponentContext;
126 import org.osgi.service.component.annotations.Activate;
127 import org.osgi.service.component.annotations.Component;
128 import org.osgi.service.component.annotations.Reference;
129 import org.osgi.service.component.annotations.ReferenceCardinality;
130 import org.osgi.service.component.annotations.ReferencePolicy;
131 import org.osgi.service.component.annotations.ReferencePolicyOption;
132 import org.slf4j.Logger;
133 import org.slf4j.LoggerFactory;
134 
135 import java.io.IOException;
136 import java.io.InputStream;
137 import java.lang.reflect.Type;
138 import java.net.URI;
139 import java.util.ArrayList;
140 import java.util.Arrays;
141 import java.util.Calendar;
142 import java.util.Collections;
143 import java.util.Comparator;
144 import java.util.Date;
145 import java.util.Dictionary;
146 import java.util.HashMap;
147 import java.util.HashSet;
148 import java.util.LinkedList;
149 import java.util.List;
150 import java.util.Map;
151 import java.util.Map.Entry;
152 import java.util.Optional;
153 import java.util.Set;
154 import java.util.TimeZone;
155 import java.util.UUID;
156 import java.util.concurrent.ConcurrentHashMap;
157 import java.util.concurrent.CopyOnWriteArrayList;
158 import java.util.concurrent.TimeUnit;
159 import java.util.function.Function;
160 import java.util.stream.Collectors;
161 
162 /**
163  * Implementation of {@link SchedulerService}.
164  */
165 @Component(
166     service = { ManagedService.class, SchedulerService.class, IndexProducer.class },
167     property = {
168         "service.description=Scheduler Service"
169     }
170 )
171 public class SchedulerServiceImpl extends AbstractIndexProducer implements SchedulerService, ManagedService {
172 
173   /** The logger */
174   private static final Logger logger = LoggerFactory.getLogger(SchedulerServiceImpl.class);
175 
176   /** The last modified cache configuration key */
177   private static final String CFG_KEY_LAST_MODIFIED_CACHE_EXPIRE = "last_modified_cache_expire";
178 
179   /** The maintenance configuration key */
180   private static final String CFG_KEY_MAINTENANCE = "maintenance";
181 
182   /** The default cache expire time in seconds */
183   private static final int DEFAULT_CACHE_EXPIRE = 60;
184 
185   /** The Etag for an empty calendar */
186   private static final String EMPTY_CALENDAR_ETAG = "mod0";
187 
188   private static final String SNAPSHOT_OWNER = SchedulerService.JOB_TYPE;
189 
190   private static final Gson gson = new Gson();
191   /**
192    * Deserializes properties stored in string columns of the extended event table
193    * @param props Properties as retrieved from the DB
194    * @return deserialized key-value pairs
195    */
196   private static Map<String,String> deserializeExtendedEventProperties(String props) {
197     if (props == null || props.trim().isEmpty()) {
198       return new HashMap<>();
199     }
200     Type type = new TypeToken<Map<String, String>>() { }.getType();
201     return gson.fromJson(props, type);
202   }
203 
204   /** The last modified cache */
205   protected Cache<String, String> lastModifiedCache = CacheBuilder.newBuilder()
206           .expireAfterWrite(DEFAULT_CACHE_EXPIRE, TimeUnit.SECONDS).build();
207 
208   /** Persistent storage for events */
209   private SchedulerServiceDatabase persistence;
210 
211   /** The series service */
212   private SeriesService seriesService;
213 
214   /** The security service used to run the security context with. */
215   private SecurityService securityService;
216 
217   /** The asset manager */
218   private AssetManager assetManager;
219 
220   /** The workspace */
221   private Workspace workspace;
222 
223   /** The authorization service */
224   private AuthorizationService authorizationService;
225 
226   /** The organization directory service */
227   private OrganizationDirectoryService orgDirectoryService;
228 
229   /** The Elasticsearch indices */
230   private ElasticsearchIndex index;
231 
232   /** The list of registered event catalog UI adapters */
233   private List<EventCatalogUIAdapter> eventCatalogUIAdapters = new ArrayList<>();
234   private final List<SchedulerUpdateHandler> schedulerUpdateHandlers = new CopyOnWriteArrayList<>();
235 
236   /** The system user name */
237   private String systemUserName;
238 
239   private ComponentContext componentContext;
240 
241   /**
242    * OSGi callback to add an update handler.
243    *
244    * @param handler
245    */
246   @Reference(
247       cardinality = ReferenceCardinality.MULTIPLE,
248       policy = ReferencePolicy.DYNAMIC,
249       policyOption = ReferencePolicyOption.GREEDY,
250       unbind = "removeSchedulerUpdateHandler"
251   )
252   public void addSchedulerUpdateHandler(SchedulerUpdateHandler handler) {
253     this.schedulerUpdateHandlers.add(handler);
254   }
255 
256   public void removeSchedulerUpdateHandler(SchedulerUpdateHandler handler) {
257     this.schedulerUpdateHandlers.remove(handler);
258   }
259 
260   /**
261    * OSGi callback to set Persistence Service.
262    *
263    * @param persistence
264    */
265   @Reference
266   public void setPersistence(SchedulerServiceDatabase persistence) {
267     this.persistence = persistence;
268   }
269 
270   /**
271    * OSGi callback for setting Series Service.
272    *
273    * @param seriesService
274    */
275   @Reference
276   public void setSeriesService(SeriesService seriesService) {
277     this.seriesService = seriesService;
278   }
279 
280   /**
281    * OSGi callback to set security service.
282    *
283    * @param securityService
284    */
285   @Reference
286   public void setSecurityService(SecurityService securityService) {
287     this.securityService = securityService;
288   }
289 
290   /**
291    * OSGi callback to set the asset manager.
292    *
293    * @param assetManager
294    */
295   @Reference
296   public void setAssetManager(AssetManager assetManager) {
297     this.assetManager = assetManager;
298   }
299 
300   /**
301    * OSGi callback to set the workspace.
302    *
303    * @param workspace
304    */
305   @Reference
306   public void setWorkspace(Workspace workspace) {
307     this.workspace = workspace;
308   }
309 
310   /**
311    * OSGi callback to set the authorization service.
312    *
313    * @param authorizationService
314    */
315   @Reference
316   public void setAuthorizationService(AuthorizationService authorizationService) {
317     this.authorizationService = authorizationService;
318   }
319 
320   /**
321    * Update all of the handlers that an event has changed
322    *
323    * @param list The list of scheduler changes for a mediapackage
324    */
325   private void sendSchedulerUpdate(SchedulerItemList list) {
326     while (schedulerUpdateHandlers.size() != 1) {
327       logger.warn("Expecting 1 handler, but {} are registered.  Waiting 10s then retrying...", schedulerUpdateHandlers.size());
328       try {
329         Thread.sleep(10000L);
330       } catch (InterruptedException e) { /* swallow this, nothing to do */ }
331     }
332     String mpid = list.getId();
333     for (SchedulerItem item : list.getItems()) {
334       for (SchedulerUpdateHandler handler : this.schedulerUpdateHandlers) {
335         handler.execute(mpid, item);
336       }
337     }
338   }
339 
340   /**
341    * OSGi callback to set the organization directory service.
342    *
343    * @param orgDirectoryService
344    */
345   @Reference
346   public void setOrgDirectoryService(OrganizationDirectoryService orgDirectoryService) {
347     this.orgDirectoryService = orgDirectoryService;
348   }
349 
350   /**
351    * OSgi callback to set the Elasticsearch index.
352    *
353    * @param index
354    *          the Elasticsearch index.
355    */
356   @Reference
357   public void setIndex(ElasticsearchIndex index) {
358     this.index = index;
359   }
360 
361   /** OSGi callback to add {@link EventCatalogUIAdapter} instance. */
362   @Reference(
363       cardinality = ReferenceCardinality.MULTIPLE,
364       policy = ReferencePolicy.DYNAMIC,
365       policyOption = ReferencePolicyOption.GREEDY,
366       unbind = "removeCatalogUIAdapter"
367   )
368   public void addCatalogUIAdapter(EventCatalogUIAdapter catalogUIAdapter) {
369     eventCatalogUIAdapters.add(catalogUIAdapter);
370   }
371 
372   /** OSGi callback to remove {@link EventCatalogUIAdapter} instance. */
373   public void removeCatalogUIAdapter(EventCatalogUIAdapter catalogUIAdapter) {
374     eventCatalogUIAdapters.remove(catalogUIAdapter);
375   }
376 
377   /**
378    * Activates Scheduler Service.
379    *
380    * @param cc
381    *          ComponentContext
382    * @throws Exception
383    */
384   @Activate
385   public void activate(ComponentContext cc) throws Exception {
386     this.componentContext = cc;
387     systemUserName = SecurityUtil.getSystemUserName(cc);
388     logger.info("Activating Scheduler Service");
389   }
390 
391   @Override
392   public void updated(Dictionary<String, ?> properties) throws ConfigurationException {
393     if (properties != null) {
394       final Option<Integer> cacheExpireDuration = OsgiUtil.getOptCfg(properties, CFG_KEY_LAST_MODIFIED_CACHE_EXPIRE)
395               .bind(Strings.toInt);
396       if (cacheExpireDuration.isSome()) {
397         lastModifiedCache = CacheBuilder.newBuilder().expireAfterWrite(cacheExpireDuration.get(), TimeUnit.SECONDS)
398                 .build();
399         logger.info("Set last modified cache to {}", DateTimeSupport.humanReadableTime(cacheExpireDuration.get()));
400       } else {
401         logger.info("Set last modified cache to default {}", DateTimeSupport.humanReadableTime(DEFAULT_CACHE_EXPIRE));
402       }
403       final Option<Boolean> maintenance = OsgiUtil.getOptCfgAsBoolean(properties, CFG_KEY_MAINTENANCE);
404       if (maintenance.getOrElse(false)) {
405         final String name = SchedulerServiceImpl.class.getName();
406         logger.warn("Putting scheduler into maintenance mode. This only makes sense when migrating data. If this is not"
407                 + " intended, edit the config file '{}.cfg' accordingly and restart opencast.", name);
408         componentContext.disableComponent(name);
409       }
410     }
411   }
412 
413   @Override
414   public void addEvent(Date startDateTime, Date endDateTime, String captureAgentId, Set<String> userIds,
415           MediaPackage mediaPackage, Map<String, String> wfProperties, Map<String, String> caMetadata,
416           Opt<String> schedulingSource)
417                   throws UnauthorizedException, SchedulerException {
418     addEventInternal(startDateTime, endDateTime, captureAgentId, userIds, mediaPackage, wfProperties, caMetadata,
419             schedulingSource);
420   }
421 
422   private void addEventInternal(Date startDateTime, Date endDateTime, String captureAgentId, Set<String> userIds,
423           MediaPackage mediaPackage, Map<String, String> wfProperties, Map<String, String> caMetadata,
424           Opt<String> schedulingSource)
425                   throws SchedulerException {
426     notNull(startDateTime, "startDateTime");
427     notNull(endDateTime, "endDateTime");
428     notEmpty(captureAgentId, "captureAgentId");
429     notNull(userIds, "userIds");
430     notNull(mediaPackage, "mediaPackage");
431     notNull(wfProperties, "wfProperties");
432     notNull(caMetadata, "caMetadata");
433     notNull(schedulingSource, "schedulingSource");
434     if (endDateTime.before(startDateTime))
435       throw new IllegalArgumentException("The end date is before the start date");
436 
437     final String mediaPackageId = mediaPackage.getIdentifier().toString();
438 
439     try {
440       AQueryBuilder query = assetManager.createQuery();
441       AResult result = query.select(query.nothing())
442               .where(withOrganization(query).and(query.mediaPackageId(mediaPackageId).and(query.version().isLatest())))
443               .run();
444       Optional<ARecord> record = result.getRecords().stream().findFirst();
445       if (record.isPresent()) {
446         logger.warn("Mediapackage with id '{}' already exists!", mediaPackageId);
447         throw new SchedulerConflictException("Mediapackage with id '" + mediaPackageId + "' already exists!");
448       }
449 
450       Opt<String> seriesId = Opt.nul(StringUtils.trimToNull(mediaPackage.getSeries()));
451 
452       List<MediaPackage> conflictingEvents = findConflictingEvents(captureAgentId, startDateTime, endDateTime);
453       if (conflictingEvents.size() > 0) {
454         logger.info("Unable to add event {}, conflicting events found: {}", mediaPackageId, conflictingEvents);
455         throw new SchedulerConflictException(
456                 "Unable to add event, conflicting events found for event " + mediaPackageId);
457       }
458 
459       // Load dublincore and acl for update
460       Opt<DublinCoreCatalog> dublinCore = DublinCoreUtil.loadEpisodeDublinCore(workspace, mediaPackage)
461           .map(Opt::some).orElse(Opt.none());
462       AccessControlList acl = authorizationService.getActiveAcl(mediaPackage).getA();
463 
464       // Get updated agent properties
465       Map<String, String> finalCaProperties = getFinalAgentProperties(caMetadata, wfProperties, captureAgentId,
466               seriesId, dublinCore);
467 
468       // Persist asset
469       String checksum = calculateChecksum(workspace, getEventCatalogUIAdapterFlavors(), startDateTime, endDateTime,
470                                           captureAgentId, userIds, mediaPackage, dublinCore, wfProperties,
471                                           finalCaProperties, acl);
472       persistEvent(mediaPackageId, checksum, Opt.some(startDateTime), Opt.some(endDateTime),
473               Opt.some(captureAgentId), Opt.some(userIds), Opt.some(mediaPackage), Opt.some(wfProperties),
474               Opt.some(finalCaProperties), schedulingSource);
475 
476       // Update live event
477       updateLiveEvent(mediaPackageId, Opt.some(acl), dublinCore, Opt.some(startDateTime),
478               Opt.some(endDateTime), Opt.some(captureAgentId), Opt.some(finalCaProperties));
479 
480       // Update Elasticsearch index
481       updateEventInIndex(mediaPackageId, Opt.some(acl), dublinCore, Opt.some(startDateTime), Opt.some(endDateTime),
482           Opt.some(userIds), Opt.some(captureAgentId), Opt.some(finalCaProperties), Opt.none());
483 
484       // Update last modified
485       touchLastEntry(captureAgentId);
486     } catch (SchedulerException e) {
487       throw e;
488     } catch (Exception e) {
489       logger.error("Failed to create event with id '{}':", mediaPackageId, e);
490       throw new SchedulerException(e);
491     }
492   }
493 
494   @Override
495   public Map<String, Period> addMultipleEvents(RRule rRule, Date start, Date end, Long duration, TimeZone tz,
496           String captureAgentId, Set<String> userIds, MediaPackage templateMp, Map<String, String> wfProperties,
497           Map<String, String> caMetadata, Opt<String> schedulingSource)
498           throws UnauthorizedException, SchedulerConflictException, SchedulerException {
499     // input Rrule is UTC. Needs to be adjusted to tz
500     Util.adjustRrule(rRule, start, tz);
501     List<Period> periods = Util.calculatePeriods(start, end, duration, rRule, tz);
502     if (periods.isEmpty()) {
503       return Collections.emptyMap();
504     }
505     return addMultipleEventInternal(periods, captureAgentId, userIds, templateMp, wfProperties, caMetadata,
506             schedulingSource);
507   }
508 
509   private Map<String, Period> addMultipleEventInternal(List<Period> periods, String captureAgentId,
510           Set<String> userIds, MediaPackage templateMp, Map<String, String> wfProperties,
511           Map<String, String> caMetadata, Opt<String> schedulingSource) throws SchedulerException {
512     notNull(periods, "periods");
513     requireTrue(periods.size() > 0, "periods");
514     notEmpty(captureAgentId, "captureAgentId");
515     notNull(userIds, "userIds");
516     notNull(templateMp, "mediaPackages");
517     notNull(wfProperties, "wfProperties");
518     notNull(caMetadata, "caMetadata");
519     notNull(schedulingSource, "schedulingSource");
520 
521     Map<String, Period> scheduledEvents = new ConcurrentHashMap<>();
522 
523     try {
524       LinkedList<Id> ids = new LinkedList<>();
525       AQueryBuilder qb = assetManager.createQuery();
526       Predicate p = null;
527       //While we don't have a list of IDs equal to the number of periods
528       while (ids.size() <= periods.size()) {
529         //Create a list of IDs equal to the number of periods, along with a set of AM predicates
530         while (ids.size() <= periods.size()) {
531           Id id = new IdImpl(UUID.randomUUID().toString());
532           ids.add(id);
533           Predicate np = qb.mediaPackageId(id.toString());
534           //Haha, p = np jokes with the AM query language. Ha. Haha. Ha.  (Sob...)
535           if (null == p) {
536             p = np;
537           } else {
538             p = p.or(np);
539           }
540         }
541         //Select the list of ids which alread exist.  Hint: this needs to be zero
542         AResult result = qb.select(qb.nothing()).where(withOrganization(qb).and(p).and(qb.version().isLatest())).run();
543         //If there is conflict, clear the list and start over
544         if (result.getTotalSize() > 0) {
545           ids.clear();
546         }
547       }
548 
549       Opt<String> seriesId = Opt.nul(StringUtils.trimToNull(templateMp.getSeries()));
550 
551       List<MediaPackage> conflictingEvents = findConflictingEvents(periods, captureAgentId, TimeZone.getDefault());
552       if (conflictingEvents.size() > 0) {
553         logger.info("Unable to add events, conflicting events found: {}", conflictingEvents);
554         throw new SchedulerConflictException("Unable to add event, conflicting events found");
555       }
556 
557       final Organization org = securityService.getOrganization();
558       final User user = securityService.getUser();
559       periods.parallelStream().forEach(event -> SecurityUtil.runAs(securityService, org, user, () -> {
560         final int currentCounter = periods.indexOf(event);
561         MediaPackage mediaPackage = (MediaPackage) templateMp.clone();
562         Date startDate = new Date(event.getStart().getTime());
563         Date endDate = new Date(event.getEnd().getTime());
564         Id id = ids.get(currentCounter);
565 
566         //Get, or make, the DC catalog
567         DublinCoreCatalog dc;
568         Optional<DublinCoreCatalog> dcOpt = DublinCoreUtil.loadEpisodeDublinCore(workspace, templateMp);
569         if (dcOpt.isPresent()) {
570           dc = dcOpt.get();
571           dc = (DublinCoreCatalog) dc.clone();
572           // make sure to bind the OC_PROPERTY namespace
573           dc.addBindings(XmlNamespaceContext
574                   .mk(XmlNamespaceBinding.mk(DublinCores.OC_PROPERTY_NS_PREFIX, DublinCores.OC_PROPERTY_NS_URI)));
575         } else {
576           dc = DublinCores.mkOpencastEpisode().getCatalog();
577         }
578 
579         // Set the new media package identifier
580         mediaPackage.setIdentifier(id);
581 
582         // Update dublincore title and temporal
583         String newTitle = dc.getFirst(DublinCore.PROPERTY_TITLE) + String.format(" %0" + Integer.toString(periods.size()).length() + "d", currentCounter + 1);
584         dc.set(DublinCore.PROPERTY_TITLE, newTitle);
585         DublinCoreValue eventTime = EncodingSchemeUtils.encodePeriod(new DCMIPeriod(startDate, endDate),
586                 Precision.Second);
587         dc.set(DublinCore.PROPERTY_TEMPORAL, eventTime);
588         dc.set(DublinCore.PROPERTY_CREATED, EncodingSchemeUtils.encodeDate(startDate, Precision.Second));
589         try {
590           mediaPackage = updateDublincCoreCatalog(mediaPackage, dc);
591         } catch (Exception e) {
592           Misc.chuck(e);
593         }
594         mediaPackage.setTitle(newTitle);
595 
596         String mediaPackageId = mediaPackage.getIdentifier().toString();
597         //Converting from iCal4j DateTime objects to plain Date objects to prevent AMQ issues below
598         Calendar cal = Calendar.getInstance(TimeZone.getTimeZone("UTC"));
599         cal.setTime(event.getStart());
600         Date startDateTime = cal.getTime();
601         cal.setTime(event.getEnd());
602         Date endDateTime = cal.getTime();
603         // Load dublincore and acl for update
604         Opt<DublinCoreCatalog> dublinCore = DublinCoreUtil.loadEpisodeDublinCore(workspace, mediaPackage)
605             .map(Opt::some).orElse(Opt.none());
606         AccessControlList acl = authorizationService.getActiveAcl(mediaPackage).getA();
607 
608         // Get updated agent properties
609         Map<String, String> finalCaProperties = getFinalAgentProperties(caMetadata, wfProperties, captureAgentId,
610                 seriesId, dublinCore);
611 
612         // Persist asset
613         String checksum = calculateChecksum(workspace, getEventCatalogUIAdapterFlavors(), startDateTime, endDateTime,
614                 captureAgentId, userIds, mediaPackage, dublinCore, wfProperties, finalCaProperties, acl);
615         try {
616           persistEvent(mediaPackageId, checksum, Opt.some(startDateTime), Opt.some(endDateTime),
617                 Opt.some(captureAgentId), Opt.some(userIds), Opt.some(mediaPackage), Opt.some(wfProperties),
618                 Opt.some(finalCaProperties), schedulingSource);
619         } catch (Exception e) {
620           Misc.chuck(e);
621         }
622 
623         // Update live event
624         updateLiveEvent(mediaPackageId, some(acl), dublinCore, Opt.some(startDateTime), Opt.some(endDateTime),
625                 Opt.some(captureAgentId), Opt.some(finalCaProperties));
626 
627         // Update Elasticsearch index
628         updateEventInIndex(mediaPackageId, some(acl), dublinCore, Opt.some(startDateTime), Opt.some(endDateTime),
629                 Opt.some(userIds), Opt.some(captureAgentId), Opt.some(finalCaProperties), Opt.none());
630 
631         scheduledEvents.put(mediaPackageId, event);
632         for (MediaPackageElement mediaPackageElement : mediaPackage.getElements()) {
633           try {
634             workspace.delete(mediaPackage.getIdentifier().toString(), mediaPackageElement.getIdentifier());
635           } catch (NotFoundException | IOException e) {
636             logger.warn("Failed to delete media package element", e);
637           }
638         }
639       }));
640       return scheduledEvents;
641     } catch (SchedulerException e) {
642       throw e;
643     } catch (Exception e) {
644       throw new SchedulerException(e);
645     } finally {
646       // Update last modified
647       if (!scheduledEvents.isEmpty()) {
648         touchLastEntry(captureAgentId);
649       }
650     }
651   }
652 
653   @Override
654   public void updateEvent(final String mpId, Opt<Date> startDateTime, Opt<Date> endDateTime, Opt<String> captureAgentId,
655           Opt<Set<String>> userIds, Opt<MediaPackage> mediaPackage, Opt<Map<String, String>> wfProperties,
656           Opt<Map<String, String>> caMetadata)
657                   throws NotFoundException, UnauthorizedException, SchedulerException {
658     updateEventInternal(mpId, startDateTime, endDateTime, captureAgentId, userIds, mediaPackage,
659             wfProperties, caMetadata, false);
660   }
661 
662   @Override
663   public void updateEvent(final String mpId, Opt<Date> startDateTime, Opt<Date> endDateTime, Opt<String> captureAgentId,
664           Opt<Set<String>> userIds, Opt<MediaPackage> mediaPackage, Opt<Map<String, String>> wfProperties,
665           Opt<Map<String, String>> caMetadata, boolean allowConflict)
666                 throws NotFoundException, UnauthorizedException, SchedulerException {
667     updateEventInternal(mpId, startDateTime, endDateTime, captureAgentId, userIds, mediaPackage,
668             wfProperties, caMetadata, allowConflict);
669   }
670 
671   private void updateEventInternal(final String mpId, Opt<Date> startDateTime,
672           Opt<Date> endDateTime, Opt<String> captureAgentId, Opt<Set<String>> userIds,
673           Opt<MediaPackage> mediaPackageOpt, Opt<Map<String, String>> wfProperties, Opt<Map<String, String>> caMetadata,
674           boolean allowConflict) throws NotFoundException, SchedulerException {
675     notEmpty(mpId, "mpId");
676     notNull(startDateTime, "startDateTime");
677     notNull(endDateTime, "endDateTime");
678     notNull(captureAgentId, "captureAgentId");
679     notNull(userIds, "userIds");
680     notNull(mediaPackageOpt, "mediaPackageOpt");
681     notNull(wfProperties, "wfProperties");
682     notNull(caMetadata, "caMetadata");
683 
684     try {
685       AQueryBuilder query = assetManager.createQuery();
686 
687       ASelectQuery select = query
688               .select(query.snapshot())
689               .where(withOrganization(query).and(query.mediaPackageId(mpId).and(query.version().isLatest())
690                   .and(withOwner(query))));
691       Optional<ARecord> optEvent = select.run().getRecords().stream().findFirst();
692       Opt<ExtendedEventDto> optExtEvent = persistence.getEvent(mpId);
693       if (optEvent.isEmpty() || optExtEvent.isNone())
694         throw new NotFoundException("No event found while updating event " + mpId);
695 
696       ARecord record = optEvent.get();
697       if (record.getSnapshot().isEmpty())
698         throw new NotFoundException("No mediapackage found while updating event " + mpId);
699       Snapshot snapshot = record.getSnapshot().get();
700       MediaPackage archivedMediaPackage = snapshot.getMediaPackage();
701 
702       Opt<DublinCoreCatalog> archivedDublinCoreOpt = loadEpisodeDublinCoreFromAsset(snapshot);
703       if (archivedDublinCoreOpt.isNone())
704         throw new NotFoundException("No dublincore found while updating event " + mpId);
705       DublinCoreCatalog archivedDublinCore = archivedDublinCoreOpt.get();
706       AccessControlList archivedAcl = authorizationService.getActiveAcl(archivedMediaPackage).getA();
707 
708       final ExtendedEventDto extendedEventDto = optExtEvent.get();
709       Date start = extendedEventDto.getStartDate();
710       Date end = extendedEventDto.getEndDate();
711 
712       if ((startDateTime.isSome() || endDateTime.isSome()) && endDateTime.getOr(end).before(startDateTime.getOr(start)))
713         throw new SchedulerException("The end date is before the start date");
714 
715       String agentId = extendedEventDto.getCaptureAgentId();
716       Opt<String> seriesId = Opt.nul(archivedMediaPackage.getSeries());
717 
718       // Check for conflicting events
719       // Check scheduling conflicts in case a property relevant for conflicts has changed
720       if ((captureAgentId.isSome() || startDateTime.isSome() || endDateTime.isSome())
721             && (!allowConflict || !isAdmin())) {
722         List<MediaPackage> conflictingEvents = $(findConflictingEvents(captureAgentId.getOr(agentId),
723                 startDateTime.getOr(start), endDateTime.getOr(end))).filter(new Fn<MediaPackage, Boolean>() {
724                     @Override
725                     public Boolean apply(MediaPackage mp) {
726                     return !mpId.equals(mp.getIdentifier().toString());
727                   }
728                   }).toList();
729         if (conflictingEvents.size() > 0) {
730           logger.info("Unable to update event {}, conflicting events found: {}", mpId, conflictingEvents);
731           throw new SchedulerConflictException("Unable to update event, conflicting events found for event " + mpId);
732         }
733       }
734 
735       Set<String> presenters = getPresenters(Opt.nul(extendedEventDto.getPresenters()).getOr(""));
736       Map<String, String> wfProps = deserializeExtendedEventProperties(extendedEventDto.getWorkflowProperties());
737       Map<String, String> caProperties = deserializeExtendedEventProperties(
738               extendedEventDto.getCaptureAgentProperties());
739 
740       boolean propertiesChanged = false;
741       boolean dublinCoreChanged = false;
742 
743       // Get workflow properties
744       for (Map<String, String> wfPropsToUpdate : wfProperties) {
745         propertiesChanged = true;
746         wfProps = wfPropsToUpdate;
747       }
748 
749       // Get capture agent properties
750       for (Map<String, String> caMetadataToUpdate : caMetadata) {
751         propertiesChanged = true;
752         caProperties = caMetadataToUpdate;
753       }
754 
755       if (captureAgentId.isSome())
756         propertiesChanged = true;
757 
758       Opt<AccessControlList> changedAclOpt = Opt.none();
759       Opt<DublinCoreCatalog> changedDublinCoreOpt = Opt.none();
760       if (mediaPackageOpt.isSome()) {
761         MediaPackage mediaPackage = mediaPackageOpt.get();
762         // Check for series change
763         if (ne(archivedMediaPackage.getSeries(), mediaPackage.getSeries())) {
764           propertiesChanged = true;
765           seriesId = Opt.nul(mediaPackage.getSeries());
766         }
767 
768         // Check for ACL change
769         AccessControlList acl = authorizationService.getActiveAcl(mediaPackage).getA();
770         if (!AccessControlUtil.equals(acl, archivedAcl)) {
771           changedAclOpt = some(acl);
772         }
773 
774         // Check for dublin core change
775         Optional<DublinCoreCatalog> dublinCoreOpt = DublinCoreUtil.loadEpisodeDublinCore(workspace,
776                 mediaPackage);
777         if (dublinCoreOpt.isPresent() && !DublinCoreUtil.equals(archivedDublinCore, dublinCoreOpt.get())) {
778           dublinCoreChanged = true;
779           propertiesChanged = true;
780           changedDublinCoreOpt = dublinCoreOpt.map(Opt::some).orElse(Opt.none());
781         }
782       }
783 
784       //update metadata for dublincore
785       DublinCoreCatalog dublinCore = changedDublinCoreOpt.getOr(archivedDublinCore);
786       DublinCoreCatalog dublinCoreCopy = (DublinCoreCatalog) dublinCore.clone();
787       if (startDateTime.isSome() && endDateTime.isSome()) {
788         DublinCoreValue eventTime = EncodingSchemeUtils.encodePeriod(
789                 new DCMIPeriod(startDateTime.get(), endDateTime.get()), Precision.Second);
790         dublinCore.set(DublinCore.PROPERTY_TEMPORAL, eventTime);
791       }
792       if (captureAgentId.isSome()) {
793         dublinCore.set(DublinCore.PROPERTY_SPATIAL, captureAgentId.get());
794       }
795       if (!DublinCoreUtil.equals(dublinCore, dublinCoreCopy)) {
796         dublinCoreChanged = true;
797         changedDublinCoreOpt = Opt.some(dublinCore);
798         mediaPackageOpt = Opt.some(updateDublincCoreCatalog(mediaPackageOpt.getOr(archivedMediaPackage),
799                 changedDublinCoreOpt.get()));
800       }
801 
802       Opt<Map<String, String>> finalCaProperties = Opt.none();
803       if (propertiesChanged) {
804         finalCaProperties = Opt.some(getFinalAgentProperties(caProperties, wfProps, captureAgentId.getOr(agentId),
805                                                              seriesId, some(changedDublinCoreOpt.getOr(
806                                                                      archivedDublinCore))));
807       }
808 
809       String checksum = calculateChecksum(workspace, getEventCatalogUIAdapterFlavors(), startDateTime.getOr(start),
810               endDateTime.getOr(end), captureAgentId.getOr(agentId), userIds.getOr(presenters),
811               mediaPackageOpt.getOr(archivedMediaPackage),
812               some(changedDublinCoreOpt.getOr(archivedDublinCore)), wfProperties.getOr(wfProps),
813               finalCaProperties.getOr(caProperties), changedAclOpt.getOr(new AccessControlList()));
814 
815       String oldChecksum = extendedEventDto.getChecksum();
816       if (checksum.equals(oldChecksum)) {
817         logger.debug("Updated event {} has same checksum, ignore update", mpId);
818         return;
819       }
820 
821       // Update asset
822       persistEvent(mpId, checksum, startDateTime, endDateTime, captureAgentId, userIds,
823               mediaPackageOpt, wfProperties, finalCaProperties, Opt.none());
824 
825       // Update live event
826       updateLiveEvent(mpId, changedAclOpt, changedDublinCoreOpt, startDateTime, endDateTime, Opt.some(agentId),
827               finalCaProperties);
828 
829       // Update Elasticsearch index
830       updateEventInIndex(mpId, changedAclOpt, changedDublinCoreOpt, startDateTime, endDateTime, userIds,
831               Opt.some(agentId), finalCaProperties, Opt.none());
832 
833       // Update last modified
834       if (propertiesChanged || dublinCoreChanged || startDateTime.isSome() || endDateTime.isSome()) {
835         touchLastEntry(agentId);
836         for (String agent : captureAgentId) {
837           touchLastEntry(agent);
838         }
839       }
840     } catch (NotFoundException | SchedulerException e) {
841       throw e;
842     } catch (Exception e) {
843       throw new SchedulerException(e);
844     }
845   }
846 
847   private boolean isAdmin() {
848     return (securityService.getUser().hasRole(GLOBAL_ADMIN_ROLE)
849             || securityService.getUser().hasRole(securityService.getOrganization().getAdminRole()));
850   }
851 
852   private Opt<DublinCoreCatalog> loadEpisodeDublinCoreFromAsset(Snapshot snapshot) {
853     Option<MediaPackageElement> dcCatalog = mlist(snapshot.getMediaPackage().getElements())
854             .filter(MediaPackageSupport.Filters.isEpisodeDublinCore).headOpt();
855     if (dcCatalog.isNone())
856       return Opt.none();
857 
858     Optional<Asset> asset = assetManager.getAsset(snapshot.getVersion(),
859             snapshot.getMediaPackage().getIdentifier().toString(), dcCatalog.get().getIdentifier());
860     if (asset.isEmpty())
861       return Opt.none();
862 
863     if (Availability.OFFLINE.equals(asset.get().getAvailability()))
864       return Opt.none();
865 
866     InputStream inputStream = null;
867     try {
868       inputStream = asset.get().getInputStream();
869       return Opt.some(DublinCores.read(inputStream));
870     } finally {
871       IOUtils.closeQuietly(inputStream);
872     }
873   }
874 
875   @Override
876   public synchronized void removeEvent(String mediaPackageId)
877           throws NotFoundException, SchedulerException {
878     notEmpty(mediaPackageId, "mediaPackageId");
879 
880     boolean notFoundInDatabase = false;
881     boolean notFoundInAssetManager;
882     try {
883       // Remove from database
884       try {
885         Opt<ExtendedEventDto> extEvtOpt = persistence.getEvent(mediaPackageId);
886         if (extEvtOpt.isSome()) {
887           String agentId = extEvtOpt.get().getCaptureAgentId();
888           persistence.deleteEvent(mediaPackageId);
889           if (StringUtils.isNotEmpty(agentId)) {
890             touchLastEntry(agentId);
891           }
892         } else {
893           notFoundInDatabase = true;
894         }
895       } catch (NotFoundException e) {
896         notFoundInDatabase = true;
897       }
898 
899       // Delete scheduler snapshot
900       AQueryBuilder query = assetManager.createQuery();
901       long deletedSnapshots = query.delete(SNAPSHOT_OWNER, query.snapshot())
902               .where(withOrganization(query).and(query.mediaPackageId(mediaPackageId)))
903               .name("delete episode").run();
904       notFoundInAssetManager = deletedSnapshots == 0;
905 
906       // Update live event
907       sendSchedulerUpdate(new SchedulerItemList(mediaPackageId, SchedulerItem.delete()));
908 
909       // Update Elasticsearch index
910       removeSchedulingInfoFromIndex(mediaPackageId);
911     } catch (Exception e) {
912       logger.error("Could not remove event '{}' from persistent storage", mediaPackageId, e);
913       throw new SchedulerException(e);
914     }
915 
916     if (notFoundInDatabase && notFoundInAssetManager) {
917       throw new NotFoundException();
918     }
919   }
920 
921   @Override
922   public MediaPackage getMediaPackage(String mediaPackageId) throws NotFoundException, SchedulerException {
923     notEmpty(mediaPackageId, "mediaPackageId");
924 
925     try {
926       return getEventMediaPackage(mediaPackageId);
927     } catch (RuntimeNotFoundException e) {
928       throw e.getWrappedException();
929     } catch (Exception e) {
930       logger.error("Failed to get mediapackage of event '{}':", mediaPackageId, e);
931       throw new SchedulerException(e);
932     }
933   }
934 
935   @Override
936   public DublinCoreCatalog getDublinCore(String mediaPackageId) throws NotFoundException, SchedulerException {
937     notEmpty(mediaPackageId, "mediaPackageId");
938 
939     try {
940       AQueryBuilder query = assetManager.createQuery();
941       AResult result = query.select(query.snapshot())
942               .where(withOrganization(query).and(query.mediaPackageId(mediaPackageId)).and(withOwner(query))
943               .and(query.version().isLatest()))
944               .run();
945       Optional<ARecord> record = result.getRecords().stream().findFirst();
946       if (record.isEmpty())
947         throw new NotFoundException();
948 
949       Opt<DublinCoreCatalog> dublinCore = loadEpisodeDublinCoreFromAsset(record.get().getSnapshot().get());
950       if (dublinCore.isNone())
951         throw new NotFoundException("No dublincore catalog found " + mediaPackageId);
952 
953       return dublinCore.get();
954     } catch (NotFoundException e) {
955       throw e;
956     } catch (Exception e) {
957       logger.error("Failed to get dublin core catalog of event '{}':", mediaPackageId, e);
958       throw new SchedulerException(e);
959     }
960   }
961 
962   @Override
963   public TechnicalMetadata getTechnicalMetadata(String mediaPackageId)
964           throws NotFoundException, UnauthorizedException, SchedulerException {
965     notEmpty(mediaPackageId, "mediaPackageId");
966 
967     try {
968       final Opt<ExtendedEventDto> extEvt = persistence.getEvent(mediaPackageId);
969       if (extEvt.isNone())
970         throw new NotFoundException();
971 
972       return getTechnicalMetadata(extEvt.get());
973     } catch (NotFoundException e) {
974       throw e;
975     } catch (Exception e) {
976       logger.error("Failed to get technical metadata of event '{}':", mediaPackageId, e);
977       throw new SchedulerException(e);
978     }
979   }
980 
981   @Override
982   public Map<String, String> getWorkflowConfig(String mediaPackageId) throws NotFoundException, SchedulerException {
983     notEmpty(mediaPackageId, "mediaPackageId");
984 
985     try {
986       Opt<ExtendedEventDto> record = persistence.getEvent(mediaPackageId);
987       if (record.isNone())
988         throw new NotFoundException();
989       return deserializeExtendedEventProperties(record.get().getWorkflowProperties());
990     } catch (NotFoundException e) {
991       throw e;
992     } catch (Exception e) {
993       logger.error("Failed to get workflow configuration of event '{}':", mediaPackageId, e);
994       throw new SchedulerException(e);
995     }
996   }
997 
998   @Override
999   public Map<String, String> getCaptureAgentConfiguration(String mediaPackageId)
1000           throws NotFoundException, SchedulerException {
1001     notEmpty(mediaPackageId, "mediaPackageId");
1002 
1003     try {
1004       Opt<ExtendedEventDto> record = persistence.getEvent(mediaPackageId);
1005       if (record.isNone())
1006         throw new NotFoundException();
1007       return deserializeExtendedEventProperties(record.get().getCaptureAgentProperties());
1008     } catch (NotFoundException e) {
1009       throw e;
1010     } catch (Exception e) {
1011       logger.error("Failed to get capture agent contiguration of event '{}':", mediaPackageId, e);
1012       throw new SchedulerException(e);
1013     }
1014   }
1015 
1016   @Override
1017   public int getEventCount() throws SchedulerException {
1018     try {
1019       return persistence.countEvents();
1020     } catch (Exception e) {
1021       throw new SchedulerException(e);
1022     }
1023   }
1024 
1025   @Override
1026   public List<MediaPackage> search(Opt<String> captureAgentId, Opt<Date> startsFrom, Opt<Date> startsTo,
1027           Opt<Date> endFrom, Opt<Date> endTo) throws SchedulerException {
1028     try {
1029       return persistence.search(captureAgentId, startsFrom, startsTo, endFrom, endTo, Opt.none()).stream()
1030           .map(ExtendedEventDto::getMediaPackageId)
1031           .map(this::getEventMediaPackage).collect(Collectors.toList());
1032     } catch (Exception e) {
1033       throw new SchedulerException(e);
1034     }
1035   }
1036 
1037   @Override
1038   public Opt<MediaPackage> getCurrentRecording(String captureAgentId) throws SchedulerException {
1039     try {
1040       final Date now = new Date();
1041       List<ExtendedEventDto> result = persistence.search(Opt.some(captureAgentId), Opt.none(), Opt.some(now), Opt.some(now), Opt.none(), Opt.some(1));
1042       if (result.isEmpty()) {
1043         return Opt.none();
1044       }
1045       return Opt.some(getEventMediaPackage(result.get(0).getMediaPackageId()));
1046     } catch (Exception e) {
1047       throw new SchedulerException(e);
1048     }
1049   }
1050 
1051   @Override
1052   public Opt<MediaPackage> getUpcomingRecording(String captureAgentId) throws SchedulerException {
1053     try {
1054       final Date now = new Date();
1055       List<ExtendedEventDto> result = persistence.search(Opt.some(captureAgentId), Opt.some(now), Opt.none(), Opt.none(), Opt.none(), Opt.some(1));
1056       if (result.isEmpty()) {
1057         return Opt.none();
1058       }
1059       return Opt.some(getEventMediaPackage(result.get(0).getMediaPackageId()));
1060     } catch (Exception e) {
1061       throw new SchedulerException(e);
1062     }
1063   }
1064 
1065   @Override
1066   public List<MediaPackage> findConflictingEvents(String captureDeviceID, Date startDate, Date endDate)
1067       throws SchedulerException {
1068     try {
1069       final Organization organization = securityService.getOrganization();
1070       final User user = SecurityUtil.createSystemUser(systemUserName, organization);
1071       List<MediaPackage> conflictingEvents = new ArrayList<>();
1072 
1073       SecurityUtil.runAs(securityService, organization, user, () -> {
1074         try {
1075           persistence.getEvents(captureDeviceID, startDate, endDate, Util.EVENT_MINIMUM_SEPARATION_MILLISECONDS)
1076                   .stream()
1077                   .map(id -> getEventMediaPackage(id, false))
1078                   .forEach(conflictingEvents::add);
1079         } catch (SchedulerServiceDatabaseException e) {
1080           logger.error("Failed to get conflicting events", e);
1081         }
1082       });
1083 
1084       return conflictingEvents;
1085 
1086     } catch (Exception e) {
1087       throw new SchedulerException(e);
1088     }
1089   }
1090 
1091   @Override
1092   public List<MediaPackage> findConflictingEvents(String captureAgentId, RRule rrule, Date start, Date end,
1093           long duration, TimeZone tz) throws SchedulerException {
1094     notEmpty(captureAgentId, "captureAgentId");
1095     notNull(rrule, "rrule");
1096     notNull(start, "start");
1097     notNull(end, "end");
1098     notNull(tz, "timeZone");
1099 
1100     Util.adjustRrule(rrule, start, tz);
1101     final List<Period> periods =  Util.calculatePeriods(start, end, duration, rrule, tz);
1102 
1103     if (periods.isEmpty()) {
1104       return Collections.emptyList();
1105     }
1106 
1107     return findConflictingEvents(periods, captureAgentId, tz);
1108   }
1109 
1110   private boolean checkPeriodOverlap(final List<Period> periods) {
1111     final List<Period> sortedPeriods = new ArrayList<>(periods);
1112     sortedPeriods.sort(Comparator.comparing(Period::getStart));
1113     Period prior = periods.get(0);
1114     for (Period current : periods.subList(1, periods.size())) {
1115       if (current.getStart().compareTo(prior.getEnd()) < 0) {
1116         return true;
1117       }
1118       prior = current;
1119     }
1120     return false;
1121   }
1122 
1123   private List<MediaPackage> findConflictingEvents(List<Period> periods, String captureAgentId, TimeZone tz)
1124           throws SchedulerException {
1125     notEmpty(captureAgentId, "captureAgentId");
1126     notNull(periods, "periods");
1127     requireTrue(periods.size() > 0, "periods");
1128 
1129     // First, check if there are overlaps inside the periods to be added (this is possible if you specify an RRULE via
1130     // the external API, for example; the admin ui should prevent this from happening). Then check for conflicts with
1131     // existing events.
1132     if (checkPeriodOverlap(periods)) {
1133       throw new IllegalArgumentException("RRULE periods overlap");
1134     }
1135 
1136     try {
1137       TimeZoneRegistry registry = TimeZoneRegistryFactory.getInstance().createRegistry();
1138 
1139       Set<MediaPackage> events = new HashSet<>();
1140 
1141       for (Period event : periods) {
1142         event.setTimeZone(registry.getTimeZone(tz.getID()));
1143         final Date startDate = event.getStart();
1144         final Date endDate = event.getEnd();
1145 
1146         events.addAll(findConflictingEvents(captureAgentId, startDate, endDate));
1147       }
1148 
1149       return new ArrayList<>(events);
1150     } catch (Exception e) {
1151       throw new SchedulerException(e);
1152     }
1153   }
1154 
1155   @Override
1156   public String getCalendar(Opt<String> captureAgentId, Opt<String> seriesId, Opt<Date> cutoff)
1157           throws SchedulerException {
1158 
1159     try {
1160       final Map<String, ExtendedEventDto> searchResult = persistence.search(captureAgentId, Opt.none(), cutoff,
1161           Opt.some(DateTime.now().minusHours(1).toDate()), Opt.none(), Opt.none()).stream()
1162           .collect(Collectors.toMap(ExtendedEventDto::getMediaPackageId, Function.identity()));
1163       final AQueryBuilder query = assetManager.createQuery();
1164       final AResult result = query.select(query.snapshot())
1165           .where(withOrganization(query).and(query.mediaPackageIds(searchResult.keySet().toArray(new String[0])))
1166               .and(withOwner(query)).and(query.version().isLatest()))
1167           .run();
1168 
1169       final CalendarGenerator cal = new CalendarGenerator(seriesService);
1170       for (final ARecord record : result.getRecords()) {
1171         final Optional<MediaPackage> optMp = record.getSnapshot().isPresent()
1172             ? Optional.of(record.getSnapshot().get().getMediaPackage())
1173             : Optional.empty();
1174 
1175         // If the event media package is empty, skip the event
1176         if (optMp.isEmpty()) {
1177           logger.warn("Mediapackage for event '{}' can't be found, event is not recorded", record.getMediaPackageId());
1178           continue;
1179         }
1180 
1181         if (seriesId.isSome() && !seriesId.get().equals(optMp.get().getSeries())) {
1182           continue;
1183         }
1184 
1185         Opt<DublinCoreCatalog> catalogOpt = loadEpisodeDublinCoreFromAsset(record.getSnapshot().get());
1186         if (catalogOpt.isNone()) {
1187           logger.warn("No episode catalog available, skipping!");
1188           continue;
1189         }
1190 
1191         final Map<String, String> caMetadata = deserializeExtendedEventProperties(searchResult.get(record.getMediaPackageId()).getCaptureAgentProperties());
1192 
1193         // If the even properties are empty, skip the event
1194         if (caMetadata.isEmpty()) {
1195           logger.warn("Properties for event '{}' can't be found, event is not recorded", record.getMediaPackageId());
1196           continue;
1197         }
1198 
1199         final String agentId = searchResult.get(record.getMediaPackageId()).getCaptureAgentId();
1200         final Date start = searchResult.get(record.getMediaPackageId()).getStartDate();
1201         final Date end = searchResult.get(record.getMediaPackageId()).getEndDate();
1202         final Date lastModified = record.getSnapshot().get().getArchivalDate();
1203 
1204         // Add the entry to the calendar, skip it with a warning if adding fails
1205         try {
1206           cal.addEvent(optMp.get(), catalogOpt.get(), agentId, start, end, lastModified, toPropertyString(caMetadata));
1207         } catch (Exception e) {
1208           logger.warn("Error adding event '{}' to calendar, event is not recorded", record.getMediaPackageId(), e);
1209         }
1210       }
1211 
1212       // Only validate calendars with events. Without any events, the iCalendar won't validate
1213       if (cal.getCalendar().getComponents().size() > 0) {
1214         cal.getCalendar().validate();
1215       }
1216 
1217       return cal.getCalendar().toString();
1218 
1219     } catch (Exception e) {
1220       throw new SchedulerException(e);
1221     }
1222   }
1223 
1224   @Override
1225   public String getScheduleLastModified(String captureAgentId) throws SchedulerException {
1226     notEmpty(captureAgentId, "captureAgentId");
1227 
1228     try {
1229       String lastModified = lastModifiedCache.getIfPresent(captureAgentId);
1230       if (lastModified != null)
1231         return lastModified;
1232 
1233       populateLastModifiedCache();
1234 
1235       lastModified = lastModifiedCache.getIfPresent(captureAgentId);
1236 
1237       // If still null set the empty calendar ETag
1238       if (lastModified == null) {
1239         lastModified = EMPTY_CALENDAR_ETAG;
1240         lastModifiedCache.put(captureAgentId, lastModified);
1241       }
1242       return lastModified;
1243     } catch (Exception e) {
1244       throw new SchedulerException(e);
1245     }
1246   }
1247 
1248   @Override
1249   public void removeScheduledRecordingsBeforeBuffer(long buffer) throws SchedulerException {
1250     DateTime end = new DateTime(DateTimeZone.UTC).minus(buffer * 1000);
1251 
1252     logger.info("Starting to look for scheduled recordings that have finished before {}.",
1253             DateTimeSupport.toUTC(end.getMillis()));
1254 
1255     List<ExtendedEventDto> finishedEvents;
1256     try {
1257       finishedEvents = persistence.search(Opt.<String> none(), Opt.<Date> none(), Opt.<Date> none(), Opt.<Date> none(),
1258               Opt.some(end.toDate()), Opt.none());
1259       logger.debug("Found {} events from search.", finishedEvents.size());
1260     } catch (Exception e) {
1261       throw new SchedulerException(e);
1262     }
1263 
1264     int recordingsRemoved = 0;
1265     for (ExtendedEventDto extEvt : finishedEvents) {
1266       final String eventId = extEvt.getMediaPackageId();
1267       try {
1268         removeEvent(eventId);
1269         logger.debug("Sucessfully removed scheduled event with id " + eventId);
1270         recordingsRemoved++;
1271       } catch (NotFoundException e) {
1272         logger.debug("Skipping event with id {} because it is not found", eventId);
1273       } catch (Exception e) {
1274         logger.warn("Unable to delete event with id '{}':", eventId, e);
1275       }
1276     }
1277 
1278     logger.info("Found {} to remove that ended before {}.", recordingsRemoved, DateTimeSupport.toUTC(end.getMillis()));
1279   }
1280 
1281   @Override
1282   public boolean updateRecordingState(String id, String state) throws NotFoundException, SchedulerException {
1283     notEmpty(id, "id");
1284     notEmpty(state, "state");
1285 
1286     if (!RecordingState.KNOWN_STATES.contains(state)) {
1287       logger.warn("Invalid recording state: {}.", state);
1288       return false;
1289     }
1290 
1291     try {
1292       final Opt<ExtendedEventDto> optExtEvt = persistence.getEvent(id);
1293 
1294       if (optExtEvt.isNone())
1295         throw new NotFoundException();
1296 
1297       final String prevRecordingState = optExtEvt.get().getRecordingState();
1298       final Recording r = new RecordingImpl(id, state);
1299       if (!state.equals(prevRecordingState)) {
1300         logger.debug("Setting Recording {} to state {}.", id, state);
1301 
1302         // Update live event
1303         sendSchedulerUpdate(new SchedulerItemList(r.getID(), Collections.singletonList(SchedulerItem
1304                 .updateRecordingStatus(r.getState(), r.getLastCheckinTime()))));
1305 
1306         // Update Elasticsearch index
1307         updateEventInIndex(r.getID(), Opt.none(), Opt.none(), Opt.none(), Opt.none(), Opt.none(),
1308                 Opt.none(), Opt.none(), Opt.some(r.getState()));
1309       } else {
1310         logger.debug("Recording state not changed");
1311       }
1312 
1313       persistence.storeEvent(
1314           id,
1315           securityService.getOrganization().getId(),
1316           Opt.none(),
1317           Opt.none(),
1318           Opt.none(),
1319           Opt.none(),
1320           Opt.some(r.getState()),
1321           Opt.some(r.getLastCheckinTime()),
1322           Opt.none(),
1323           Opt.none(),
1324           Opt.none(),
1325           Opt.none(),
1326           Opt.none()
1327       );
1328       return true;
1329     } catch (NotFoundException e) {
1330       throw e;
1331     } catch (Exception e) {
1332       throw new SchedulerException(e);
1333     }
1334   }
1335 
1336   @Override
1337   public Recording getRecordingState(String id) throws NotFoundException, SchedulerException {
1338 
1339     notEmpty(id, "id");
1340 
1341     try {
1342       Opt<ExtendedEventDto> extEvt = persistence.getEvent(id);
1343 
1344       if (extEvt.isNone() || extEvt.get().getRecordingState() == null) {
1345         throw new NotFoundException();
1346       }
1347 
1348       return new RecordingImpl(id, extEvt.get().getRecordingState(), extEvt.get().getRecordingLastHeard());
1349     } catch (NotFoundException e) {
1350       throw e;
1351     } catch (Exception e) {
1352       throw new SchedulerException(e);
1353     }
1354   }
1355 
1356   @Override
1357   public void removeRecording(String id) throws NotFoundException, SchedulerException {
1358     notEmpty(id, "id");
1359 
1360     try {
1361       persistence.resetRecordingState(id);
1362 
1363       // Update live event
1364       sendSchedulerUpdate(new SchedulerItemList(id, SchedulerItem.deleteRecordingState()));
1365 
1366       // Update Elasticsearch index
1367       removeRecordingStatusFromIndex(id);
1368     } catch (NotFoundException e) {
1369       throw e;
1370     } catch (Exception e) {
1371       throw new SchedulerException(e);
1372     }
1373   }
1374 
1375   @Override
1376   public Map<String, Recording> getKnownRecordings() throws SchedulerException {
1377     try {
1378       return persistence.getKnownRecordings().parallelStream()
1379           .collect(
1380               Collectors.toMap(ExtendedEventDto::getMediaPackageId,
1381               dto -> new RecordingImpl(dto.getMediaPackageId(), dto.getRecordingState(), dto.getRecordingLastHeard()))
1382           );
1383     } catch (Exception e) {
1384       throw new SchedulerException(e);
1385     }
1386   }
1387 
1388   private synchronized void persistEvent(final String mpId, final String checksum,
1389           final Opt<Date> startDateTime, final Opt<Date> endDateTime, final Opt<String> captureAgentId,
1390           final Opt<Set<String>> userIds, final Opt<MediaPackage> mediaPackage,
1391           final Opt<Map<String, String>> wfProperties, final Opt<Map<String, String>> caProperties,
1392           final Opt<String> schedulingSource) throws SchedulerServiceDatabaseException {
1393     // Store scheduled mediapackage
1394     for (MediaPackage mpToUpdate : mediaPackage) {
1395       assetManager.takeSnapshot(SNAPSHOT_OWNER, mpToUpdate);
1396     }
1397 
1398     // Store extended event
1399     persistence.storeEvent(
1400         mpId,
1401         securityService.getOrganization().getId(),
1402         captureAgentId,
1403         startDateTime,
1404         endDateTime,
1405         schedulingSource,
1406         Opt.none(),
1407         Opt.none(),
1408         userIds.isSome() ? Opt.some(String.join(",", userIds.get())) : Opt.none(),
1409         Opt.some(new Date()),
1410         Opt.some(checksum),
1411         wfProperties,
1412         caProperties
1413     );
1414   }
1415 
1416   /**
1417    * Update the event in the Elasticsearch index. Fields will only be updated of the corresponding Opt is not none.
1418    *
1419    * @param mediaPackageId
1420    * @param index
1421    * @param acl
1422    * @param dublinCore
1423    * @param startTime
1424    * @param endTime
1425    * @param presenters
1426    * @param agentId
1427    * @param properties
1428    * @param recordingStatus
1429    */
1430   private void updateEventInIndex(String mediaPackageId, Opt<AccessControlList> acl,
1431           Opt<DublinCoreCatalog> dublinCore, Opt<Date> startTime, Opt<Date> endTime, Opt<Set<String>> presenters,
1432           Opt<String> agentId, Opt<Map<String, String>> properties, Opt<String> recordingStatus) {
1433 
1434     String organization = getSecurityService().getOrganization().getId();
1435     User user = getSecurityService().getUser();
1436 
1437     Function<Optional<Event>, Optional<Event>> updateFunction = (Optional<Event> eventOpt) -> {
1438       Event event = eventOpt.orElse(new Event(mediaPackageId, organization));
1439 
1440       if (acl.isSome()) {
1441         event.setAccessPolicy(AccessControlParser.toJsonSilent(acl.get()));
1442       }
1443       if (dublinCore.isSome()) {
1444         EventIndexUtils.updateEvent(event, dublinCore.get());
1445         if (isBlank(event.getCreator()))
1446           event.setCreator(getSecurityService().getUser().getName());
1447 
1448         // Update series name if not already done
1449         try {
1450           EventIndexUtils.updateSeriesName(event, organization, user, index);
1451         } catch (SearchIndexException e) {
1452           logger.error("Error updating the series name of the event {} in the {} index.", mediaPackageId,
1453                   index.getIndexName(), e);
1454         }
1455       }
1456       if (presenters.isSome()) {
1457         event.setTechnicalPresenters(new ArrayList<>(presenters.get()));
1458       }
1459       if (agentId.isSome()) {
1460         event.setAgentId(agentId.get());
1461       }
1462       if (recordingStatus.isSome() && !recordingStatus.get().equals(RecordingState.UNKNOWN)) {
1463         event.setRecordingStatus(recordingStatus.get());
1464       }
1465       if (properties.isSome()) {
1466         event.setAgentConfiguration(properties.get());
1467       }
1468       if (startTime.isSome()) {
1469         String startTimeStr = startTime == null ? null : DateTimeSupport.toUTC(startTime.get().getTime());
1470         event.setTechnicalStartTime(startTimeStr);
1471       }
1472       if (endTime.isSome()) {
1473         String endTimeStr = endTime == null ? null : DateTimeSupport.toUTC(endTime.get().getTime());
1474         event.setTechnicalEndTime(endTimeStr);
1475       }
1476 
1477       return Optional.of(event);
1478     };
1479 
1480     try {
1481       index.addOrUpdateEvent(mediaPackageId, updateFunction, organization, user);
1482       logger.debug("Scheduled event {} updated in the {} index.", mediaPackageId, index.getIndexName());
1483     } catch (SearchIndexException e) {
1484       logger.error("Error updating the scheduled event {} in the {} index.", mediaPackageId, index.getIndexName(), e);
1485     }
1486   }
1487 
1488   /**
1489    * Set recording status to null for this event in the Elasticsearch index.
1490    *
1491    * @param mediaPackageId
1492    * @param index
1493    */
1494   private void removeRecordingStatusFromIndex(String mediaPackageId) {
1495     String organization = getSecurityService().getOrganization().getId();
1496     User user = getSecurityService().getUser();
1497 
1498     Function<Optional<Event>, Optional<Event>> updateFunction = (Optional<Event> eventOpt) -> {
1499       Event event = eventOpt.orElse(new Event(mediaPackageId, organization));
1500       event.setRecordingStatus(null);
1501       return Optional.of(event);
1502     };
1503 
1504     try {
1505       index.addOrUpdateEvent(mediaPackageId, updateFunction, organization, user);
1506       logger.debug("Recording state of event {} removed from the {} index.", mediaPackageId, index.getIndexName());
1507     } catch (SearchIndexException e) {
1508       logger.error("Failed to remove the recording state of event {} from the {} index.", mediaPackageId,
1509               index.getIndexName(), e);
1510     }
1511   }
1512 
1513   /**
1514    * Remove scheduling information for this event from the Elasticsearch index.
1515    *
1516    * @param mediaPackageId
1517    * @param index
1518    */
1519   private void removeSchedulingInfoFromIndex(String mediaPackageId) {
1520     String orgId = getSecurityService().getOrganization().getId();
1521 
1522     try {
1523       index.deleteEvent(mediaPackageId, orgId);
1524       logger.debug("Scheduling information of event {} removed from the {} index.", mediaPackageId,
1525               index.getIndexName());
1526     } catch (SearchIndexException e) {
1527       logger.error("Failed to delete the scheduling information of event {} from the {} index.", mediaPackageId,
1528               index.getIndexName(), e);
1529     }
1530   }
1531 
1532   /**
1533    * Send messages to trigger an update in the LiveScheduleService.
1534    *
1535    * @param mpId
1536    * @param acl
1537    * @param dublinCore
1538    * @param startTime
1539    * @param endTime
1540    * @param agentId
1541    * @param properties
1542    */
1543   private void updateLiveEvent(String mpId, Opt<AccessControlList> acl, Opt<DublinCoreCatalog> dublinCore,
1544           Opt<Date> startTime, Opt<Date> endTime, Opt<String> agentId, Opt<Map<String, String>> properties) {
1545     List<SchedulerItem> items = new ArrayList<>();
1546     if (acl.isSome()) {
1547       items.add(SchedulerItem.updateAcl(acl.get()));
1548     }
1549     if (dublinCore.isSome()) {
1550       items.add(SchedulerItem.updateCatalog(dublinCore.get()));
1551     }
1552     if (startTime.isSome()) {
1553       items.add(SchedulerItem.updateStart(startTime.get()));
1554     }
1555     if (endTime.isSome()) {
1556       items.add(SchedulerItem.updateEnd(endTime.get()));
1557     }
1558     if (agentId.isSome()) {
1559       items.add(SchedulerItem.updateAgent(agentId.get()));
1560     }
1561     if (properties.isSome()) {
1562       items.add(SchedulerItem.updateProperties(properties.get()));
1563     }
1564 
1565     if (!items.isEmpty()) {
1566       sendSchedulerUpdate(new SchedulerItemList(mpId, items));
1567     }
1568   }
1569 
1570   private Map<String, String> getFinalAgentProperties(Map<String, String> caMetadata, Map<String, String> wfProperties,
1571           String captureAgentId, Opt<String> seriesId, Opt<DublinCoreCatalog> dublinCore) {
1572     Map<String, String> properties = new HashMap<>();
1573     for (Entry<String, String> entry : caMetadata.entrySet()) {
1574       if (entry.getKey().startsWith(WORKFLOW_CONFIG_PREFIX))
1575         continue;
1576       properties.put(entry.getKey(), entry.getValue());
1577     }
1578     for (Entry<String, String> entry : wfProperties.entrySet()) {
1579       properties.put(WORKFLOW_CONFIG_PREFIX.concat(entry.getKey()), entry.getValue());
1580     }
1581     if (dublinCore.isSome()) {
1582       properties.put("event.title", dublinCore.get().getFirst(DublinCore.PROPERTY_TITLE));
1583     }
1584     if (seriesId.isSome()) {
1585       properties.put("event.series", seriesId.get());
1586     }
1587     properties.put("event.location", captureAgentId);
1588     return properties;
1589   }
1590 
1591   private void touchLastEntry(String captureAgentId) throws SchedulerException {
1592     // touch last entry
1593     try {
1594       logger.debug("Marking calendar feed for {} as modified", captureAgentId);
1595       persistence.touchLastEntry(captureAgentId);
1596       populateLastModifiedCache();
1597     } catch (SchedulerServiceDatabaseException e) {
1598       logger.error("Failed to update last modified entry of agent '{}':", captureAgentId, e);
1599     }
1600   }
1601 
1602   private void populateLastModifiedCache() throws SchedulerException {
1603     try {
1604       Map<String, Date> lastModifiedDates = persistence.getLastModifiedDates();
1605       for (Entry<String, Date> entry : lastModifiedDates.entrySet()) {
1606         Date lastModifiedDate = entry.getValue() != null ? entry.getValue() : new Date();
1607         lastModifiedCache.put(entry.getKey(), generateLastModifiedHash(lastModifiedDate));
1608       }
1609     } catch (Exception e) {
1610       throw new SchedulerException(e);
1611     }
1612   }
1613 
1614   private String generateLastModifiedHash(Date lastModifiedDate) {
1615     return "mod" + Long.toString(lastModifiedDate.getTime());
1616   }
1617 
1618   private String toPropertyString(Map<String, String> properties) {
1619     StringBuilder wfPropertiesString = new StringBuilder();
1620     for (Map.Entry<String, String> entry : properties.entrySet())
1621       wfPropertiesString.append(entry.getKey() + "=" + entry.getValue() + "\n");
1622     return wfPropertiesString.toString();
1623   }
1624 
1625   private MediaPackage getEventMediaPackage(final String mediaPackageId, boolean checkOwner) {
1626     AQueryBuilder query = assetManager.createQuery();
1627     var predicate = withOrganization(query)
1628             .and(query.mediaPackageId(mediaPackageId))
1629             .and(query.version().isLatest());
1630     if (checkOwner) {
1631       predicate = predicate.and(withOwner(query));
1632     }
1633 
1634     Optional<ARecord> record = query.select(query.snapshot()).where(predicate).run().getRecords().stream().findFirst();
1635     if (record.isEmpty())
1636       throw new RuntimeNotFoundException(new NotFoundException());
1637 
1638     return record.get().getSnapshot().get().getMediaPackage();
1639   }
1640 
1641   private MediaPackage getEventMediaPackage(final String mediaPackageId) {
1642     return getEventMediaPackage(mediaPackageId, true);
1643   }
1644 
1645   /**
1646    *
1647    * @param mp
1648    *          the mediapackage to update
1649    * @param dc
1650    *          the dublincore metadata to use to update the mediapackage
1651    * @return the updated mediapackage
1652    * @throws IOException
1653    *           Thrown if an IO error occurred adding the dc catalog file
1654    * @throws MediaPackageException
1655    *           Thrown if an error occurred updating the mediapackage or the mediapackage does not contain a catalog
1656    */
1657   private MediaPackage updateDublincCoreCatalog(MediaPackage mp, DublinCoreCatalog dc)
1658           throws IOException, MediaPackageException {
1659     try (InputStream inputStream = IOUtils.toInputStream(dc.toXmlString(), "UTF-8")) {
1660       // Update dublincore catalog
1661       Catalog[] catalogs = mp.getCatalogs(MediaPackageElements.EPISODE);
1662       if (catalogs.length > 0) {
1663         Catalog catalog = catalogs[0];
1664         URI uri = workspace.put(mp.getIdentifier().toString(), catalog.getIdentifier(), "dublincore.xml", inputStream);
1665         catalog.setURI(uri);
1666         // setting the URI to a new source so the checksum will most like be invalid
1667         catalog.setChecksum(null);
1668       } else {
1669         throw new MediaPackageException("Unable to find catalog");
1670       }
1671     }
1672     return mp;
1673   }
1674 
1675   private TechnicalMetadata getTechnicalMetadata(ExtendedEventDto extEvt) {
1676     final String agentId = extEvt.getCaptureAgentId();
1677     final Date start = extEvt.getStartDate();
1678     final Date end = extEvt.getEndDate();
1679     final Set<String> presenters = getPresenters(Opt.nul(extEvt.getPresenters()).getOr(""));
1680     final Opt<String> recordingStatus = Opt.nul(extEvt.getRecordingState());
1681     final Opt<Long> lastHeard = Opt.nul(extEvt.getRecordingLastHeard());
1682     final Map<String, String> caMetadata = deserializeExtendedEventProperties(extEvt.getCaptureAgentProperties());
1683     final Map<String, String> wfProperties = deserializeExtendedEventProperties(extEvt.getWorkflowProperties());
1684 
1685     Recording recording = null;
1686     if (recordingStatus.isSome() && lastHeard.isSome())
1687       recording = new RecordingImpl(extEvt.getMediaPackageId(), recordingStatus.get(), lastHeard.get());
1688 
1689     return new TechnicalMetadataImpl(extEvt.getMediaPackageId(), agentId, start, end, presenters, wfProperties,
1690             caMetadata, Opt.nul(recording));
1691   }
1692 
1693   private Predicate withOrganization(AQueryBuilder query) {
1694     return query.organizationId().eq(securityService.getOrganization().getId());
1695   }
1696 
1697   private Predicate withOwner(AQueryBuilder query) {
1698     return query.owner().eq(SNAPSHOT_OWNER);
1699   }
1700 
1701   private Set<String> getPresenters(String presentersString) {
1702     return new HashSet<>(Arrays.asList(StringUtils.split(presentersString, ",")));
1703   }
1704 
1705   /**
1706    * @return A {@link List} of {@link MediaPackageElementFlavor} that provide the extended metadata to the front end.
1707    */
1708   private List<MediaPackageElementFlavor> getEventCatalogUIAdapterFlavors() {
1709     String organization = securityService.getOrganization().getId();
1710     return Stream.$(eventCatalogUIAdapters).filter(eventOrganizationFilter._2(organization)).map(uiAdapterToFlavor)
1711             .filter(isNotEpisodeDublinCore).toList();
1712   }
1713 
1714   @Override
1715   public void repopulate(IndexRebuildService.DataType type) throws IndexRebuildException {
1716     try {
1717       final int total;
1718       try {
1719         total = persistence.countEvents();
1720       } catch (SchedulerServiceDatabaseException e) {
1721         logIndexRebuildError(logger, e);
1722         throw new IndexRebuildException(getService(), e);
1723       }
1724       logIndexRebuildBegin(logger, total, "scheduled events");
1725       final int[] current = {0};
1726       int n = 20;
1727       var updatedEventRange = new ArrayList<Event>();
1728 
1729       for (Organization organization: orgDirectoryService.getOrganizations()) {
1730         final User user = SecurityUtil.createSystemUser(systemUserName, organization);
1731         SecurityUtil.runAs(securityService, organization, user,
1732                 () -> {
1733                   final List<ExtendedEventDto> events;
1734                   try {
1735                     events = persistence.getEvents();
1736                   } catch (SchedulerServiceDatabaseException e) {
1737                     logIndexRebuildError(logger, e, organization);
1738                     return;
1739                   }
1740 
1741                   for (ExtendedEventDto event : events) {
1742                     try {
1743                       current[0]++;
1744 
1745                       var updatedEventData = Optional.of(new Event(event.getMediaPackageId(), organization.getId()));
1746                       updatedEventData = getEventUpdateFunction(event, organization.getId(),
1747                                   securityService.getUser()).apply(updatedEventData);
1748                       updatedEventRange.add(updatedEventData.get());
1749 
1750                       if (updatedEventRange.size() >= n || current[0] >= events.size()) {
1751                         index.bulkEventUpdate(updatedEventRange);
1752                         logIndexRebuildProgress(logger, total, current[0], n);
1753                         updatedEventRange.clear();
1754                       }
1755                     } catch (SearchIndexException e) {
1756                       logger.error("Error while updating event '{}' from search index:", event.getMediaPackageId(), e);
1757                     } catch (Exception e) {
1758                       throw new RuntimeException("Fatal error while indexing event " + event.getMediaPackageId(), e);
1759                     }
1760                   }
1761                });
1762       }
1763     } catch (Exception e) {
1764       logIndexRebuildError(logger, e);
1765       throw new IndexRebuildException(getService(), e);
1766     }
1767   }
1768 
1769   @Override
1770   public IndexRebuildService.Service getService() {
1771     return IndexRebuildService.Service.Scheduler;
1772   }
1773 
1774   public SecurityService getSecurityService() {
1775     return securityService;
1776   }
1777   /**
1778    * Get the function to update a scheduled event in the Elasticsearch index.
1779    *
1780    * @param scheduledEvent
1781    *          The theme to update
1782    * @param orgId
1783    *          The id of the current organization
1784    * @param user
1785    *          The user
1786    * @return the function to do the update
1787    */
1788   private Function<Optional<Event>, Optional<Event>> getEventUpdateFunction(ExtendedEventDto scheduledEvent,
1789           String orgId, User user) {
1790     return (Optional<Event> eventOpt) -> {
1791       Event event = eventOpt.orElse(new Event(scheduledEvent.getMediaPackageId(), orgId));
1792       final Set<String> presenters = getPresenters(Opt.nul(scheduledEvent.getPresenters()).getOr(""));
1793       final Map<String, String> caMetadata = deserializeExtendedEventProperties(scheduledEvent.
1794               getCaptureAgentProperties());
1795       AQueryBuilder query = assetManager.createQuery();
1796       final AResult result = query.select(query.snapshot())
1797               .where(query.mediaPackageId(scheduledEvent.getMediaPackageId()).and(query.version().isLatest())).run();
1798       final Snapshot snapshot = result.getRecords().stream().findFirst().get().getSnapshot().get();
1799 
1800       Opt<AccessControlList> acl = Opt.some(authorizationService.getActiveAcl(snapshot.getMediaPackage()).getA());
1801       Opt<DublinCoreCatalog> dublinCore = loadEpisodeDublinCoreFromAsset(snapshot);
1802       Opt<Date> startTime = Opt.some(scheduledEvent.getStartDate());
1803       Opt<Date> endTime = Opt.some(scheduledEvent.getEndDate());
1804       Opt<Set<String>> presentersOpt = Opt.some(presenters);
1805       Opt<String> agentId = Opt.some(scheduledEvent.getCaptureAgentId());
1806       Opt<Map<String, String>> properties = Opt.some(caMetadata);
1807       Opt<String> recordingStatus = Opt.nul(scheduledEvent.getRecordingState());
1808 
1809       if (acl.isSome()) {
1810         event.setAccessPolicy(AccessControlParser.toJsonSilent(acl.get()));
1811       }
1812       if (dublinCore.isSome()) {
1813         EventIndexUtils.updateEvent(event, dublinCore.get());
1814         if (isBlank(event.getCreator()))
1815           event.setCreator(getSecurityService().getUser().getName());
1816 
1817         // Update series name if not already done
1818         try {
1819           EventIndexUtils.updateSeriesName(event, orgId, user, index);
1820         } catch (SearchIndexException e) {
1821           logger.error("Error updating the series name of the event {} in the {} index.",
1822                   scheduledEvent.getMediaPackageId(), index.getIndexName(), e);
1823         }
1824       }
1825       if (presentersOpt.isSome()) {
1826         event.setTechnicalPresenters(new ArrayList<>(presentersOpt.get()));
1827       }
1828       if (agentId.isSome()) {
1829         event.setAgentId(agentId.get());
1830       }
1831       if (recordingStatus.isSome() && !recordingStatus.get().equals(RecordingState.UNKNOWN)) {
1832         event.setRecordingStatus(recordingStatus.get());
1833       }
1834       if (properties.isSome()) {
1835         event.setAgentConfiguration(properties.get());
1836       }
1837       if (startTime.isSome()) {
1838         String startTimeStr = startTime == null ? null : DateTimeSupport.toUTC(startTime.get().getTime());
1839         event.setTechnicalStartTime(startTimeStr);
1840       }
1841       if (endTime.isSome()) {
1842         String endTimeStr = endTime == null ? null : DateTimeSupport.toUTC(endTime.get().getTime());
1843         event.setTechnicalEndTime(endTimeStr);
1844       }
1845 
1846       return Optional.of(event);
1847     };
1848   }
1849 }