View Javadoc
1   /*
2    * Licensed to The Apereo Foundation under one or more contributor license
3    * agreements. See the NOTICE file distributed with this work for additional
4    * information regarding copyright ownership.
5    *
6    *
7    * The Apereo Foundation licenses this file to you under the Educational
8    * Community License, Version 2.0 (the "License"); you may not use this file
9    * except in compliance with the License. You may obtain a copy of the License
10   * at:
11   *
12   *   http://opensource.org/licenses/ecl2.txt
13   *
14   * Unless required by applicable law or agreed to in writing, software
15   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
16   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the
17   * License for the specific language governing permissions and limitations under
18   * the License.
19   *
20   */
21  package org.opencastproject.scheduler.impl;
22  
23  import static org.apache.commons.lang3.StringUtils.isBlank;
24  import static org.opencastproject.scheduler.impl.SchedulerUtil.calculateChecksum;
25  import static org.opencastproject.security.api.SecurityConstants.GLOBAL_ADMIN_ROLE;
26  import static org.opencastproject.util.EqualsUtil.ne;
27  import static org.opencastproject.util.RequireUtil.notEmpty;
28  import static org.opencastproject.util.RequireUtil.notNull;
29  import static org.opencastproject.util.RequireUtil.requireTrue;
30  import static org.opencastproject.util.data.Monadics.mlist;
31  
32  import org.opencastproject.assetmanager.api.Asset;
33  import org.opencastproject.assetmanager.api.AssetManager;
34  import org.opencastproject.assetmanager.api.Availability;
35  import org.opencastproject.assetmanager.api.Snapshot;
36  import org.opencastproject.assetmanager.api.query.AQueryBuilder;
37  import org.opencastproject.assetmanager.api.query.ARecord;
38  import org.opencastproject.assetmanager.api.query.AResult;
39  import org.opencastproject.assetmanager.api.query.ASelectQuery;
40  import org.opencastproject.assetmanager.api.query.Predicate;
41  import org.opencastproject.elasticsearch.api.SearchIndexException;
42  import org.opencastproject.elasticsearch.index.ElasticsearchIndex;
43  import org.opencastproject.elasticsearch.index.objects.event.Event;
44  import org.opencastproject.elasticsearch.index.objects.event.EventIndexUtils;
45  import org.opencastproject.elasticsearch.index.rebuild.AbstractIndexProducer;
46  import org.opencastproject.elasticsearch.index.rebuild.IndexProducer;
47  import org.opencastproject.elasticsearch.index.rebuild.IndexRebuildException;
48  import org.opencastproject.elasticsearch.index.rebuild.IndexRebuildService;
49  import org.opencastproject.mediapackage.Catalog;
50  import org.opencastproject.mediapackage.MediaPackage;
51  import org.opencastproject.mediapackage.MediaPackageElement;
52  import org.opencastproject.mediapackage.MediaPackageElementFlavor;
53  import org.opencastproject.mediapackage.MediaPackageElements;
54  import org.opencastproject.mediapackage.MediaPackageException;
55  import org.opencastproject.mediapackage.MediaPackageSupport;
56  import org.opencastproject.mediapackage.identifier.Id;
57  import org.opencastproject.mediapackage.identifier.IdImpl;
58  import org.opencastproject.message.broker.api.scheduler.SchedulerItem;
59  import org.opencastproject.message.broker.api.scheduler.SchedulerItemList;
60  import org.opencastproject.message.broker.api.update.SchedulerUpdateHandler;
61  import org.opencastproject.metadata.dublincore.DCMIPeriod;
62  import org.opencastproject.metadata.dublincore.DublinCore;
63  import org.opencastproject.metadata.dublincore.DublinCoreCatalog;
64  import org.opencastproject.metadata.dublincore.DublinCoreUtil;
65  import org.opencastproject.metadata.dublincore.DublinCoreValue;
66  import org.opencastproject.metadata.dublincore.DublinCores;
67  import org.opencastproject.metadata.dublincore.EncodingSchemeUtils;
68  import org.opencastproject.metadata.dublincore.EventCatalogUIAdapter;
69  import org.opencastproject.metadata.dublincore.Precision;
70  import org.opencastproject.scheduler.api.Recording;
71  import org.opencastproject.scheduler.api.RecordingImpl;
72  import org.opencastproject.scheduler.api.RecordingState;
73  import org.opencastproject.scheduler.api.SchedulerConflictException;
74  import org.opencastproject.scheduler.api.SchedulerException;
75  import org.opencastproject.scheduler.api.SchedulerService;
76  import org.opencastproject.scheduler.api.TechnicalMetadata;
77  import org.opencastproject.scheduler.api.TechnicalMetadataImpl;
78  import org.opencastproject.scheduler.api.Util;
79  import org.opencastproject.scheduler.impl.persistence.ExtendedEventDto;
80  import org.opencastproject.security.api.AccessControlList;
81  import org.opencastproject.security.api.AccessControlParser;
82  import org.opencastproject.security.api.AccessControlUtil;
83  import org.opencastproject.security.api.AuthorizationService;
84  import org.opencastproject.security.api.Organization;
85  import org.opencastproject.security.api.OrganizationDirectoryService;
86  import org.opencastproject.security.api.SecurityService;
87  import org.opencastproject.security.api.UnauthorizedException;
88  import org.opencastproject.security.api.User;
89  import org.opencastproject.security.util.SecurityUtil;
90  import org.opencastproject.series.api.SeriesService;
91  import org.opencastproject.util.DateTimeSupport;
92  import org.opencastproject.util.NotFoundException;
93  import org.opencastproject.util.OsgiUtil;
94  import org.opencastproject.util.XmlNamespaceBinding;
95  import org.opencastproject.util.XmlNamespaceContext;
96  import org.opencastproject.util.data.Option;
97  import org.opencastproject.util.data.functions.Misc;
98  import org.opencastproject.util.data.functions.Strings;
99  import org.opencastproject.workspace.api.Workspace;
100 
101 import com.google.common.cache.Cache;
102 import com.google.common.cache.CacheBuilder;
103 import com.google.gson.Gson;
104 import com.google.gson.reflect.TypeToken;
105 
106 import net.fortuna.ical4j.model.Period;
107 import net.fortuna.ical4j.model.TimeZoneRegistry;
108 import net.fortuna.ical4j.model.TimeZoneRegistryFactory;
109 import net.fortuna.ical4j.model.property.RRule;
110 
111 import org.apache.commons.io.IOUtils;
112 import org.apache.commons.lang3.StringUtils;
113 import org.joda.time.DateTime;
114 import org.joda.time.DateTimeZone;
115 import org.osgi.service.cm.ConfigurationException;
116 import org.osgi.service.cm.ManagedService;
117 import org.osgi.service.component.ComponentContext;
118 import org.osgi.service.component.annotations.Activate;
119 import org.osgi.service.component.annotations.Component;
120 import org.osgi.service.component.annotations.Reference;
121 import org.osgi.service.component.annotations.ReferenceCardinality;
122 import org.osgi.service.component.annotations.ReferencePolicy;
123 import org.osgi.service.component.annotations.ReferencePolicyOption;
124 import org.slf4j.Logger;
125 import org.slf4j.LoggerFactory;
126 
127 import java.io.IOException;
128 import java.io.InputStream;
129 import java.lang.reflect.Type;
130 import java.net.URI;
131 import java.util.ArrayList;
132 import java.util.Arrays;
133 import java.util.Calendar;
134 import java.util.Collections;
135 import java.util.Comparator;
136 import java.util.Date;
137 import java.util.Dictionary;
138 import java.util.HashMap;
139 import java.util.HashSet;
140 import java.util.LinkedList;
141 import java.util.List;
142 import java.util.Map;
143 import java.util.Map.Entry;
144 import java.util.Optional;
145 import java.util.Set;
146 import java.util.TimeZone;
147 import java.util.UUID;
148 import java.util.concurrent.ConcurrentHashMap;
149 import java.util.concurrent.CopyOnWriteArrayList;
150 import java.util.concurrent.TimeUnit;
151 import java.util.function.Function;
152 import java.util.stream.Collectors;
153 
154 /**
155  * Implementation of {@link SchedulerService}.
156  */
157 @Component(
158     service = { ManagedService.class, SchedulerService.class, IndexProducer.class },
159     property = {
160         "service.description=Scheduler Service"
161     }
162 )
163 public class SchedulerServiceImpl extends AbstractIndexProducer implements SchedulerService, ManagedService {
164 
165   /** The logger */
166   private static final Logger logger = LoggerFactory.getLogger(SchedulerServiceImpl.class);
167 
168   /** The last modified cache configuration key */
169   private static final String CFG_KEY_LAST_MODIFIED_CACHE_EXPIRE = "last_modified_cache_expire";
170 
171   /** The maintenance configuration key */
172   private static final String CFG_KEY_MAINTENANCE = "maintenance";
173 
174   /** The default cache expire time in seconds */
175   private static final int DEFAULT_CACHE_EXPIRE = 60;
176 
177   /** The Etag for an empty calendar */
178   private static final String EMPTY_CALENDAR_ETAG = "mod0";
179 
180   private static final String SNAPSHOT_OWNER = SchedulerService.JOB_TYPE;
181 
182   private static final Gson gson = new Gson();
183   /**
184    * Deserializes properties stored in string columns of the extended event table
185    * @param props Properties as retrieved from the DB
186    * @return deserialized key-value pairs
187    */
188   private static Map<String,String> deserializeExtendedEventProperties(String props) {
189     if (props == null || props.trim().isEmpty()) {
190       return new HashMap<>();
191     }
192     Type type = new TypeToken<Map<String, String>>() { }.getType();
193     return gson.fromJson(props, type);
194   }
195 
196   /** The last modified cache */
197   protected Cache<String, String> lastModifiedCache = CacheBuilder.newBuilder()
198           .expireAfterWrite(DEFAULT_CACHE_EXPIRE, TimeUnit.SECONDS).build();
199 
200   /** Persistent storage for events */
201   private SchedulerServiceDatabase persistence;
202 
203   /** The series service */
204   private SeriesService seriesService;
205 
206   /** The security service used to run the security context with. */
207   private SecurityService securityService;
208 
209   /** The asset manager */
210   private AssetManager assetManager;
211 
212   /** The workspace */
213   private Workspace workspace;
214 
215   /** The authorization service */
216   private AuthorizationService authorizationService;
217 
218   /** The organization directory service */
219   private OrganizationDirectoryService orgDirectoryService;
220 
221   /** The Elasticsearch indices */
222   private ElasticsearchIndex index;
223 
224   /** The list of registered event catalog UI adapters */
225   private List<EventCatalogUIAdapter> eventCatalogUIAdapters = new ArrayList<>();
226   private final List<SchedulerUpdateHandler> schedulerUpdateHandlers = new CopyOnWriteArrayList<>();
227 
228   /** The system user name */
229   private String systemUserName;
230 
231   private ComponentContext componentContext;
232 
233   /**
234    * OSGi callback to add an update handler.
235    *
236    * @param handler
237    */
238   @Reference(
239       cardinality = ReferenceCardinality.MULTIPLE,
240       policy = ReferencePolicy.DYNAMIC,
241       policyOption = ReferencePolicyOption.GREEDY,
242       unbind = "removeSchedulerUpdateHandler"
243   )
244   public void addSchedulerUpdateHandler(SchedulerUpdateHandler handler) {
245     this.schedulerUpdateHandlers.add(handler);
246   }
247 
248   public void removeSchedulerUpdateHandler(SchedulerUpdateHandler handler) {
249     this.schedulerUpdateHandlers.remove(handler);
250   }
251 
252   /**
253    * OSGi callback to set Persistence Service.
254    *
255    * @param persistence
256    */
257   @Reference
258   public void setPersistence(SchedulerServiceDatabase persistence) {
259     this.persistence = persistence;
260   }
261 
262   /**
263    * OSGi callback for setting Series Service.
264    *
265    * @param seriesService
266    */
267   @Reference
268   public void setSeriesService(SeriesService seriesService) {
269     this.seriesService = seriesService;
270   }
271 
272   /**
273    * OSGi callback to set security service.
274    *
275    * @param securityService
276    */
277   @Reference
278   public void setSecurityService(SecurityService securityService) {
279     this.securityService = securityService;
280   }
281 
282   /**
283    * OSGi callback to set the asset manager.
284    *
285    * @param assetManager
286    */
287   @Reference
288   public void setAssetManager(AssetManager assetManager) {
289     this.assetManager = assetManager;
290   }
291 
292   /**
293    * OSGi callback to set the workspace.
294    *
295    * @param workspace
296    */
297   @Reference
298   public void setWorkspace(Workspace workspace) {
299     this.workspace = workspace;
300   }
301 
302   /**
303    * OSGi callback to set the authorization service.
304    *
305    * @param authorizationService
306    */
307   @Reference
308   public void setAuthorizationService(AuthorizationService authorizationService) {
309     this.authorizationService = authorizationService;
310   }
311 
312   /**
313    * Update all of the handlers that an event has changed
314    *
315    * @param list The list of scheduler changes for a mediapackage
316    */
317   private void sendSchedulerUpdate(SchedulerItemList list) {
318     while (schedulerUpdateHandlers.size() != 1) {
319       logger.warn("Expecting 1 handler, but {} are registered.  Waiting 10s then retrying...",
320           schedulerUpdateHandlers.size());
321       try {
322         Thread.sleep(10000L);
323       } catch (InterruptedException e) { /* swallow this, nothing to do */ }
324     }
325     String mpid = list.getId();
326     for (SchedulerItem item : list.getItems()) {
327       for (SchedulerUpdateHandler handler : this.schedulerUpdateHandlers) {
328         handler.execute(mpid, item);
329       }
330     }
331   }
332 
333   /**
334    * OSGi callback to set the organization directory service.
335    *
336    * @param orgDirectoryService
337    */
338   @Reference
339   public void setOrgDirectoryService(OrganizationDirectoryService orgDirectoryService) {
340     this.orgDirectoryService = orgDirectoryService;
341   }
342 
343   /**
344    * OSgi callback to set the Elasticsearch index.
345    *
346    * @param index
347    *          the Elasticsearch index.
348    */
349   @Reference
350   public void setIndex(ElasticsearchIndex index) {
351     this.index = index;
352   }
353 
354   /** OSGi callback to add {@link EventCatalogUIAdapter} instance. */
355   @Reference(
356       cardinality = ReferenceCardinality.MULTIPLE,
357       policy = ReferencePolicy.DYNAMIC,
358       policyOption = ReferencePolicyOption.GREEDY,
359       unbind = "removeCatalogUIAdapter"
360   )
361   public void addCatalogUIAdapter(EventCatalogUIAdapter catalogUIAdapter) {
362     eventCatalogUIAdapters.add(catalogUIAdapter);
363   }
364 
365   /** OSGi callback to remove {@link EventCatalogUIAdapter} instance. */
366   public void removeCatalogUIAdapter(EventCatalogUIAdapter catalogUIAdapter) {
367     eventCatalogUIAdapters.remove(catalogUIAdapter);
368   }
369 
370   /**
371    * Activates Scheduler Service.
372    *
373    * @param cc
374    *          ComponentContext
375    * @throws Exception
376    */
377   @Activate
378   public void activate(ComponentContext cc) throws Exception {
379     this.componentContext = cc;
380     systemUserName = SecurityUtil.getSystemUserName(cc);
381     logger.info("Activating Scheduler Service");
382   }
383 
384   @Override
385   public void updated(Dictionary<String, ?> properties) throws ConfigurationException {
386     if (properties != null) {
387       final Option<Integer> cacheExpireDuration = OsgiUtil.getOptCfg(properties, CFG_KEY_LAST_MODIFIED_CACHE_EXPIRE)
388               .bind(Strings.toInt);
389       if (cacheExpireDuration.isSome()) {
390         lastModifiedCache = CacheBuilder.newBuilder().expireAfterWrite(cacheExpireDuration.get(), TimeUnit.SECONDS)
391                 .build();
392         logger.info("Set last modified cache to {}", DateTimeSupport.humanReadableTime(cacheExpireDuration.get()));
393       } else {
394         logger.info("Set last modified cache to default {}", DateTimeSupport.humanReadableTime(DEFAULT_CACHE_EXPIRE));
395       }
396       final Option<Boolean> maintenance = OsgiUtil.getOptCfgAsBoolean(properties, CFG_KEY_MAINTENANCE);
397       if (maintenance.getOrElse(false)) {
398         final String name = SchedulerServiceImpl.class.getName();
399         logger.warn("Putting scheduler into maintenance mode. This only makes sense when migrating data. If this is not"
400                 + " intended, edit the config file '{}.cfg' accordingly and restart opencast.", name);
401         componentContext.disableComponent(name);
402       }
403     }
404   }
405 
406   @Override
407   public void addEvent(Date startDateTime, Date endDateTime, String captureAgentId, Set<String> userIds,
408           MediaPackage mediaPackage, Map<String, String> wfProperties, Map<String, String> caMetadata,
409           Optional<String> schedulingSource)
410                   throws UnauthorizedException, SchedulerException {
411     addEventInternal(startDateTime, endDateTime, captureAgentId, userIds, mediaPackage, wfProperties, caMetadata,
412             schedulingSource);
413   }
414 
415   private void addEventInternal(Date startDateTime, Date endDateTime, String captureAgentId, Set<String> userIds,
416           MediaPackage mediaPackage, Map<String, String> wfProperties, Map<String, String> caMetadata,
417           Optional<String> schedulingSource)
418                   throws SchedulerException {
419     notNull(startDateTime, "startDateTime");
420     notNull(endDateTime, "endDateTime");
421     notEmpty(captureAgentId, "captureAgentId");
422     notNull(userIds, "userIds");
423     notNull(mediaPackage, "mediaPackage");
424     notNull(wfProperties, "wfProperties");
425     notNull(caMetadata, "caMetadata");
426     notNull(schedulingSource, "schedulingSource");
427     if (endDateTime.before(startDateTime)) {
428       throw new IllegalArgumentException("The end date is before the start date");
429     }
430 
431     final String mediaPackageId = mediaPackage.getIdentifier().toString();
432 
433     try {
434       AQueryBuilder query = assetManager.createQuery();
435       AResult result = query.select(query.nothing())
436               .where(withOrganization(query).and(query.mediaPackageId(mediaPackageId).and(query.version().isLatest())))
437               .run();
438       Optional<ARecord> record = result.getRecords().stream().findFirst();
439       if (record.isPresent()) {
440         logger.warn("Mediapackage with id '{}' already exists!", mediaPackageId);
441         throw new SchedulerConflictException("Mediapackage with id '" + mediaPackageId + "' already exists!");
442       }
443 
444       Optional<String> seriesId = Optional.ofNullable(StringUtils.trimToNull(mediaPackage.getSeries()));
445 
446       List<MediaPackage> conflictingEvents = findConflictingEvents(captureAgentId, startDateTime, endDateTime);
447       if (conflictingEvents.size() > 0) {
448         logger.info("Unable to add event {}, conflicting events found: {}", mediaPackageId, conflictingEvents);
449         throw new SchedulerConflictException(
450                 "Unable to add event, conflicting events found for event " + mediaPackageId);
451       }
452 
453       // Load dublincore and acl for update
454       Optional<DublinCoreCatalog> dublinCore = DublinCoreUtil.loadEpisodeDublinCore(workspace, mediaPackage);
455       AccessControlList acl = authorizationService.getActiveAcl(mediaPackage).getA();
456 
457       // Get updated agent properties
458       Map<String, String> finalCaProperties = getFinalAgentProperties(caMetadata, wfProperties, captureAgentId,
459               seriesId, dublinCore);
460 
461       // Persist asset
462       String checksum = calculateChecksum(workspace, getEventCatalogUIAdapterFlavors(), startDateTime, endDateTime,
463                                           captureAgentId, userIds, mediaPackage, dublinCore, wfProperties,
464                                           finalCaProperties, acl);
465       persistEvent(mediaPackageId, checksum, Optional.of(startDateTime), Optional.of(endDateTime),
466               Optional.of(captureAgentId), Optional.of(userIds), Optional.of(mediaPackage), Optional.of(wfProperties),
467               Optional.of(finalCaProperties), schedulingSource);
468 
469       // Update live event
470       updateLiveEvent(mediaPackageId, Optional.of(acl), dublinCore, Optional.of(startDateTime),
471               Optional.of(endDateTime), Optional.of(captureAgentId), Optional.of(finalCaProperties));
472 
473       // Update Elasticsearch index
474       updateEventInIndex(mediaPackageId, Optional.of(acl), dublinCore, Optional.of(startDateTime),
475           Optional.of(endDateTime), Optional.of(userIds), Optional.of(captureAgentId), Optional.of(finalCaProperties),
476           Optional.empty());
477 
478       // Update last modified
479       touchLastEntry(captureAgentId);
480     } catch (SchedulerException e) {
481       throw e;
482     } catch (Exception e) {
483       logger.error("Failed to create event with id '{}':", mediaPackageId, e);
484       throw new SchedulerException(e);
485     }
486   }
487 
488   @Override
489   public Map<String, Period> addMultipleEvents(RRule rRule, Date start, Date end, Long duration, TimeZone tz,
490           String captureAgentId, Set<String> userIds, MediaPackage templateMp, Map<String, String> wfProperties,
491           Map<String, String> caMetadata, Optional<String> schedulingSource)
492           throws UnauthorizedException, SchedulerConflictException, SchedulerException {
493     // input Rrule is UTC. Needs to be adjusted to tz
494     Util.adjustRrule(rRule, start, tz);
495     List<Period> periods = Util.calculatePeriods(start, end, duration, rRule, tz);
496     if (periods.isEmpty()) {
497       return Collections.emptyMap();
498     }
499     return addMultipleEventInternal(periods, captureAgentId, userIds, templateMp, wfProperties, caMetadata,
500             schedulingSource);
501   }
502 
503   private Map<String, Period> addMultipleEventInternal(List<Period> periods, String captureAgentId,
504           Set<String> userIds, MediaPackage templateMp, Map<String, String> wfProperties,
505           Map<String, String> caMetadata, Optional<String> schedulingSource) throws SchedulerException {
506     notNull(periods, "periods");
507     requireTrue(periods.size() > 0, "periods");
508     notEmpty(captureAgentId, "captureAgentId");
509     notNull(userIds, "userIds");
510     notNull(templateMp, "mediaPackages");
511     notNull(wfProperties, "wfProperties");
512     notNull(caMetadata, "caMetadata");
513     notNull(schedulingSource, "schedulingSource");
514 
515     Map<String, Period> scheduledEvents = new ConcurrentHashMap<>();
516 
517     try {
518       LinkedList<Id> ids = new LinkedList<>();
519       AQueryBuilder qb = assetManager.createQuery();
520       Predicate p = null;
521       //While we don't have a list of IDs equal to the number of periods
522       while (ids.size() <= periods.size()) {
523         //Create a list of IDs equal to the number of periods, along with a set of AM predicates
524         while (ids.size() <= periods.size()) {
525           Id id = new IdImpl(UUID.randomUUID().toString());
526           ids.add(id);
527           Predicate np = qb.mediaPackageId(id.toString());
528           //Haha, p = np jokes with the AM query language. Ha. Haha. Ha.  (Sob...)
529           if (null == p) {
530             p = np;
531           } else {
532             p = p.or(np);
533           }
534         }
535         //Select the list of ids which alread exist.  Hint: this needs to be zero
536         AResult result = qb.select(qb.nothing()).where(withOrganization(qb).and(p).and(qb.version().isLatest())).run();
537         //If there is conflict, clear the list and start over
538         if (result.getTotalSize() > 0) {
539           ids.clear();
540         }
541       }
542 
543       Optional<String> seriesId = Optional.ofNullable(StringUtils.trimToNull(templateMp.getSeries()));
544 
545       List<MediaPackage> conflictingEvents = findConflictingEvents(periods, captureAgentId, TimeZone.getDefault());
546       if (conflictingEvents.size() > 0) {
547         logger.info("Unable to add events, conflicting events found: {}", conflictingEvents);
548         throw new SchedulerConflictException("Unable to add event, conflicting events found");
549       }
550 
551       final Organization org = securityService.getOrganization();
552       final User user = securityService.getUser();
553       periods.parallelStream().forEach(event -> SecurityUtil.runAs(securityService, org, user, () -> {
554         final int currentCounter = periods.indexOf(event);
555         MediaPackage mediaPackage = (MediaPackage) templateMp.clone();
556         Date startDate = new Date(event.getStart().getTime());
557         Date endDate = new Date(event.getEnd().getTime());
558         Id id = ids.get(currentCounter);
559 
560         //Get, or make, the DC catalog
561         DublinCoreCatalog dc;
562         Optional<DublinCoreCatalog> dcOpt = DublinCoreUtil.loadEpisodeDublinCore(workspace, templateMp);
563         if (dcOpt.isPresent()) {
564           dc = dcOpt.get();
565           dc = (DublinCoreCatalog) dc.clone();
566           // make sure to bind the OC_PROPERTY namespace
567           dc.addBindings(XmlNamespaceContext
568                   .mk(XmlNamespaceBinding.mk(DublinCores.OC_PROPERTY_NS_PREFIX, DublinCores.OC_PROPERTY_NS_URI)));
569         } else {
570           dc = DublinCores.mkOpencastEpisode().getCatalog();
571         }
572 
573         // Set the new media package identifier
574         mediaPackage.setIdentifier(id);
575 
576         // Update dublincore title and temporal
577         String newTitle = dc.getFirst(DublinCore.PROPERTY_TITLE)
578             + String.format(" %0" + Integer.toString(periods.size()).length() + "d", currentCounter + 1);
579         dc.set(DublinCore.PROPERTY_TITLE, newTitle);
580         DublinCoreValue eventTime = EncodingSchemeUtils.encodePeriod(new DCMIPeriod(startDate, endDate),
581                 Precision.Second);
582         dc.set(DublinCore.PROPERTY_TEMPORAL, eventTime);
583         dc.set(DublinCore.PROPERTY_CREATED, EncodingSchemeUtils.encodeDate(startDate, Precision.Second));
584         try {
585           mediaPackage = updateDublincCoreCatalog(mediaPackage, dc);
586         } catch (Exception e) {
587           Misc.chuck(e);
588         }
589         mediaPackage.setTitle(newTitle);
590 
591         String mediaPackageId = mediaPackage.getIdentifier().toString();
592         //Converting from iCal4j DateTime objects to plain Date objects to prevent AMQ issues below
593         Calendar cal = Calendar.getInstance(TimeZone.getTimeZone("UTC"));
594         cal.setTime(event.getStart());
595         Date startDateTime = cal.getTime();
596         cal.setTime(event.getEnd());
597         Date endDateTime = cal.getTime();
598         // Load dublincore and acl for update
599         Optional<DublinCoreCatalog> dublinCore = DublinCoreUtil.loadEpisodeDublinCore(workspace, mediaPackage);
600         AccessControlList acl = authorizationService.getActiveAcl(mediaPackage).getA();
601 
602         // Get updated agent properties
603         Map<String, String> finalCaProperties = getFinalAgentProperties(caMetadata, wfProperties, captureAgentId,
604                 seriesId, dublinCore);
605 
606         // Persist asset
607         String checksum = calculateChecksum(workspace, getEventCatalogUIAdapterFlavors(), startDateTime, endDateTime,
608                 captureAgentId, userIds, mediaPackage, dublinCore, wfProperties, finalCaProperties, acl);
609         try {
610           persistEvent(mediaPackageId, checksum, Optional.of(startDateTime), Optional.of(endDateTime),
611                 Optional.of(captureAgentId), Optional.of(userIds), Optional.of(mediaPackage), Optional.of(wfProperties),
612                 Optional.of(finalCaProperties), schedulingSource);
613         } catch (Exception e) {
614           Misc.chuck(e);
615         }
616 
617         // Update live event
618         updateLiveEvent(mediaPackageId, Optional.of(acl), dublinCore, Optional.of(startDateTime),
619                 Optional.of(endDateTime), Optional.of(captureAgentId), Optional.of(finalCaProperties));
620 
621         // Update Elasticsearch index
622         updateEventInIndex(mediaPackageId, Optional.of(acl), dublinCore, Optional.of(startDateTime),
623                 Optional.of(endDateTime), Optional.of(userIds), Optional.of(captureAgentId),
624                 Optional.of(finalCaProperties), Optional.empty());
625 
626         scheduledEvents.put(mediaPackageId, event);
627         for (MediaPackageElement mediaPackageElement : mediaPackage.getElements()) {
628           try {
629             workspace.delete(mediaPackage.getIdentifier().toString(), mediaPackageElement.getIdentifier());
630           } catch (NotFoundException | IOException e) {
631             logger.warn("Failed to delete media package element", e);
632           }
633         }
634       }));
635       return scheduledEvents;
636     } catch (SchedulerException e) {
637       throw e;
638     } catch (Exception e) {
639       throw new SchedulerException(e);
640     } finally {
641       // Update last modified
642       if (!scheduledEvents.isEmpty()) {
643         touchLastEntry(captureAgentId);
644       }
645     }
646   }
647 
648   @Override
649   public void updateEvent(final String mpId, Optional<Date> startDateTime, Optional<Date> endDateTime,
650           Optional<String> captureAgentId, Optional<Set<String>> userIds, Optional<MediaPackage> mediaPackage,
651           Optional<Map<String, String>> wfProperties, Optional<Map<String, String>> caMetadata)
652                   throws NotFoundException, UnauthorizedException, SchedulerException {
653     updateEventInternal(mpId, startDateTime, endDateTime, captureAgentId, userIds, mediaPackage,
654             wfProperties, caMetadata, false);
655   }
656 
657   @Override
658   public void updateEvent(final String mpId, Optional<Date> startDateTime, Optional<Date> endDateTime,
659           Optional<String> captureAgentId, Optional<Set<String>> userIds, Optional<MediaPackage> mediaPackage,
660           Optional<Map<String, String>> wfProperties, Optional<Map<String, String>> caMetadata, boolean allowConflict)
661                 throws NotFoundException, UnauthorizedException, SchedulerException {
662     updateEventInternal(mpId, startDateTime, endDateTime, captureAgentId, userIds, mediaPackage,
663             wfProperties, caMetadata, allowConflict);
664   }
665 
666   private void updateEventInternal(final String mpId, Optional<Date> startDateTime,
667           Optional<Date> endDateTime, Optional<String> captureAgentId, Optional<Set<String>> userIds,
668           Optional<MediaPackage> mediaPackageOpt, Optional<Map<String, String>> wfProperties,
669           Optional<Map<String, String>> caMetadata, boolean allowConflict)
670                 throws NotFoundException, SchedulerException {
671     notEmpty(mpId, "mpId");
672     notNull(startDateTime, "startDateTime");
673     notNull(endDateTime, "endDateTime");
674     notNull(captureAgentId, "captureAgentId");
675     notNull(userIds, "userIds");
676     notNull(mediaPackageOpt, "mediaPackageOpt");
677     notNull(wfProperties, "wfProperties");
678     notNull(caMetadata, "caMetadata");
679 
680     try {
681       AQueryBuilder query = assetManager.createQuery();
682 
683       ASelectQuery select = query
684               .select(query.snapshot())
685               .where(withOrganization(query).and(query.mediaPackageId(mpId).and(query.version().isLatest())
686                   .and(withOwner(query))));
687       Optional<ARecord> optEvent = select.run().getRecords().stream().findFirst();
688       Optional<ExtendedEventDto> optExtEvent = persistence.getEvent(mpId);
689       if (optEvent.isEmpty() || optExtEvent.isEmpty()) {
690         throw new NotFoundException("No event found while updating event " + mpId);
691       }
692 
693       ARecord record = optEvent.get();
694       if (record.getSnapshot().isEmpty()) {
695         throw new NotFoundException("No mediapackage found while updating event " + mpId);
696       }
697       Snapshot snapshot = record.getSnapshot().get();
698       MediaPackage archivedMediaPackage = snapshot.getMediaPackage();
699 
700       Optional<DublinCoreCatalog> archivedDublinCoreOpt = loadEpisodeDublinCoreFromAsset(snapshot);
701       if (archivedDublinCoreOpt.isEmpty()) {
702         throw new NotFoundException("No dublincore found while updating event " + mpId);
703       }
704       DublinCoreCatalog archivedDublinCore = archivedDublinCoreOpt.get();
705       AccessControlList archivedAcl = authorizationService.getActiveAcl(archivedMediaPackage).getA();
706 
707       final ExtendedEventDto extendedEventDto = optExtEvent.get();
708       Date start = extendedEventDto.getStartDate();
709       Date end = extendedEventDto.getEndDate();
710 
711       if ((startDateTime.isPresent() || endDateTime.isPresent())
712           && endDateTime.orElse(end).before(startDateTime.orElse(start))) {
713         throw new SchedulerException("The end date is before the start date");
714       }
715 
716       String agentId = extendedEventDto.getCaptureAgentId();
717       Optional<String> seriesId = Optional.ofNullable(archivedMediaPackage.getSeries());
718 
719       // Check for conflicting events
720       // Check scheduling conflicts in case a property relevant for conflicts has changed
721       if ((captureAgentId.isPresent() || startDateTime.isPresent() || endDateTime.isPresent())
722             && (!allowConflict || !isAdmin())) {
723         List<MediaPackage> conflictingEvents = findConflictingEvents(
724             captureAgentId.orElse(agentId),
725             startDateTime.orElse(start),
726             endDateTime.orElse(end)
727         ).stream()
728             .filter(mp -> !mpId.equals(mp.getIdentifier().toString()))
729             .collect(Collectors.toList());
730         if (conflictingEvents.size() > 0) {
731           logger.info("Unable to update event {}, conflicting events found: {}", mpId, conflictingEvents);
732           throw new SchedulerConflictException("Unable to update event, conflicting events found for event " + mpId);
733         }
734       }
735 
736       Set<String> presenters = getPresenters(Optional.ofNullable(extendedEventDto.getPresenters()).orElse(""));
737       Map<String, String> wfProps = deserializeExtendedEventProperties(extendedEventDto.getWorkflowProperties());
738       Map<String, String> caProperties = deserializeExtendedEventProperties(
739               extendedEventDto.getCaptureAgentProperties());
740 
741       boolean propertiesChanged = false;
742       boolean dublinCoreChanged = false;
743 
744       // Get workflow properties
745       if (wfProperties.isPresent()) {
746         propertiesChanged = true;
747         wfProps = wfProperties.get();
748       }
749 
750       // Get capture agent properties
751       if (caMetadata.isPresent()) {
752         propertiesChanged = true;
753         caProperties = caMetadata.get();
754       }
755 
756       if (captureAgentId.isPresent()) {
757         propertiesChanged = true;
758       }
759 
760       Optional<AccessControlList> changedAclOpt = Optional.empty();
761       Optional<DublinCoreCatalog> changedDublinCoreOpt = Optional.empty();
762       if (mediaPackageOpt.isPresent()) {
763         MediaPackage mediaPackage = mediaPackageOpt.get();
764         // Check for series change
765         if (ne(archivedMediaPackage.getSeries(), mediaPackage.getSeries())) {
766           propertiesChanged = true;
767           seriesId = Optional.ofNullable(mediaPackage.getSeries());
768         }
769 
770         // Check for ACL change
771         AccessControlList acl = authorizationService.getActiveAcl(mediaPackage).getA();
772         if (!AccessControlUtil.equals(acl, archivedAcl)) {
773           changedAclOpt = Optional.of(acl);
774         }
775 
776         // Check for dublin core change
777         Optional<DublinCoreCatalog> dublinCoreOpt = DublinCoreUtil.loadEpisodeDublinCore(workspace,
778                 mediaPackage);
779         if (dublinCoreOpt.isPresent() && !DublinCoreUtil.equals(archivedDublinCore, dublinCoreOpt.get())) {
780           dublinCoreChanged = true;
781           propertiesChanged = true;
782           changedDublinCoreOpt = dublinCoreOpt;
783         }
784       }
785 
786       //update metadata for dublincore
787       DublinCoreCatalog dublinCore = changedDublinCoreOpt.orElse(archivedDublinCore);
788       DublinCoreCatalog dublinCoreCopy = (DublinCoreCatalog) dublinCore.clone();
789       if (startDateTime.isPresent() && endDateTime.isPresent()) {
790         DublinCoreValue eventTime = EncodingSchemeUtils.encodePeriod(
791                 new DCMIPeriod(startDateTime.get(), endDateTime.get()), Precision.Second);
792         dublinCore.set(DublinCore.PROPERTY_TEMPORAL, eventTime);
793       }
794       if (captureAgentId.isPresent()) {
795         dublinCore.set(DublinCore.PROPERTY_SPATIAL, captureAgentId.get());
796       }
797       if (!DublinCoreUtil.equals(dublinCore, dublinCoreCopy)) {
798         dublinCoreChanged = true;
799         changedDublinCoreOpt = Optional.of(dublinCore);
800         mediaPackageOpt = Optional.of(updateDublincCoreCatalog(mediaPackageOpt.orElse(archivedMediaPackage),
801                 changedDublinCoreOpt.get()));
802       }
803 
804       Optional<Map<String, String>> finalCaProperties = Optional.empty();
805       if (propertiesChanged) {
806         finalCaProperties = Optional.of(getFinalAgentProperties(caProperties, wfProps, captureAgentId.orElse(agentId),
807                                                              seriesId, Optional.of(changedDublinCoreOpt.orElse(
808                                                                      archivedDublinCore))));
809       }
810 
811       String checksum = calculateChecksum(workspace, getEventCatalogUIAdapterFlavors(), startDateTime.orElse(start),
812               endDateTime.orElse(end), captureAgentId.orElse(agentId), userIds.orElse(presenters),
813               mediaPackageOpt.orElse(archivedMediaPackage),
814               Optional.of(changedDublinCoreOpt.orElse(archivedDublinCore)), wfProperties.orElse(wfProps),
815               finalCaProperties.orElse(caProperties), changedAclOpt.orElse(new AccessControlList()));
816 
817       String oldChecksum = extendedEventDto.getChecksum();
818       if (checksum.equals(oldChecksum)) {
819         logger.debug("Updated event {} has same checksum, ignore update", mpId);
820         return;
821       }
822 
823       // Update asset
824       persistEvent(mpId, checksum, startDateTime, endDateTime, captureAgentId, userIds,
825               mediaPackageOpt, wfProperties, finalCaProperties, Optional.empty());
826 
827       // Update live event
828       updateLiveEvent(mpId, changedAclOpt, changedDublinCoreOpt, startDateTime, endDateTime, Optional.of(agentId),
829               finalCaProperties);
830 
831       // Update Elasticsearch index
832       updateEventInIndex(mpId, changedAclOpt, changedDublinCoreOpt, startDateTime, endDateTime, userIds,
833               Optional.of(agentId), finalCaProperties, Optional.empty());
834 
835       // Update last modified
836       if (propertiesChanged || dublinCoreChanged || startDateTime.isPresent() || endDateTime.isPresent()) {
837         touchLastEntry(agentId);
838         if (captureAgentId.isPresent()) {
839           touchLastEntry(captureAgentId.get());
840         }
841       }
842     } catch (NotFoundException | SchedulerException e) {
843       throw e;
844     } catch (Exception e) {
845       throw new SchedulerException(e);
846     }
847   }
848 
849   private boolean isAdmin() {
850     return (securityService.getUser().hasRole(GLOBAL_ADMIN_ROLE)
851             || securityService.getUser().hasRole(securityService.getOrganization().getAdminRole()));
852   }
853 
854   private Optional<DublinCoreCatalog> loadEpisodeDublinCoreFromAsset(Snapshot snapshot) {
855     Option<MediaPackageElement> dcCatalog = mlist(snapshot.getMediaPackage().getElements())
856             .filter(MediaPackageSupport.Filters.isEpisodeDublinCore).headOpt();
857     if (dcCatalog.isNone()) {
858       return Optional.empty();
859     }
860 
861     Optional<Asset> asset = assetManager.getAsset(snapshot.getVersion(),
862             snapshot.getMediaPackage().getIdentifier().toString(), dcCatalog.get().getIdentifier());
863     if (asset.isEmpty()) {
864       return Optional.empty();
865     }
866 
867     if (Availability.OFFLINE.equals(asset.get().getAvailability())) {
868       return Optional.empty();
869     }
870 
871     InputStream inputStream = null;
872     try {
873       inputStream = asset.get().getInputStream();
874       return Optional.of(DublinCores.read(inputStream));
875     } finally {
876       IOUtils.closeQuietly(inputStream);
877     }
878   }
879 
880   @Override
881   public synchronized void removeEvent(String mediaPackageId)
882           throws NotFoundException, SchedulerException {
883     notEmpty(mediaPackageId, "mediaPackageId");
884 
885     boolean notFoundInDatabase = false;
886     boolean notFoundInAssetManager;
887     try {
888       // Remove from database
889       try {
890         Optional<ExtendedEventDto> extEvtOpt = persistence.getEvent(mediaPackageId);
891         if (extEvtOpt.isPresent()) {
892           String agentId = extEvtOpt.get().getCaptureAgentId();
893           persistence.deleteEvent(mediaPackageId);
894           if (StringUtils.isNotEmpty(agentId)) {
895             touchLastEntry(agentId);
896           }
897         } else {
898           notFoundInDatabase = true;
899         }
900       } catch (NotFoundException e) {
901         notFoundInDatabase = true;
902       }
903 
904       // Delete scheduler snapshot
905       AQueryBuilder query = assetManager.createQuery();
906       long deletedSnapshots = query.delete(SNAPSHOT_OWNER, query.snapshot())
907               .where(withOrganization(query).and(query.mediaPackageId(mediaPackageId)))
908               .name("delete episode").run();
909       notFoundInAssetManager = deletedSnapshots == 0;
910 
911       // Update live event
912       sendSchedulerUpdate(new SchedulerItemList(mediaPackageId, SchedulerItem.delete()));
913 
914       // Update Elasticsearch index
915       removeSchedulingInfoFromIndex(mediaPackageId);
916     } catch (Exception e) {
917       logger.error("Could not remove event '{}' from persistent storage", mediaPackageId, e);
918       throw new SchedulerException(e);
919     }
920 
921     if (notFoundInDatabase && notFoundInAssetManager) {
922       throw new NotFoundException();
923     }
924   }
925 
926   @Override
927   public MediaPackage getMediaPackage(String mediaPackageId) throws NotFoundException, SchedulerException {
928     notEmpty(mediaPackageId, "mediaPackageId");
929 
930     try {
931       return getEventMediaPackage(mediaPackageId);
932     } catch (RuntimeNotFoundException e) {
933       throw e.getWrappedException();
934     } catch (Exception e) {
935       logger.error("Failed to get mediapackage of event '{}':", mediaPackageId, e);
936       throw new SchedulerException(e);
937     }
938   }
939 
940   @Override
941   public DublinCoreCatalog getDublinCore(String mediaPackageId) throws NotFoundException, SchedulerException {
942     notEmpty(mediaPackageId, "mediaPackageId");
943 
944     try {
945       AQueryBuilder query = assetManager.createQuery();
946       AResult result = query.select(query.snapshot())
947               .where(withOrganization(query).and(query.mediaPackageId(mediaPackageId)).and(withOwner(query))
948               .and(query.version().isLatest()))
949               .run();
950       Optional<ARecord> record = result.getRecords().stream().findFirst();
951       if (record.isEmpty()) {
952         throw new NotFoundException();
953       }
954 
955       Optional<DublinCoreCatalog> dublinCore = loadEpisodeDublinCoreFromAsset(record.get().getSnapshot().get());
956       if (dublinCore.isEmpty()) {
957         throw new NotFoundException("No dublincore catalog found " + mediaPackageId);
958       }
959 
960       return dublinCore.get();
961     } catch (NotFoundException e) {
962       throw e;
963     } catch (Exception e) {
964       logger.error("Failed to get dublin core catalog of event '{}':", mediaPackageId, e);
965       throw new SchedulerException(e);
966     }
967   }
968 
969   @Override
970   public TechnicalMetadata getTechnicalMetadata(String mediaPackageId)
971           throws NotFoundException, UnauthorizedException, SchedulerException {
972     notEmpty(mediaPackageId, "mediaPackageId");
973 
974     try {
975       final Optional<ExtendedEventDto> extEvt = persistence.getEvent(mediaPackageId);
976       if (extEvt.isEmpty()) {
977         throw new NotFoundException();
978       }
979 
980       return getTechnicalMetadata(extEvt.get());
981     } catch (NotFoundException e) {
982       throw e;
983     } catch (Exception e) {
984       logger.error("Failed to get technical metadata of event '{}':", mediaPackageId, e);
985       throw new SchedulerException(e);
986     }
987   }
988 
989   @Override
990   public Map<String, String> getWorkflowConfig(String mediaPackageId) throws NotFoundException, SchedulerException {
991     notEmpty(mediaPackageId, "mediaPackageId");
992 
993     try {
994       Optional<ExtendedEventDto> record = persistence.getEvent(mediaPackageId);
995       if (record.isEmpty()) {
996         throw new NotFoundException();
997       }
998       return deserializeExtendedEventProperties(record.get().getWorkflowProperties());
999     } catch (NotFoundException e) {
1000       throw e;
1001     } catch (Exception e) {
1002       logger.error("Failed to get workflow configuration of event '{}':", mediaPackageId, e);
1003       throw new SchedulerException(e);
1004     }
1005   }
1006 
1007   @Override
1008   public Map<String, String> getCaptureAgentConfiguration(String mediaPackageId)
1009           throws NotFoundException, SchedulerException {
1010     notEmpty(mediaPackageId, "mediaPackageId");
1011 
1012     try {
1013       Optional<ExtendedEventDto> record = persistence.getEvent(mediaPackageId);
1014       if (record.isEmpty()) {
1015         throw new NotFoundException();
1016       }
1017       return deserializeExtendedEventProperties(record.get().getCaptureAgentProperties());
1018     } catch (NotFoundException e) {
1019       throw e;
1020     } catch (Exception e) {
1021       logger.error("Failed to get capture agent contiguration of event '{}':", mediaPackageId, e);
1022       throw new SchedulerException(e);
1023     }
1024   }
1025 
1026   @Override
1027   public int getEventCount() throws SchedulerException {
1028     try {
1029       return persistence.countEvents();
1030     } catch (Exception e) {
1031       throw new SchedulerException(e);
1032     }
1033   }
1034 
1035   @Override
1036   public List<MediaPackage> search(Optional<String> captureAgentId, Optional<Date> startsFrom, Optional<Date> startsTo,
1037           Optional<Date> endFrom, Optional<Date> endTo) throws SchedulerException {
1038     try {
1039       return persistence.search(captureAgentId, startsFrom, startsTo, endFrom, endTo, Optional.empty()).stream()
1040           .map(ExtendedEventDto::getMediaPackageId)
1041           .map(this::getEventMediaPackage).collect(Collectors.toList());
1042     } catch (Exception e) {
1043       throw new SchedulerException(e);
1044     }
1045   }
1046 
1047   @Override
1048   public Optional<MediaPackage> getCurrentRecording(String captureAgentId) throws SchedulerException {
1049     try {
1050       final Date now = new Date();
1051       List<ExtendedEventDto> result = persistence.search(Optional.of(captureAgentId), Optional.empty(),
1052           Optional.of(now), Optional.of(now), Optional.empty(), Optional.of(1));
1053       if (result.isEmpty()) {
1054         return Optional.empty();
1055       }
1056       return Optional.of(getEventMediaPackage(result.get(0).getMediaPackageId()));
1057     } catch (Exception e) {
1058       throw new SchedulerException(e);
1059     }
1060   }
1061 
1062   @Override
1063   public Optional<MediaPackage> getUpcomingRecording(String captureAgentId) throws SchedulerException {
1064     try {
1065       final Date now = new Date();
1066       List<ExtendedEventDto> result = persistence.search(Optional.of(captureAgentId), Optional.of(now),
1067           Optional.empty(), Optional.empty(), Optional.empty(), Optional.of(1));
1068       if (result.isEmpty()) {
1069         return Optional.empty();
1070       }
1071       return Optional.of(getEventMediaPackage(result.get(0).getMediaPackageId()));
1072     } catch (Exception e) {
1073       throw new SchedulerException(e);
1074     }
1075   }
1076 
1077   @Override
1078   public List<MediaPackage> findConflictingEvents(String captureDeviceID, Date startDate, Date endDate)
1079           throws SchedulerException {
1080     try {
1081       final Organization organization = securityService.getOrganization();
1082       final User user = SecurityUtil.createSystemUser(systemUserName, organization);
1083       List<MediaPackage> conflictingEvents = new ArrayList<>();
1084 
1085       SecurityUtil.runAs(securityService, organization, user, () -> {
1086         try {
1087           persistence.getEvents(captureDeviceID, startDate, endDate, Util.EVENT_MINIMUM_SEPARATION_MILLISECONDS)
1088                   .stream()
1089                   .map(id -> getEventMediaPackage(id, false))
1090                   .forEach(conflictingEvents::add);
1091         } catch (SchedulerServiceDatabaseException e) {
1092           logger.error("Failed to get conflicting events", e);
1093         }
1094       });
1095 
1096       return conflictingEvents;
1097 
1098     } catch (Exception e) {
1099       throw new SchedulerException(e);
1100     }
1101   }
1102 
1103   @Override
1104   public List<MediaPackage> findConflictingEvents(String captureAgentId, RRule rrule, Date start, Date end,
1105           long duration, TimeZone tz) throws SchedulerException {
1106     notEmpty(captureAgentId, "captureAgentId");
1107     notNull(rrule, "rrule");
1108     notNull(start, "start");
1109     notNull(end, "end");
1110     notNull(tz, "timeZone");
1111 
1112     Util.adjustRrule(rrule, start, tz);
1113     final List<Period> periods =  Util.calculatePeriods(start, end, duration, rrule, tz);
1114 
1115     if (periods.isEmpty()) {
1116       return Collections.emptyList();
1117     }
1118 
1119     return findConflictingEvents(periods, captureAgentId, tz);
1120   }
1121 
1122   private boolean checkPeriodOverlap(final List<Period> periods) {
1123     final List<Period> sortedPeriods = new ArrayList<>(periods);
1124     sortedPeriods.sort(Comparator.comparing(Period::getStart));
1125     Period prior = periods.get(0);
1126     for (Period current : periods.subList(1, periods.size())) {
1127       if (current.getStart().compareTo(prior.getEnd()) < 0) {
1128         return true;
1129       }
1130       prior = current;
1131     }
1132     return false;
1133   }
1134 
1135   private List<MediaPackage> findConflictingEvents(List<Period> periods, String captureAgentId, TimeZone tz)
1136           throws SchedulerException {
1137     notEmpty(captureAgentId, "captureAgentId");
1138     notNull(periods, "periods");
1139     requireTrue(periods.size() > 0, "periods");
1140 
1141     // First, check if there are overlaps inside the periods to be added (this is possible if you specify an RRULE via
1142     // the external API, for example; the admin ui should prevent this from happening). Then check for conflicts with
1143     // existing events.
1144     if (checkPeriodOverlap(periods)) {
1145       throw new IllegalArgumentException("RRULE periods overlap");
1146     }
1147 
1148     try {
1149       TimeZoneRegistry registry = TimeZoneRegistryFactory.getInstance().createRegistry();
1150 
1151       Set<MediaPackage> events = new HashSet<>();
1152 
1153       for (Period event : periods) {
1154         event.setTimeZone(registry.getTimeZone(tz.getID()));
1155         final Date startDate = event.getStart();
1156         final Date endDate = event.getEnd();
1157 
1158         events.addAll(findConflictingEvents(captureAgentId, startDate, endDate));
1159       }
1160 
1161       return new ArrayList<>(events);
1162     } catch (Exception e) {
1163       throw new SchedulerException(e);
1164     }
1165   }
1166 
1167   @Override
1168   public String getCalendar(Optional<String> captureAgentId, Optional<String> seriesId, Optional<Date> cutoff)
1169           throws SchedulerException {
1170 
1171     try {
1172       final Map<String, ExtendedEventDto> searchResult = persistence.search(captureAgentId, Optional.empty(), cutoff,
1173           Optional.of(DateTime.now().minusHours(1).toDate()), Optional.empty(), Optional.empty()).stream()
1174           .collect(Collectors.toMap(ExtendedEventDto::getMediaPackageId, Function.identity()));
1175       final AQueryBuilder query = assetManager.createQuery();
1176       final AResult result = query.select(query.snapshot())
1177           .where(withOrganization(query).and(query.mediaPackageIds(searchResult.keySet().toArray(new String[0])))
1178               .and(withOwner(query)).and(query.version().isLatest()))
1179           .run();
1180 
1181       final CalendarGenerator cal = new CalendarGenerator(seriesService);
1182       for (final ARecord record : result.getRecords()) {
1183         final Optional<MediaPackage> optMp = record.getSnapshot().isPresent()
1184             ? Optional.of(record.getSnapshot().get().getMediaPackage())
1185             : Optional.empty();
1186 
1187         // If the event media package is empty, skip the event
1188         if (optMp.isEmpty()) {
1189           logger.warn("Mediapackage for event '{}' can't be found, event is not recorded", record.getMediaPackageId());
1190           continue;
1191         }
1192 
1193         if (seriesId.isPresent() && !seriesId.get().equals(optMp.get().getSeries())) {
1194           continue;
1195         }
1196 
1197         Optional<DublinCoreCatalog> catalogOpt = loadEpisodeDublinCoreFromAsset(record.getSnapshot().get());
1198         if (catalogOpt.isEmpty()) {
1199           logger.warn("No episode catalog available, skipping!");
1200           continue;
1201         }
1202 
1203         final Map<String, String> caMetadata = deserializeExtendedEventProperties(
1204             searchResult.get(record.getMediaPackageId()).getCaptureAgentProperties());
1205 
1206         // If the even properties are empty, skip the event
1207         if (caMetadata.isEmpty()) {
1208           logger.warn("Properties for event '{}' can't be found, event is not recorded", record.getMediaPackageId());
1209           continue;
1210         }
1211 
1212         final String agentId = searchResult.get(record.getMediaPackageId()).getCaptureAgentId();
1213         final Date start = searchResult.get(record.getMediaPackageId()).getStartDate();
1214         final Date end = searchResult.get(record.getMediaPackageId()).getEndDate();
1215         final Date lastModified = record.getSnapshot().get().getArchivalDate();
1216 
1217         // Add the entry to the calendar, skip it with a warning if adding fails
1218         try {
1219           cal.addEvent(optMp.get(), catalogOpt.get(), agentId, start, end, lastModified, toPropertyString(caMetadata));
1220         } catch (Exception e) {
1221           logger.warn("Error adding event '{}' to calendar, event is not recorded", record.getMediaPackageId(), e);
1222         }
1223       }
1224 
1225       // Only validate calendars with events. Without any events, the iCalendar won't validate
1226       if (cal.getCalendar().getComponents().size() > 0) {
1227         cal.getCalendar().validate();
1228       }
1229 
1230       return cal.getCalendar().toString();
1231 
1232     } catch (Exception e) {
1233       throw new SchedulerException(e);
1234     }
1235   }
1236 
1237   @Override
1238   public String getScheduleLastModified(String captureAgentId) throws SchedulerException {
1239     notEmpty(captureAgentId, "captureAgentId");
1240 
1241     try {
1242       String lastModified = lastModifiedCache.getIfPresent(captureAgentId);
1243       if (lastModified != null) {
1244         return lastModified;
1245       }
1246 
1247       populateLastModifiedCache();
1248 
1249       lastModified = lastModifiedCache.getIfPresent(captureAgentId);
1250 
1251       // If still null set the empty calendar ETag
1252       if (lastModified == null) {
1253         lastModified = EMPTY_CALENDAR_ETAG;
1254         lastModifiedCache.put(captureAgentId, lastModified);
1255       }
1256       return lastModified;
1257     } catch (Exception e) {
1258       throw new SchedulerException(e);
1259     }
1260   }
1261 
1262   @Override
1263   public void removeScheduledRecordingsBeforeBuffer(long buffer) throws SchedulerException {
1264     DateTime end = new DateTime(DateTimeZone.UTC).minus(buffer * 1000);
1265 
1266     logger.info("Starting to look for scheduled recordings that have finished before {}.",
1267             DateTimeSupport.toUTC(end.getMillis()));
1268 
1269     List<ExtendedEventDto> finishedEvents;
1270     try {
1271       finishedEvents = persistence.search(Optional.empty(), Optional.empty(), Optional.empty(), Optional.empty(),
1272               Optional.of(end.toDate()), Optional.empty());
1273       logger.debug("Found {} events from search.", finishedEvents.size());
1274     } catch (Exception e) {
1275       throw new SchedulerException(e);
1276     }
1277 
1278     int recordingsRemoved = 0;
1279     for (ExtendedEventDto extEvt : finishedEvents) {
1280       final String eventId = extEvt.getMediaPackageId();
1281       try {
1282         removeEvent(eventId);
1283         logger.debug("Sucessfully removed scheduled event with id " + eventId);
1284         recordingsRemoved++;
1285       } catch (NotFoundException e) {
1286         logger.debug("Skipping event with id {} because it is not found", eventId);
1287       } catch (Exception e) {
1288         logger.warn("Unable to delete event with id '{}':", eventId, e);
1289       }
1290     }
1291 
1292     logger.info("Found {} to remove that ended before {}.", recordingsRemoved, DateTimeSupport.toUTC(end.getMillis()));
1293   }
1294 
1295   @Override
1296   public boolean updateRecordingState(String id, String state) throws NotFoundException, SchedulerException {
1297     notEmpty(id, "id");
1298     notEmpty(state, "state");
1299 
1300     if (!RecordingState.KNOWN_STATES.contains(state)) {
1301       logger.warn("Invalid recording state: {}.", state);
1302       return false;
1303     }
1304 
1305     try {
1306       final Optional<ExtendedEventDto> optExtEvt = persistence.getEvent(id);
1307 
1308       if (optExtEvt.isEmpty()) {
1309         throw new NotFoundException();
1310       }
1311 
1312       final String prevRecordingState = optExtEvt.get().getRecordingState();
1313       final Recording r = new RecordingImpl(id, state);
1314       if (!state.equals(prevRecordingState)) {
1315         logger.debug("Setting Recording {} to state {}.", id, state);
1316 
1317         // Update live event
1318         sendSchedulerUpdate(new SchedulerItemList(r.getID(), Collections.singletonList(SchedulerItem
1319                 .updateRecordingStatus(r.getState(), r.getLastCheckinTime()))));
1320 
1321         // Update Elasticsearch index
1322         updateEventInIndex(r.getID(), Optional.empty(), Optional.empty(), Optional.empty(), Optional.empty(),
1323                 Optional.empty(), Optional.empty(), Optional.empty(), Optional.of(r.getState()));
1324       } else {
1325         logger.debug("Recording state not changed");
1326       }
1327 
1328       persistence.storeEvent(
1329           id,
1330           securityService.getOrganization().getId(),
1331           Optional.empty(),
1332           Optional.empty(),
1333           Optional.empty(),
1334           Optional.empty(),
1335           Optional.of(r.getState()),
1336           Optional.of(r.getLastCheckinTime()),
1337           Optional.empty(),
1338           Optional.empty(),
1339           Optional.empty(),
1340           Optional.empty(),
1341           Optional.empty()
1342       );
1343       return true;
1344     } catch (NotFoundException e) {
1345       throw e;
1346     } catch (Exception e) {
1347       throw new SchedulerException(e);
1348     }
1349   }
1350 
1351   @Override
1352   public Recording getRecordingState(String id) throws NotFoundException, SchedulerException {
1353 
1354     notEmpty(id, "id");
1355 
1356     try {
1357       Optional<ExtendedEventDto> extEvt = persistence.getEvent(id);
1358 
1359       if (extEvt.isEmpty() || extEvt.get().getRecordingState() == null) {
1360         throw new NotFoundException();
1361       }
1362 
1363       return new RecordingImpl(id, extEvt.get().getRecordingState(), extEvt.get().getRecordingLastHeard());
1364     } catch (NotFoundException e) {
1365       throw e;
1366     } catch (Exception e) {
1367       throw new SchedulerException(e);
1368     }
1369   }
1370 
1371   @Override
1372   public void removeRecording(String id) throws NotFoundException, SchedulerException {
1373     notEmpty(id, "id");
1374 
1375     try {
1376       persistence.resetRecordingState(id);
1377 
1378       // Update live event
1379       sendSchedulerUpdate(new SchedulerItemList(id, SchedulerItem.deleteRecordingState()));
1380 
1381       // Update Elasticsearch index
1382       removeRecordingStatusFromIndex(id);
1383     } catch (NotFoundException e) {
1384       throw e;
1385     } catch (Exception e) {
1386       throw new SchedulerException(e);
1387     }
1388   }
1389 
1390   @Override
1391   public Map<String, Recording> getKnownRecordings() throws SchedulerException {
1392     try {
1393       return persistence.getKnownRecordings().parallelStream()
1394           .collect(
1395               Collectors.toMap(ExtendedEventDto::getMediaPackageId,
1396                 dto -> new RecordingImpl(dto.getMediaPackageId(), dto.getRecordingState(), dto.getRecordingLastHeard()))
1397           );
1398     } catch (Exception e) {
1399       throw new SchedulerException(e);
1400     }
1401   }
1402 
1403   private synchronized void persistEvent(final String mpId, final String checksum,
1404           final Optional<Date> startDateTime, final Optional<Date> endDateTime, final Optional<String> captureAgentId,
1405           final Optional<Set<String>> userIds, final Optional<MediaPackage> mediaPackage,
1406           final Optional<Map<String, String>> wfProperties, final Optional<Map<String, String>> caProperties,
1407           final Optional<String> schedulingSource) throws SchedulerServiceDatabaseException {
1408     // Store scheduled mediapackage
1409     if (mediaPackage.isPresent()) {
1410       assetManager.takeSnapshot(SNAPSHOT_OWNER, mediaPackage.get());
1411     }
1412 
1413     // Store extended event
1414     persistence.storeEvent(
1415         mpId,
1416         securityService.getOrganization().getId(),
1417         captureAgentId,
1418         startDateTime,
1419         endDateTime,
1420         schedulingSource,
1421         Optional.empty(),
1422         Optional.empty(),
1423         userIds.isPresent() ? Optional.of(String.join(",", userIds.get())) : Optional.empty(),
1424         Optional.of(new Date()),
1425         Optional.of(checksum),
1426         wfProperties,
1427         caProperties
1428     );
1429   }
1430 
1431   /**
1432    * Update the event in the Elasticsearch index. Fields will only be updated of the corresponding Opt is not none.
1433    *
1434    * @param mediaPackageId
1435    * @param acl
1436    * @param dublinCore
1437    * @param startTime
1438    * @param endTime
1439    * @param presenters
1440    * @param agentId
1441    * @param properties
1442    * @param recordingStatus
1443    */
1444   private void updateEventInIndex(String mediaPackageId, Optional<AccessControlList> acl,
1445           Optional<DublinCoreCatalog> dublinCore, Optional<Date> startTime, Optional<Date> endTime,
1446           Optional<Set<String>> presenters, Optional<String> agentId, Optional<Map<String, String>> properties,
1447           Optional<String> recordingStatus) {
1448 
1449     String organization = getSecurityService().getOrganization().getId();
1450     User user = getSecurityService().getUser();
1451 
1452     Function<Optional<Event>, Optional<Event>> updateFunction = getEventUpdateFunction(mediaPackageId, acl, dublinCore,
1453             startTime, endTime, presenters, agentId, properties, recordingStatus, organization, user);
1454 
1455     try {
1456       index.addOrUpdateEvent(mediaPackageId, updateFunction, organization, user);
1457       logger.debug("Scheduled event {} updated in the {} index.", mediaPackageId, index.getIndexName());
1458     } catch (SearchIndexException e) {
1459       logger.error("Error updating the scheduled event {} in the {} index.", mediaPackageId, index.getIndexName(), e);
1460     }
1461   }
1462 
1463   /**
1464    * Set recording status to null for this event in the Elasticsearch index.
1465    *
1466    * @param mediaPackageId
1467    */
1468   private void removeRecordingStatusFromIndex(String mediaPackageId) {
1469     String organization = getSecurityService().getOrganization().getId();
1470     User user = getSecurityService().getUser();
1471 
1472     Function<Optional<Event>, Optional<Event>> updateFunction = (Optional<Event> eventOpt) -> {
1473       Event event = eventOpt.orElse(new Event(mediaPackageId, organization));
1474       event.setRecordingStatus(null);
1475       return Optional.of(event);
1476     };
1477 
1478     try {
1479       index.addOrUpdateEvent(mediaPackageId, updateFunction, organization, user);
1480       logger.debug("Recording state of event {} removed from the {} index.", mediaPackageId, index.getIndexName());
1481     } catch (SearchIndexException e) {
1482       logger.error("Failed to remove the recording state of event {} from the {} index.", mediaPackageId,
1483               index.getIndexName(), e);
1484     }
1485   }
1486 
1487   /**
1488    * Remove scheduling information for this event from the Elasticsearch index.
1489    *
1490    * @param mediaPackageId
1491    */
1492   private void removeSchedulingInfoFromIndex(String mediaPackageId) {
1493     String orgId = getSecurityService().getOrganization().getId();
1494 
1495     try {
1496       index.deleteEvent(mediaPackageId, orgId);
1497       logger.debug("Scheduling information of event {} removed from the {} index.", mediaPackageId,
1498               index.getIndexName());
1499     } catch (SearchIndexException e) {
1500       logger.error("Failed to delete the scheduling information of event {} from the {} index.", mediaPackageId,
1501               index.getIndexName(), e);
1502     }
1503   }
1504 
1505   /**
1506    * Send messages to trigger an update in the LiveScheduleService.
1507    *
1508    * @param mpId
1509    * @param acl
1510    * @param dublinCore
1511    * @param startTime
1512    * @param endTime
1513    * @param agentId
1514    * @param properties
1515    */
1516   private void updateLiveEvent(String mpId, Optional<AccessControlList> acl, Optional<DublinCoreCatalog> dublinCore,
1517           Optional<Date> startTime, Optional<Date> endTime, Optional<String> agentId,
1518           Optional<Map<String, String>> properties) {
1519     List<SchedulerItem> items = new ArrayList<>();
1520     if (acl.isPresent()) {
1521       items.add(SchedulerItem.updateAcl(acl.get()));
1522     }
1523     if (dublinCore.isPresent()) {
1524       items.add(SchedulerItem.updateCatalog(dublinCore.get()));
1525     }
1526     if (startTime.isPresent()) {
1527       items.add(SchedulerItem.updateStart(startTime.get()));
1528     }
1529     if (endTime.isPresent()) {
1530       items.add(SchedulerItem.updateEnd(endTime.get()));
1531     }
1532     if (agentId.isPresent()) {
1533       items.add(SchedulerItem.updateAgent(agentId.get()));
1534     }
1535     if (properties.isPresent()) {
1536       items.add(SchedulerItem.updateProperties(properties.get()));
1537     }
1538 
1539     if (!items.isEmpty()) {
1540       sendSchedulerUpdate(new SchedulerItemList(mpId, items));
1541     }
1542   }
1543 
1544   private Map<String, String> getFinalAgentProperties(Map<String, String> caMetadata, Map<String, String> wfProperties,
1545           String captureAgentId, Optional<String> seriesId, Optional<DublinCoreCatalog> dublinCore) {
1546     Map<String, String> properties = new HashMap<>();
1547     for (Entry<String, String> entry : caMetadata.entrySet()) {
1548       if (entry.getKey().startsWith(WORKFLOW_CONFIG_PREFIX)) {
1549         continue;
1550       }
1551       properties.put(entry.getKey(), entry.getValue());
1552     }
1553     for (Entry<String, String> entry : wfProperties.entrySet()) {
1554       properties.put(WORKFLOW_CONFIG_PREFIX.concat(entry.getKey()), entry.getValue());
1555     }
1556     if (dublinCore.isPresent()) {
1557       properties.put("event.title", dublinCore.get().getFirst(DublinCore.PROPERTY_TITLE));
1558     }
1559     if (seriesId.isPresent()) {
1560       properties.put("event.series", seriesId.get());
1561     }
1562     properties.put("event.location", captureAgentId);
1563     return properties;
1564   }
1565 
1566   private void touchLastEntry(String captureAgentId) throws SchedulerException {
1567     // touch last entry
1568     try {
1569       logger.debug("Marking calendar feed for {} as modified", captureAgentId);
1570       persistence.touchLastEntry(captureAgentId);
1571       populateLastModifiedCache();
1572     } catch (SchedulerServiceDatabaseException e) {
1573       logger.error("Failed to update last modified entry of agent '{}':", captureAgentId, e);
1574     }
1575   }
1576 
1577   private void populateLastModifiedCache() throws SchedulerException {
1578     try {
1579       Map<String, Date> lastModifiedDates = persistence.getLastModifiedDates();
1580       for (Entry<String, Date> entry : lastModifiedDates.entrySet()) {
1581         Date lastModifiedDate = entry.getValue() != null ? entry.getValue() : new Date();
1582         lastModifiedCache.put(entry.getKey(), generateLastModifiedHash(lastModifiedDate));
1583       }
1584     } catch (Exception e) {
1585       throw new SchedulerException(e);
1586     }
1587   }
1588 
1589   private String generateLastModifiedHash(Date lastModifiedDate) {
1590     return "mod" + Long.toString(lastModifiedDate.getTime());
1591   }
1592 
1593   private String toPropertyString(Map<String, String> properties) {
1594     StringBuilder wfPropertiesString = new StringBuilder();
1595     for (Map.Entry<String, String> entry : properties.entrySet()) {
1596       wfPropertiesString.append(entry.getKey() + "=" + entry.getValue() + "\n");
1597     }
1598     return wfPropertiesString.toString();
1599   }
1600 
1601   private MediaPackage getEventMediaPackage(final String mediaPackageId, boolean checkOwner) {
1602     AQueryBuilder query = assetManager.createQuery();
1603     var predicate = withOrganization(query)
1604             .and(query.mediaPackageId(mediaPackageId))
1605             .and(query.version().isLatest());
1606     if (checkOwner) {
1607       predicate = predicate.and(withOwner(query));
1608     }
1609 
1610     Optional<ARecord> record = query.select(query.snapshot()).where(predicate).run().getRecords().stream().findFirst();
1611     if (record.isEmpty()) {
1612       throw new RuntimeNotFoundException(new NotFoundException());
1613     }
1614 
1615     return record.get().getSnapshot().get().getMediaPackage();
1616   }
1617 
1618   private MediaPackage getEventMediaPackage(final String mediaPackageId) {
1619     return getEventMediaPackage(mediaPackageId, true);
1620   }
1621 
1622   /**
1623    *
1624    * @param mp
1625    *          the mediapackage to update
1626    * @param dc
1627    *          the dublincore metadata to use to update the mediapackage
1628    * @return the updated mediapackage
1629    * @throws IOException
1630    *           Thrown if an IO error occurred adding the dc catalog file
1631    * @throws MediaPackageException
1632    *           Thrown if an error occurred updating the mediapackage or the mediapackage does not contain a catalog
1633    */
1634   private MediaPackage updateDublincCoreCatalog(MediaPackage mp, DublinCoreCatalog dc)
1635           throws IOException, MediaPackageException {
1636     try (InputStream inputStream = IOUtils.toInputStream(dc.toXmlString(), "UTF-8")) {
1637       // Update dublincore catalog
1638       Catalog[] catalogs = mp.getCatalogs(MediaPackageElements.EPISODE);
1639       if (catalogs.length > 0) {
1640         Catalog catalog = catalogs[0];
1641         URI uri = workspace.put(mp.getIdentifier().toString(), catalog.getIdentifier(), "dublincore.xml", inputStream);
1642         catalog.setURI(uri);
1643         // setting the URI to a new source so the checksum will most like be invalid
1644         catalog.setChecksum(null);
1645       } else {
1646         throw new MediaPackageException("Unable to find catalog");
1647       }
1648     }
1649     return mp;
1650   }
1651 
1652   private TechnicalMetadata getTechnicalMetadata(ExtendedEventDto extEvt) {
1653     final String agentId = extEvt.getCaptureAgentId();
1654     final Date start = extEvt.getStartDate();
1655     final Date end = extEvt.getEndDate();
1656     final Set<String> presenters = getPresenters(Optional.ofNullable(extEvt.getPresenters()).orElse(""));
1657     final Optional<String> recordingStatus = Optional.ofNullable(extEvt.getRecordingState());
1658     final Optional<Long> lastHeard = Optional.ofNullable(extEvt.getRecordingLastHeard());
1659     final Map<String, String> caMetadata = deserializeExtendedEventProperties(extEvt.getCaptureAgentProperties());
1660     final Map<String, String> wfProperties = deserializeExtendedEventProperties(extEvt.getWorkflowProperties());
1661 
1662     Recording recording = null;
1663     if (recordingStatus.isPresent() && lastHeard.isPresent()) {
1664       recording = new RecordingImpl(extEvt.getMediaPackageId(), recordingStatus.get(), lastHeard.get());
1665     }
1666 
1667     return new TechnicalMetadataImpl(extEvt.getMediaPackageId(), agentId, start, end, presenters, wfProperties,
1668             caMetadata, Optional.ofNullable(recording));
1669   }
1670 
1671   private Predicate withOrganization(AQueryBuilder query) {
1672     return query.organizationId().eq(securityService.getOrganization().getId());
1673   }
1674 
1675   private Predicate withOwner(AQueryBuilder query) {
1676     return query.owner().eq(SNAPSHOT_OWNER);
1677   }
1678 
1679   private Set<String> getPresenters(String presentersString) {
1680     return new HashSet<>(Arrays.asList(StringUtils.split(presentersString, ",")));
1681   }
1682 
1683   /**
1684    * @return A {@link List} of {@link MediaPackageElementFlavor} that provide the extended metadata to the front end.
1685    */
1686   private List<MediaPackageElementFlavor> getEventCatalogUIAdapterFlavors() {
1687     String organization = securityService.getOrganization().getId();
1688     return eventCatalogUIAdapters.stream()
1689         .filter(adapter -> adapter.getOrganization().equals(organization))
1690         .map(EventCatalogUIAdapter::getFlavor)
1691         .filter(mpe -> !MediaPackageElements.EPISODE.matches(mpe))
1692         .collect(Collectors.toList());
1693   }
1694 
1695   @Override
1696   public void repopulate(IndexRebuildService.DataType type) throws IndexRebuildException {
1697     try {
1698       final int total;
1699       try {
1700         total = persistence.countEvents();
1701       } catch (SchedulerServiceDatabaseException e) {
1702         logIndexRebuildError(logger, e);
1703         throw new IndexRebuildException(getService(), e);
1704       }
1705       logIndexRebuildBegin(logger, total, "scheduled events");
1706       final int[] current = {0};
1707       int n = 20;
1708       var updatedEventRange = new ArrayList<Event>();
1709 
1710       for (Organization organization: orgDirectoryService.getOrganizations()) {
1711         final User user = SecurityUtil.createSystemUser(systemUserName, organization);
1712         SecurityUtil.runAs(securityService, organization, user,
1713                 () -> {
1714                   final List<ExtendedEventDto> events;
1715                   try {
1716                     events = persistence.getEvents();
1717                   } catch (SchedulerServiceDatabaseException e) {
1718                     logIndexRebuildError(logger, e, organization);
1719                     return;
1720                   }
1721 
1722                   for (ExtendedEventDto event : events) {
1723                     try {
1724                       current[0]++;
1725 
1726                       var updatedEventData = Optional.of(new Event(event.getMediaPackageId(), organization.getId()));
1727 
1728                       final Set<String> presenters = getPresenters(
1729                               Optional.ofNullable(event.getPresenters()).orElse(""));
1730                       final Map<String, String> caMetadata = deserializeExtendedEventProperties(
1731                               event.getCaptureAgentProperties());
1732 
1733                       updatedEventData = getEventUpdateFunction(event.getMediaPackageId(), Optional.empty(),
1734                               Optional.empty(), Optional.of(event.getStartDate()), Optional.of(event.getEndDate()),
1735                               Optional.of(presenters), Optional.of(event.getCaptureAgentId()), Optional.of(caMetadata),
1736                               Optional.ofNullable(event.getRecordingState()), organization.getId(),
1737                               securityService.getUser()).apply(updatedEventData);
1738                       updatedEventRange.add(updatedEventData.get());
1739 
1740                       if (updatedEventRange.size() >= n || current[0] >= events.size()) {
1741                         index.bulkEventUpdate(updatedEventRange);
1742                         logIndexRebuildProgress(logger, total, current[0], n);
1743                         updatedEventRange.clear();
1744                       }
1745 
1746                     } catch (SearchIndexException e) {
1747                       logger.error("Error while updating event '{}' from search index:", event.getMediaPackageId(), e);
1748                     } catch (Exception e) {
1749                       throw new RuntimeException("Fatal error while indexing event " + event.getMediaPackageId(), e);
1750                     }
1751                   }
1752                 });
1753       }
1754     } catch (Exception e) {
1755       logIndexRebuildError(logger, e);
1756       throw new IndexRebuildException(getService(), e);
1757     }
1758   }
1759 
1760   @Override
1761   public IndexRebuildService.Service getService() {
1762     return IndexRebuildService.Service.Scheduler;
1763   }
1764 
1765   public SecurityService getSecurityService() {
1766     return securityService;
1767   }
1768 
1769   /**
1770    * Get the function to update a scheduled event in the Elasticsearch index.
1771    *
1772    * @param orgId          The id of the current organization
1773    * @param user           The user
1774    * @return the function to do the update
1775    */
1776   private Function<Optional<Event>, Optional<Event>> getEventUpdateFunction(String mediaPackageId,
1777           Optional<AccessControlList> acl, Optional<DublinCoreCatalog> dublinCore, Optional<Date> startTime,
1778           Optional<Date> endTime, Optional<Set<String>> presenters, Optional<String> agentId,
1779           Optional<Map<String, String>> properties, Optional<String> recordingStatus, String orgId, User user) {
1780     return (Optional<Event> eventOpt) -> {
1781       Event event = eventOpt.orElse(new Event(mediaPackageId, orgId));
1782 
1783       if (acl.isPresent()) {
1784         event.setAccessPolicy(AccessControlParser.toJsonSilent(acl.get()));
1785       }
1786       if (dublinCore.isPresent()) {
1787         EventIndexUtils.updateEvent(event, dublinCore.get());
1788         if (isBlank(event.getCreator())) {
1789           event.setCreator(getSecurityService().getUser().getName());
1790         }
1791 
1792         // Update series name if not already done
1793         try {
1794           EventIndexUtils.updateSeriesName(event, orgId, user, index);
1795         } catch (SearchIndexException e) {
1796           logger.error("Error updating the series name of the event {} in the {} index.", mediaPackageId,
1797                   index.getIndexName(), e);
1798         }
1799       }
1800       if (presenters.isPresent()) {
1801         event.setTechnicalPresenters(new ArrayList<>(presenters.get()));
1802       }
1803       if (agentId.isPresent()) {
1804         event.setAgentId(agentId.get());
1805       }
1806       if (recordingStatus.isPresent() && !recordingStatus.get().equals(RecordingState.UNKNOWN)) {
1807         event.setRecordingStatus(recordingStatus.get());
1808       }
1809       if (properties.isPresent()) {
1810         event.setAgentConfiguration(properties.get());
1811       }
1812       if (startTime.isPresent()) {
1813         String startTimeStr = startTime == null ? null : DateTimeSupport.toUTC(startTime.get().getTime());
1814         event.setTechnicalStartTime(startTimeStr);
1815       }
1816       if (endTime.isPresent()) {
1817         String endTimeStr = endTime == null ? null : DateTimeSupport.toUTC(endTime.get().getTime());
1818         event.setTechnicalEndTime(endTimeStr);
1819       }
1820 
1821       return Optional.of(event);
1822     };
1823   }
1824 }