1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21 package org.opencastproject.scheduler.impl;
22
23 import static org.apache.commons.lang3.StringUtils.isBlank;
24 import static org.opencastproject.scheduler.impl.SchedulerUtil.calculateChecksum;
25 import static org.opencastproject.security.api.SecurityConstants.GLOBAL_ADMIN_ROLE;
26 import static org.opencastproject.util.EqualsUtil.ne;
27 import static org.opencastproject.util.RequireUtil.notEmpty;
28 import static org.opencastproject.util.RequireUtil.notNull;
29 import static org.opencastproject.util.RequireUtil.requireTrue;
30
31 import org.opencastproject.assetmanager.api.Asset;
32 import org.opencastproject.assetmanager.api.AssetManager;
33 import org.opencastproject.assetmanager.api.Availability;
34 import org.opencastproject.assetmanager.api.Snapshot;
35 import org.opencastproject.elasticsearch.api.SearchIndexException;
36 import org.opencastproject.elasticsearch.index.ElasticsearchIndex;
37 import org.opencastproject.elasticsearch.index.objects.event.Event;
38 import org.opencastproject.elasticsearch.index.objects.event.EventIndexUtils;
39 import org.opencastproject.elasticsearch.index.rebuild.AbstractIndexProducer;
40 import org.opencastproject.elasticsearch.index.rebuild.IndexProducer;
41 import org.opencastproject.elasticsearch.index.rebuild.IndexRebuildException;
42 import org.opencastproject.elasticsearch.index.rebuild.IndexRebuildService;
43 import org.opencastproject.mediapackage.Catalog;
44 import org.opencastproject.mediapackage.MediaPackage;
45 import org.opencastproject.mediapackage.MediaPackageElement;
46 import org.opencastproject.mediapackage.MediaPackageElementFlavor;
47 import org.opencastproject.mediapackage.MediaPackageElements;
48 import org.opencastproject.mediapackage.MediaPackageException;
49 import org.opencastproject.mediapackage.MediaPackageSupport;
50 import org.opencastproject.mediapackage.identifier.Id;
51 import org.opencastproject.mediapackage.identifier.IdImpl;
52 import org.opencastproject.message.broker.api.scheduler.SchedulerItem;
53 import org.opencastproject.message.broker.api.scheduler.SchedulerItemList;
54 import org.opencastproject.message.broker.api.update.SchedulerUpdateHandler;
55 import org.opencastproject.metadata.dublincore.DCMIPeriod;
56 import org.opencastproject.metadata.dublincore.DublinCore;
57 import org.opencastproject.metadata.dublincore.DublinCoreCatalog;
58 import org.opencastproject.metadata.dublincore.DublinCoreUtil;
59 import org.opencastproject.metadata.dublincore.DublinCoreValue;
60 import org.opencastproject.metadata.dublincore.DublinCores;
61 import org.opencastproject.metadata.dublincore.EncodingSchemeUtils;
62 import org.opencastproject.metadata.dublincore.EventCatalogUIAdapter;
63 import org.opencastproject.metadata.dublincore.Precision;
64 import org.opencastproject.scheduler.api.Recording;
65 import org.opencastproject.scheduler.api.RecordingImpl;
66 import org.opencastproject.scheduler.api.RecordingState;
67 import org.opencastproject.scheduler.api.SchedulerConflictException;
68 import org.opencastproject.scheduler.api.SchedulerException;
69 import org.opencastproject.scheduler.api.SchedulerService;
70 import org.opencastproject.scheduler.api.TechnicalMetadata;
71 import org.opencastproject.scheduler.api.TechnicalMetadataImpl;
72 import org.opencastproject.scheduler.api.Util;
73 import org.opencastproject.scheduler.impl.persistence.ExtendedEventDto;
74 import org.opencastproject.security.api.AccessControlList;
75 import org.opencastproject.security.api.AccessControlParser;
76 import org.opencastproject.security.api.AccessControlUtil;
77 import org.opencastproject.security.api.AuthorizationService;
78 import org.opencastproject.security.api.Organization;
79 import org.opencastproject.security.api.OrganizationDirectoryService;
80 import org.opencastproject.security.api.SecurityService;
81 import org.opencastproject.security.api.UnauthorizedException;
82 import org.opencastproject.security.api.User;
83 import org.opencastproject.security.util.SecurityUtil;
84 import org.opencastproject.series.api.SeriesService;
85 import org.opencastproject.util.DateTimeSupport;
86 import org.opencastproject.util.NotFoundException;
87 import org.opencastproject.util.OsgiUtil;
88 import org.opencastproject.util.XmlNamespaceBinding;
89 import org.opencastproject.util.XmlNamespaceContext;
90 import org.opencastproject.util.data.functions.Misc;
91 import org.opencastproject.workspace.api.Workspace;
92
93 import com.google.common.cache.Cache;
94 import com.google.common.cache.CacheBuilder;
95 import com.google.gson.Gson;
96 import com.google.gson.reflect.TypeToken;
97
98 import net.fortuna.ical4j.model.Period;
99 import net.fortuna.ical4j.model.TimeZoneRegistry;
100 import net.fortuna.ical4j.model.TimeZoneRegistryFactory;
101 import net.fortuna.ical4j.model.property.RRule;
102
103 import org.apache.commons.io.IOUtils;
104 import org.apache.commons.lang3.StringUtils;
105 import org.joda.time.DateTime;
106 import org.joda.time.DateTimeZone;
107 import org.osgi.service.cm.ConfigurationException;
108 import org.osgi.service.cm.ManagedService;
109 import org.osgi.service.component.ComponentContext;
110 import org.osgi.service.component.annotations.Activate;
111 import org.osgi.service.component.annotations.Component;
112 import org.osgi.service.component.annotations.Reference;
113 import org.osgi.service.component.annotations.ReferenceCardinality;
114 import org.osgi.service.component.annotations.ReferencePolicy;
115 import org.osgi.service.component.annotations.ReferencePolicyOption;
116 import org.slf4j.Logger;
117 import org.slf4j.LoggerFactory;
118
119 import java.io.IOException;
120 import java.io.InputStream;
121 import java.lang.reflect.Type;
122 import java.net.URI;
123 import java.util.ArrayList;
124 import java.util.Arrays;
125 import java.util.Calendar;
126 import java.util.Collections;
127 import java.util.Comparator;
128 import java.util.Date;
129 import java.util.Dictionary;
130 import java.util.HashMap;
131 import java.util.HashSet;
132 import java.util.LinkedList;
133 import java.util.List;
134 import java.util.Map;
135 import java.util.Map.Entry;
136 import java.util.Optional;
137 import java.util.Set;
138 import java.util.TimeZone;
139 import java.util.UUID;
140 import java.util.concurrent.ConcurrentHashMap;
141 import java.util.concurrent.CopyOnWriteArrayList;
142 import java.util.concurrent.TimeUnit;
143 import java.util.function.Function;
144 import java.util.stream.Collectors;
145
146
147
148
149 @Component(
150 service = { ManagedService.class, SchedulerService.class, IndexProducer.class },
151 property = {
152 "service.description=Scheduler Service"
153 }
154 )
155 public class SchedulerServiceImpl extends AbstractIndexProducer implements SchedulerService, ManagedService {
156
157
/** The logger of this class. */
private static final Logger logger = LoggerFactory.getLogger(SchedulerServiceImpl.class);

/** Configuration key holding the expiry, in seconds, of the last-modified cache (read in {@code updated}). */
private static final String CFG_KEY_LAST_MODIFIED_CACHE_EXPIRE = "last_modified_cache_expire";

/** Configuration key that, when set to true, makes {@code updated} disable this component. */
private static final String CFG_KEY_MAINTENANCE = "maintenance";

/** Expiry of the last-modified cache in seconds, used unless the configuration provides another value. */
private static final int DEFAULT_CACHE_EXPIRE = 60;

/** ETag constant; presumably the value reported for an empty calendar (its use is outside this part of the file). */
private static final String EMPTY_CALENDAR_ETAG = "mod0";

/** Owner name taken from the scheduler job type; presumably the owner of scheduler snapshots (TODO confirm). */
private static final String SNAPSHOT_OWNER = SchedulerService.JOB_TYPE;

/** Shared Gson instance used to deserialize the stored event property maps. */
private static final Gson gson = new Gson();
175
176
177
178
179
180 private static Map<String,String> deserializeExtendedEventProperties(String props) {
181 if (props == null || props.trim().isEmpty()) {
182 return new HashMap<>();
183 }
184 Type type = new TypeToken<Map<String, String>>() { }.getType();
185 return gson.fromJson(props, type);
186 }
187
188
189 protected Cache<String, String> lastModifiedCache = CacheBuilder.newBuilder()
190 .expireAfterWrite(DEFAULT_CACHE_EXPIRE, TimeUnit.SECONDS).build();
191
192
193 private SchedulerServiceDatabase persistence;
194
195
196 private SeriesService seriesService;
197
198
199 private SecurityService securityService;
200
201
202 private AssetManager assetManager;
203
204
205 private Workspace workspace;
206
207
208 private AuthorizationService authorizationService;
209
210
211 private OrganizationDirectoryService orgDirectoryService;
212
213
214 private ElasticsearchIndex index;
215
216
217 private List<EventCatalogUIAdapter> eventCatalogUIAdapters = new ArrayList<>();
218 private final List<SchedulerUpdateHandler> schedulerUpdateHandlers = new CopyOnWriteArrayList<>();
219
220
221 private String systemUserName;
222
223 private ComponentContext componentContext;
224
225
226
227
228
229
/**
 * OSGi callback that registers a scheduler update handler. The reference is dynamic and has multiple
 * cardinality, so this may be called at any time and more than once.
 *
 * @param handler
 *          the handler to add to the list of registered handlers
 */
@Reference(
    cardinality = ReferenceCardinality.MULTIPLE,
    policy = ReferencePolicy.DYNAMIC,
    policyOption = ReferencePolicyOption.GREEDY,
    unbind = "removeSchedulerUpdateHandler"
)
public void addSchedulerUpdateHandler(SchedulerUpdateHandler handler) {
  this.schedulerUpdateHandlers.add(handler);
}
239
/**
 * Unbind counterpart of {@code addSchedulerUpdateHandler}: removes the handler from the registered handlers.
 *
 * @param handler
 *          the handler to remove
 */
public void removeSchedulerUpdateHandler(SchedulerUpdateHandler handler) {
  this.schedulerUpdateHandlers.remove(handler);
}
243
244
245
246
247
248
/**
 * OSGi callback to set the scheduler database.
 *
 * @param persistence
 *          the persistence layer for scheduled events
 */
@Reference
public void setPersistence(SchedulerServiceDatabase persistence) {
  this.persistence = persistence;
}
253
254
255
256
257
258
/**
 * OSGi callback to set the series service.
 *
 * @param seriesService
 *          the series service
 */
@Reference
public void setSeriesService(SeriesService seriesService) {
  this.seriesService = seriesService;
}
263
264
265
266
267
268
/**
 * OSGi callback to set the security service.
 *
 * @param securityService
 *          the security service
 */
@Reference
public void setSecurityService(SecurityService securityService) {
  this.securityService = securityService;
}
273
274
275
276
277
278
/**
 * OSGi callback to set the asset manager.
 *
 * @param assetManager
 *          the asset manager
 */
@Reference
public void setAssetManager(AssetManager assetManager) {
  this.assetManager = assetManager;
}
283
284
285
286
287
288
/**
 * OSGi callback to set the workspace.
 *
 * @param workspace
 *          the workspace
 */
@Reference
public void setWorkspace(Workspace workspace) {
  this.workspace = workspace;
}
293
294
295
296
297
298
/**
 * OSGi callback to set the authorization service.
 *
 * @param authorizationService
 *          the authorization service
 */
@Reference
public void setAuthorizationService(AuthorizationService authorizationService) {
  this.authorizationService = authorizationService;
}
303
304
305
306
307
308
309 private void sendSchedulerUpdate(SchedulerItemList list) {
310 while (schedulerUpdateHandlers.size() != 1) {
311 logger.warn("Expecting 1 handler, but {} are registered. Waiting 10s then retrying...", schedulerUpdateHandlers.size());
312 try {
313 Thread.sleep(10000L);
314 } catch (InterruptedException e) { }
315 }
316 String mpid = list.getId();
317 for (SchedulerItem item : list.getItems()) {
318 for (SchedulerUpdateHandler handler : this.schedulerUpdateHandlers) {
319 handler.execute(mpid, item);
320 }
321 }
322 }
323
324
325
326
327
328
/**
 * OSGi callback to set the organization directory service.
 *
 * @param orgDirectoryService
 *          the organization directory service
 */
@Reference
public void setOrgDirectoryService(OrganizationDirectoryService orgDirectoryService) {
  this.orgDirectoryService = orgDirectoryService;
}
333
334
335
336
337
338
339
/**
 * OSGi callback to set the Elasticsearch index.
 *
 * @param index
 *          the Elasticsearch index
 */
@Reference
public void setIndex(ElasticsearchIndex index) {
  this.index = index;
}
344
345
/**
 * OSGi callback that registers an event catalog UI adapter. The reference is dynamic and has multiple
 * cardinality, so this may be called at any time and more than once.
 *
 * @param catalogUIAdapter
 *          the adapter to add to the list of known adapters
 */
@Reference(
    cardinality = ReferenceCardinality.MULTIPLE,
    policy = ReferencePolicy.DYNAMIC,
    policyOption = ReferencePolicyOption.GREEDY,
    unbind = "removeCatalogUIAdapter"
)
public void addCatalogUIAdapter(EventCatalogUIAdapter catalogUIAdapter) {
  eventCatalogUIAdapters.add(catalogUIAdapter);
}
355
356
/**
 * Unbind counterpart of {@code addCatalogUIAdapter}: removes the adapter from the list of known adapters.
 *
 * @param catalogUIAdapter
 *          the adapter to remove
 */
public void removeCatalogUIAdapter(EventCatalogUIAdapter catalogUIAdapter) {
  eventCatalogUIAdapters.remove(catalogUIAdapter);
}
360
361
362
363
364
365
366
367
/**
 * Activates the component: remembers the component context (needed later to disable the component in
 * maintenance mode) and reads the system user name from it.
 *
 * @param cc
 *          the OSGi component context
 * @throws Exception
 *           declared for the activation callback; anything thrown while reading the system user name propagates
 */
@Activate
public void activate(ComponentContext cc) throws Exception {
  this.componentContext = cc;
  systemUserName = SecurityUtil.getSystemUserName(cc);
  logger.info("Activating Scheduler Service");
}
374
375 @Override
376 public void updated(Dictionary<String, ?> properties) throws ConfigurationException {
377 if (properties != null) {
378 final Optional<Integer> cacheExpireDuration = OsgiUtil.getOptCfgAsInt(properties, CFG_KEY_LAST_MODIFIED_CACHE_EXPIRE);
379 if (cacheExpireDuration.isPresent()) {
380 lastModifiedCache = CacheBuilder.newBuilder().expireAfterWrite(cacheExpireDuration.get(), TimeUnit.SECONDS)
381 .build();
382 logger.info("Set last modified cache to {}", DateTimeSupport.humanReadableTime(cacheExpireDuration.get()));
383 } else {
384 logger.info("Set last modified cache to default {}", DateTimeSupport.humanReadableTime(DEFAULT_CACHE_EXPIRE));
385 }
386 final Optional<Boolean> maintenance = OsgiUtil.getOptCfgAsBoolean(properties, CFG_KEY_MAINTENANCE);
387 if (maintenance.orElse(false)) {
388 final String name = SchedulerServiceImpl.class.getName();
389 logger.warn("Putting scheduler into maintenance mode. This only makes sense when migrating data. If this is not"
390 + " intended, edit the config file '{}.cfg' accordingly and restart opencast.", name);
391 componentContext.disableComponent(name);
392 }
393 }
394 }
395
/**
 * Schedules a single event. All arguments are passed on unchanged to {@code addEventInternal}, which performs
 * validation, conflict detection and persistence.
 *
 * @param startDateTime
 *          the start of the recording
 * @param endDateTime
 *          the end of the recording
 * @param captureAgentId
 *          the capture agent the event is scheduled on
 * @param userIds
 *          the user ids stored with the event
 * @param mediaPackage
 *          the media package of the event
 * @param wfProperties
 *          the workflow properties
 * @param caMetadata
 *          the capture agent properties
 * @param schedulingSource
 *          the optional scheduling source
 * @throws UnauthorizedException
 *           declared by the interface
 * @throws SchedulerException
 *           if the event cannot be scheduled
 */
@Override
public void addEvent(Date startDateTime, Date endDateTime, String captureAgentId, Set<String> userIds,
    MediaPackage mediaPackage, Map<String, String> wfProperties, Map<String, String> caMetadata,
    Optional<String> schedulingSource)
        throws UnauthorizedException, SchedulerException {
  addEventInternal(startDateTime, endDateTime, captureAgentId, userIds, mediaPackage, wfProperties, caMetadata,
      schedulingSource);
}
404
405 private void addEventInternal(Date startDateTime, Date endDateTime, String captureAgentId, Set<String> userIds,
406 MediaPackage mediaPackage, Map<String, String> wfProperties, Map<String, String> caMetadata,
407 Optional<String> schedulingSource)
408 throws SchedulerException {
409 notNull(startDateTime, "startDateTime");
410 notNull(endDateTime, "endDateTime");
411 notEmpty(captureAgentId, "captureAgentId");
412 notNull(userIds, "userIds");
413 notNull(mediaPackage, "mediaPackage");
414 notNull(wfProperties, "wfProperties");
415 notNull(caMetadata, "caMetadata");
416 notNull(schedulingSource, "schedulingSource");
417 if (endDateTime.before(startDateTime))
418 throw new IllegalArgumentException("The end date is before the start date");
419
420 final String mediaPackageId = mediaPackage.getIdentifier().toString();
421
422 try {
423 Optional<MediaPackage> noMediaPackage = assetManager.getMediaPackage(mediaPackageId);
424 if (noMediaPackage.isPresent()) {
425 logger.warn("Mediapackage with id '{}' already exists!", mediaPackageId);
426 throw new SchedulerConflictException("Mediapackage with id '" + mediaPackageId + "' already exists!");
427 }
428
429 Optional<String> seriesId = Optional.ofNullable(StringUtils.trimToNull(mediaPackage.getSeries()));
430
431 List<MediaPackage> conflictingEvents = findConflictingEvents(captureAgentId, startDateTime, endDateTime);
432 if (conflictingEvents.size() > 0) {
433 logger.info("Unable to add event {}, conflicting events found: {}", mediaPackageId, conflictingEvents);
434 throw new SchedulerConflictException(
435 "Unable to add event, conflicting events found for event " + mediaPackageId);
436 }
437
438
439 Optional<DublinCoreCatalog> dublinCore = DublinCoreUtil.loadEpisodeDublinCore(workspace, mediaPackage);
440 AccessControlList acl = authorizationService.getActiveAcl(mediaPackage).getA();
441
442
443 Map<String, String> finalCaProperties = getFinalAgentProperties(caMetadata, wfProperties, captureAgentId,
444 seriesId, dublinCore);
445
446
447 String checksum = calculateChecksum(workspace, getEventCatalogUIAdapterFlavors(), startDateTime, endDateTime,
448 captureAgentId, userIds, mediaPackage, dublinCore, wfProperties,
449 finalCaProperties, acl);
450 persistEvent(mediaPackageId, checksum, Optional.of(startDateTime), Optional.of(endDateTime),
451 Optional.of(captureAgentId), Optional.of(userIds), Optional.of(mediaPackage), Optional.of(wfProperties),
452 Optional.of(finalCaProperties), schedulingSource);
453
454
455 updateLiveEvent(mediaPackageId, Optional.of(acl), dublinCore, Optional.of(startDateTime),
456 Optional.of(endDateTime), Optional.of(captureAgentId), Optional.of(finalCaProperties));
457
458
459 updateEventInIndex(mediaPackageId, Optional.of(acl), dublinCore, Optional.of(startDateTime), Optional.of(endDateTime),
460 Optional.of(userIds), Optional.of(captureAgentId), Optional.of(finalCaProperties), Optional.empty());
461
462
463 touchLastEntry(captureAgentId);
464 } catch (SchedulerException e) {
465 throw e;
466 } catch (Exception e) {
467 logger.error("Failed to create event with id '{}':", mediaPackageId, e);
468 throw new SchedulerException(e);
469 }
470 }
471
472 @Override
473 public Map<String, Period> addMultipleEvents(RRule rRule, Date start, Date end, Long duration, TimeZone tz,
474 String captureAgentId, Set<String> userIds, MediaPackage templateMp, Map<String, String> wfProperties,
475 Map<String, String> caMetadata, Optional<String> schedulingSource)
476 throws UnauthorizedException, SchedulerConflictException, SchedulerException {
477
478 Util.adjustRrule(rRule, start, tz);
479 List<Period> periods = Util.calculatePeriods(start, end, duration, rRule, tz);
480 if (periods.isEmpty()) {
481 return Collections.emptyMap();
482 }
483 return addMultipleEventInternal(periods, captureAgentId, userIds, templateMp, wfProperties, caMetadata,
484 schedulingSource);
485 }
486
487 private Map<String, Period> addMultipleEventInternal(List<Period> periods, String captureAgentId,
488 Set<String> userIds, MediaPackage templateMp, Map<String, String> wfProperties,
489 Map<String, String> caMetadata, Optional<String> schedulingSource) throws SchedulerException {
490 notNull(periods, "periods");
491 requireTrue(periods.size() > 0, "periods");
492 notEmpty(captureAgentId, "captureAgentId");
493 notNull(userIds, "userIds");
494 notNull(templateMp, "mediaPackages");
495 notNull(wfProperties, "wfProperties");
496 notNull(caMetadata, "caMetadata");
497 notNull(schedulingSource, "schedulingSource");
498
499 Map<String, Period> scheduledEvents = new ConcurrentHashMap<>();
500
501 try {
502 LinkedList<Id> ids = new LinkedList<>();
503
504 while (ids.size() <= periods.size()) {
505
506 while (ids.size() <= periods.size()) {
507 Id id = new IdImpl(UUID.randomUUID().toString());
508 ids.add(id);
509 }
510
511 List<Snapshot> snapshots = assetManager.getLatestSnapshots(ids);
512
513
514 if (snapshots.size() > 0) {
515 ids.clear();
516 }
517 }
518
519 Optional<String> seriesId = Optional.ofNullable(StringUtils.trimToNull(templateMp.getSeries()));
520
521 List<MediaPackage> conflictingEvents = findConflictingEvents(periods, captureAgentId, TimeZone.getDefault());
522 if (conflictingEvents.size() > 0) {
523 logger.info("Unable to add events, conflicting events found: {}", conflictingEvents);
524 throw new SchedulerConflictException("Unable to add event, conflicting events found");
525 }
526
527 final Organization org = securityService.getOrganization();
528 final User user = securityService.getUser();
529 periods.parallelStream().forEach(event -> SecurityUtil.runAs(securityService, org, user, () -> {
530 final int currentCounter = periods.indexOf(event);
531 MediaPackage mediaPackage = (MediaPackage) templateMp.clone();
532 Date startDate = new Date(event.getStart().getTime());
533 Date endDate = new Date(event.getEnd().getTime());
534 Id id = ids.get(currentCounter);
535
536
537 DublinCoreCatalog dc;
538 Optional<DublinCoreCatalog> dcOpt = DublinCoreUtil.loadEpisodeDublinCore(workspace, templateMp);
539 if (dcOpt.isPresent()) {
540 dc = dcOpt.get();
541 dc = (DublinCoreCatalog) dc.clone();
542
543 dc.addBindings(XmlNamespaceContext
544 .mk(XmlNamespaceBinding.mk(DublinCores.OC_PROPERTY_NS_PREFIX, DublinCores.OC_PROPERTY_NS_URI)));
545 } else {
546 dc = DublinCores.mkOpencastEpisode().getCatalog();
547 }
548
549
550 mediaPackage.setIdentifier(id);
551
552
553 String newTitle = dc.getFirst(DublinCore.PROPERTY_TITLE) + String.format(" %0" + Integer.toString(periods.size()).length() + "d", currentCounter + 1);
554 dc.set(DublinCore.PROPERTY_TITLE, newTitle);
555 DublinCoreValue eventTime = EncodingSchemeUtils.encodePeriod(new DCMIPeriod(startDate, endDate),
556 Precision.Second);
557 dc.set(DublinCore.PROPERTY_TEMPORAL, eventTime);
558 dc.set(DublinCore.PROPERTY_CREATED, EncodingSchemeUtils.encodeDate(startDate, Precision.Second));
559 try {
560 mediaPackage = updateDublincCoreCatalog(mediaPackage, dc);
561 } catch (Exception e) {
562 Misc.chuck(e);
563 }
564 mediaPackage.setTitle(newTitle);
565
566 String mediaPackageId = mediaPackage.getIdentifier().toString();
567
568 Calendar cal = Calendar.getInstance(TimeZone.getTimeZone("UTC"));
569 cal.setTime(event.getStart());
570 Date startDateTime = cal.getTime();
571 cal.setTime(event.getEnd());
572 Date endDateTime = cal.getTime();
573
574 Optional<DublinCoreCatalog> dublinCore = DublinCoreUtil.loadEpisodeDublinCore(workspace, mediaPackage);
575 AccessControlList acl = authorizationService.getActiveAcl(mediaPackage).getA();
576
577
578 Map<String, String> finalCaProperties = getFinalAgentProperties(caMetadata, wfProperties, captureAgentId,
579 seriesId, dublinCore);
580
581
582 String checksum = calculateChecksum(workspace, getEventCatalogUIAdapterFlavors(), startDateTime, endDateTime,
583 captureAgentId, userIds, mediaPackage, dublinCore, wfProperties, finalCaProperties, acl);
584 try {
585 persistEvent(mediaPackageId, checksum, Optional.of(startDateTime), Optional.of(endDateTime),
586 Optional.of(captureAgentId), Optional.of(userIds), Optional.of(mediaPackage), Optional.of(wfProperties),
587 Optional.of(finalCaProperties), schedulingSource);
588 } catch (Exception e) {
589 Misc.chuck(e);
590 }
591
592
593 updateLiveEvent(mediaPackageId, Optional.of(acl), dublinCore, Optional.of(startDateTime), Optional.of(endDateTime),
594 Optional.of(captureAgentId), Optional.of(finalCaProperties));
595
596
597 updateEventInIndex(mediaPackageId, Optional.of(acl), dublinCore, Optional.of(startDateTime), Optional.of(endDateTime),
598 Optional.of(userIds), Optional.of(captureAgentId), Optional.of(finalCaProperties), Optional.empty());
599
600 scheduledEvents.put(mediaPackageId, event);
601 for (MediaPackageElement mediaPackageElement : mediaPackage.getElements()) {
602 try {
603 workspace.delete(mediaPackage.getIdentifier().toString(), mediaPackageElement.getIdentifier());
604 } catch (NotFoundException | IOException e) {
605 logger.warn("Failed to delete media package element", e);
606 }
607 }
608 }));
609 return scheduledEvents;
610 } catch (SchedulerException e) {
611 throw e;
612 } catch (Exception e) {
613 throw new SchedulerException(e);
614 } finally {
615
616 if (!scheduledEvents.isEmpty()) {
617 touchLastEntry(captureAgentId);
618 }
619 }
620 }
621
/**
 * Updates a scheduled event without allowing conflicts. Delegates to {@code updateEventInternal} with
 * {@code allowConflict} set to {@code false}; empty optionals are passed on as given.
 *
 * @param mpId
 *          the media package id of the event to update
 * @param startDateTime
 *          the optional new start
 * @param endDateTime
 *          the optional new end
 * @param captureAgentId
 *          the optional new capture agent id
 * @param userIds
 *          the optional new user ids
 * @param mediaPackage
 *          the optional new media package
 * @param wfProperties
 *          the optional new workflow properties
 * @param caMetadata
 *          the optional new capture agent properties
 * @throws NotFoundException
 *           if the event does not exist
 * @throws UnauthorizedException
 *           declared by the interface
 * @throws SchedulerException
 *           if the update fails
 */
@Override
public void updateEvent(final String mpId, Optional<Date> startDateTime, Optional<Date> endDateTime, Optional<String> captureAgentId,
    Optional<Set<String>> userIds, Optional<MediaPackage> mediaPackage, Optional<Map<String, String>> wfProperties,
    Optional<Map<String, String>> caMetadata)
        throws NotFoundException, UnauthorizedException, SchedulerException {
  updateEventInternal(mpId, startDateTime, endDateTime, captureAgentId, userIds, mediaPackage,
      wfProperties, caMetadata, false);
}
630
/**
 * Updates a scheduled event, optionally allowing scheduling conflicts. Delegates to
 * {@code updateEventInternal}; empty optionals are passed on as given.
 *
 * @param mpId
 *          the media package id of the event to update
 * @param startDateTime
 *          the optional new start
 * @param endDateTime
 *          the optional new end
 * @param captureAgentId
 *          the optional new capture agent id
 * @param userIds
 *          the optional new user ids
 * @param mediaPackage
 *          the optional new media package
 * @param wfProperties
 *          the optional new workflow properties
 * @param caMetadata
 *          the optional new capture agent properties
 * @param allowConflict
 *          whether conflicting events are tolerated; passed on to {@code updateEventInternal}
 * @throws NotFoundException
 *           if the event does not exist
 * @throws UnauthorizedException
 *           declared by the interface
 * @throws SchedulerException
 *           if the update fails
 */
@Override
public void updateEvent(final String mpId, Optional<Date> startDateTime, Optional<Date> endDateTime, Optional<String> captureAgentId,
    Optional<Set<String>> userIds, Optional<MediaPackage> mediaPackage, Optional<Map<String, String>> wfProperties,
    Optional<Map<String, String>> caMetadata, boolean allowConflict)
        throws NotFoundException, UnauthorizedException, SchedulerException {
  updateEventInternal(mpId, startDateTime, endDateTime, captureAgentId, userIds, mediaPackage,
      wfProperties, caMetadata, allowConflict);
}
639
640 private void updateEventInternal(final String mpId, Optional<Date> startDateTime,
641 Optional<Date> endDateTime, Optional<String> captureAgentId, Optional<Set<String>> userIds,
642 Optional<MediaPackage> mediaPackageOpt, Optional<Map<String, String>> wfProperties, Optional<Map<String, String>> caMetadata,
643 boolean allowConflict) throws NotFoundException, SchedulerException {
644 notEmpty(mpId, "mpId");
645 notNull(startDateTime, "startDateTime");
646 notNull(endDateTime, "endDateTime");
647 notNull(captureAgentId, "captureAgentId");
648 notNull(userIds, "userIds");
649 notNull(mediaPackageOpt, "mediaPackageOpt");
650 notNull(wfProperties, "wfProperties");
651 notNull(caMetadata, "caMetadata");
652
653 try {
654 Optional<Snapshot> optSnapshot = assetManager.getLatestSnapshot(mpId);
655 Optional<ExtendedEventDto> optExtEvent = persistence.getEvent(mpId);
656 if (optSnapshot.isEmpty() || optExtEvent.isEmpty())
657 throw new NotFoundException("No event found while updating event " + mpId);
658
659 Snapshot snapshot = optSnapshot.get();
660 MediaPackage archivedMediaPackage = snapshot.getMediaPackage();
661
662 Optional<DublinCoreCatalog> archivedDublinCoreOpt = loadEpisodeDublinCoreFromAsset(snapshot);
663 if (archivedDublinCoreOpt.isEmpty())
664 throw new NotFoundException("No dublincore found while updating event " + mpId);
665 DublinCoreCatalog archivedDublinCore = archivedDublinCoreOpt.get();
666 AccessControlList archivedAcl = authorizationService.getActiveAcl(archivedMediaPackage).getA();
667
668 final ExtendedEventDto extendedEventDto = optExtEvent.get();
669 Date start = extendedEventDto.getStartDate();
670 Date end = extendedEventDto.getEndDate();
671
672 if ((startDateTime.isPresent() || endDateTime.isPresent()) && endDateTime.orElse(end).before(startDateTime.orElse(start)))
673 throw new SchedulerException("The end date is before the start date");
674
675 String agentId = extendedEventDto.getCaptureAgentId();
676 Optional<String> seriesId = Optional.ofNullable(archivedMediaPackage.getSeries());
677
678
679
680 if ((captureAgentId.isPresent() || startDateTime.isPresent() || endDateTime.isPresent())
681 && (!allowConflict || !isAdmin())) {
682 List<MediaPackage> conflictingEvents = findConflictingEvents(
683 captureAgentId.orElse(agentId),
684 startDateTime.orElse(start),
685 endDateTime.orElse(end)
686 ).stream()
687 .filter(mp -> !mpId.equals(mp.getIdentifier().toString()))
688 .collect(Collectors.toList());
689 if (conflictingEvents.size() > 0) {
690 logger.info("Unable to update event {}, conflicting events found: {}", mpId, conflictingEvents);
691 throw new SchedulerConflictException("Unable to update event, conflicting events found for event " + mpId);
692 }
693 }
694
695 Set<String> presenters = getPresenters(Optional.ofNullable(extendedEventDto.getPresenters()).orElse(""));
696 Map<String, String> wfProps = deserializeExtendedEventProperties(extendedEventDto.getWorkflowProperties());
697 Map<String, String> caProperties = deserializeExtendedEventProperties(
698 extendedEventDto.getCaptureAgentProperties());
699
700 boolean propertiesChanged = false;
701 boolean dublinCoreChanged = false;
702
703
704 if (wfProperties.isPresent()) {
705 propertiesChanged = true;
706 wfProps = wfProperties.get();
707 }
708
709
710 if (caMetadata.isPresent()) {
711 propertiesChanged = true;
712 caProperties = caMetadata.get();
713 }
714
715 if (captureAgentId.isPresent())
716 propertiesChanged = true;
717
718 Optional<AccessControlList> changedAclOpt = Optional.empty();
719 Optional<DublinCoreCatalog> changedDublinCoreOpt = Optional.empty();
720 if (mediaPackageOpt.isPresent()) {
721 MediaPackage mediaPackage = mediaPackageOpt.get();
722
723 if (ne(archivedMediaPackage.getSeries(), mediaPackage.getSeries())) {
724 propertiesChanged = true;
725 seriesId = Optional.ofNullable(mediaPackage.getSeries());
726 }
727
728
729 AccessControlList acl = authorizationService.getActiveAcl(mediaPackage).getA();
730 if (!AccessControlUtil.equals(acl, archivedAcl)) {
731 changedAclOpt = Optional.of(acl);
732 }
733
734
735 Optional<DublinCoreCatalog> dublinCoreOpt = DublinCoreUtil.loadEpisodeDublinCore(workspace,
736 mediaPackage);
737 if (dublinCoreOpt.isPresent() && !DublinCoreUtil.equals(archivedDublinCore, dublinCoreOpt.get())) {
738 dublinCoreChanged = true;
739 propertiesChanged = true;
740 changedDublinCoreOpt = dublinCoreOpt;
741 }
742 }
743
744
745 DublinCoreCatalog dublinCore = changedDublinCoreOpt.orElse(archivedDublinCore);
746 DublinCoreCatalog dublinCoreCopy = (DublinCoreCatalog) dublinCore.clone();
747 if (startDateTime.isPresent() && endDateTime.isPresent()) {
748 DublinCoreValue eventTime = EncodingSchemeUtils.encodePeriod(
749 new DCMIPeriod(startDateTime.get(), endDateTime.get()), Precision.Second);
750 dublinCore.set(DublinCore.PROPERTY_TEMPORAL, eventTime);
751 }
752 if (captureAgentId.isPresent()) {
753 dublinCore.set(DublinCore.PROPERTY_SPATIAL, captureAgentId.get());
754 }
755 if (!DublinCoreUtil.equals(dublinCore, dublinCoreCopy)) {
756 dublinCoreChanged = true;
757 changedDublinCoreOpt = Optional.of(dublinCore);
758 mediaPackageOpt = Optional.of(updateDublincCoreCatalog(mediaPackageOpt.orElse(archivedMediaPackage),
759 changedDublinCoreOpt.get()));
760 }
761
762 Optional<Map<String, String>> finalCaProperties = Optional.empty();
763 if (propertiesChanged) {
764 finalCaProperties = Optional.of(getFinalAgentProperties(caProperties, wfProps, captureAgentId.orElse(agentId),
765 seriesId, Optional.of(changedDublinCoreOpt.orElse(
766 archivedDublinCore))));
767 }
768
769 String checksum = calculateChecksum(workspace, getEventCatalogUIAdapterFlavors(), startDateTime.orElse(start),
770 endDateTime.orElse(end), captureAgentId.orElse(agentId), userIds.orElse(presenters),
771 mediaPackageOpt.orElse(archivedMediaPackage),
772 Optional.of(changedDublinCoreOpt.orElse(archivedDublinCore)), wfProperties.orElse(wfProps),
773 finalCaProperties.orElse(caProperties), changedAclOpt.orElse(new AccessControlList()));
774
775 String oldChecksum = extendedEventDto.getChecksum();
776 if (checksum.equals(oldChecksum)) {
777 logger.debug("Updated event {} has same checksum, ignore update", mpId);
778 return;
779 }
780
781
782 persistEvent(mpId, checksum, startDateTime, endDateTime, captureAgentId, userIds,
783 mediaPackageOpt, wfProperties, finalCaProperties, Optional.empty());
784
785
786 updateLiveEvent(mpId, changedAclOpt, changedDublinCoreOpt, startDateTime, endDateTime, Optional.of(agentId),
787 finalCaProperties);
788
789
790 updateEventInIndex(mpId, changedAclOpt, changedDublinCoreOpt, startDateTime, endDateTime, userIds,
791 Optional.of(agentId), finalCaProperties, Optional.empty());
792
793
794 if (propertiesChanged || dublinCoreChanged || startDateTime.isPresent() || endDateTime.isPresent()) {
795 touchLastEntry(agentId);
796 if (captureAgentId.isPresent()) {
797 touchLastEntry(captureAgentId.get());
798 }
799 }
800 } catch (NotFoundException | SchedulerException e) {
801 throw e;
802 } catch (Exception e) {
803 throw new SchedulerException(e);
804 }
805 }
806
807 private boolean isAdmin() {
808 return (securityService.getUser().hasRole(GLOBAL_ADMIN_ROLE)
809 || securityService.getUser().hasRole(securityService.getOrganization().getAdminRole()));
810 }
811
812 private Optional<DublinCoreCatalog> loadEpisodeDublinCoreFromAsset(Snapshot snapshot) {
813 Optional<MediaPackageElement> dcCatalog = Arrays.stream(snapshot.getMediaPackage().getElements())
814 .filter(MediaPackageSupport.Filters::isEpisodeDublinCore)
815 .findFirst();
816 if (dcCatalog.isEmpty())
817 return Optional.empty();
818
819 Optional<Asset> asset = assetManager.getAsset(snapshot.getVersion(),
820 snapshot.getMediaPackage().getIdentifier().toString(), dcCatalog.get().getIdentifier());
821 if (asset.isEmpty())
822 return Optional.empty();
823
824 if (Availability.OFFLINE.equals(asset.get().getAvailability()))
825 return Optional.empty();
826
827 InputStream inputStream = null;
828 try {
829 inputStream = asset.get().getInputStream();
830 return Optional.of(DublinCores.read(inputStream));
831 } finally {
832 IOUtils.closeQuietly(inputStream);
833 }
834 }
835
836 @Override
837 public synchronized void removeEvent(String mediaPackageId)
838 throws NotFoundException, SchedulerException {
839 notEmpty(mediaPackageId, "mediaPackageId");
840
841 boolean notFoundInDatabase = false;
842 boolean notFoundInAssetManager;
843 try {
844
845 try {
846 Optional<ExtendedEventDto> extEvtOpt = persistence.getEvent(mediaPackageId);
847 if (extEvtOpt.isPresent()) {
848 String agentId = extEvtOpt.get().getCaptureAgentId();
849 persistence.deleteEvent(mediaPackageId);
850 if (StringUtils.isNotEmpty(agentId)) {
851 touchLastEntry(agentId);
852 }
853 } else {
854 notFoundInDatabase = true;
855 }
856 } catch (NotFoundException e) {
857 notFoundInDatabase = true;
858 }
859
860
861 long deletedSnapshots = assetManager.deleteSnapshots(mediaPackageId);
862 notFoundInAssetManager = deletedSnapshots == 0;
863
864
865 sendSchedulerUpdate(new SchedulerItemList(mediaPackageId, SchedulerItem.delete()));
866
867
868 removeSchedulingInfoFromIndex(mediaPackageId);
869 } catch (Exception e) {
870 logger.error("Could not remove event '{}' from persistent storage", mediaPackageId, e);
871 throw new SchedulerException(e);
872 }
873
874 if (notFoundInDatabase && notFoundInAssetManager) {
875 throw new NotFoundException();
876 }
877 }
878
879 @Override
880 public MediaPackage getMediaPackage(String mediaPackageId) throws NotFoundException, SchedulerException {
881 notEmpty(mediaPackageId, "mediaPackageId");
882
883 try {
884 return getEventMediaPackage(mediaPackageId);
885 } catch (RuntimeNotFoundException e) {
886 throw e.getWrappedException();
887 } catch (Exception e) {
888 logger.error("Failed to get mediapackage of event '{}':", mediaPackageId, e);
889 throw new SchedulerException(e);
890 }
891 }
892
893 @Override
894 public DublinCoreCatalog getDublinCore(String mediaPackageId) throws NotFoundException, SchedulerException {
895 notEmpty(mediaPackageId, "mediaPackageId");
896
897 try {
898 Optional<Snapshot> optSnapshot = assetManager.getLatestSnapshot(mediaPackageId);
899 if (optSnapshot.isEmpty())
900 throw new NotFoundException();
901
902 Optional<DublinCoreCatalog> dublinCore = loadEpisodeDublinCoreFromAsset(optSnapshot.get());
903 if (dublinCore.isEmpty())
904 throw new NotFoundException("No dublincore catalog found " + mediaPackageId);
905
906 return dublinCore.get();
907 } catch (NotFoundException e) {
908 throw e;
909 } catch (Exception e) {
910 logger.error("Failed to get dublin core catalog of event '{}':", mediaPackageId, e);
911 throw new SchedulerException(e);
912 }
913 }
914
915 @Override
916 public TechnicalMetadata getTechnicalMetadata(String mediaPackageId)
917 throws NotFoundException, UnauthorizedException, SchedulerException {
918 notEmpty(mediaPackageId, "mediaPackageId");
919
920 try {
921 final Optional<ExtendedEventDto> extEvt = persistence.getEvent(mediaPackageId);
922 if (extEvt.isEmpty())
923 throw new NotFoundException();
924
925 return getTechnicalMetadata(extEvt.get());
926 } catch (NotFoundException e) {
927 throw e;
928 } catch (Exception e) {
929 logger.error("Failed to get technical metadata of event '{}':", mediaPackageId, e);
930 throw new SchedulerException(e);
931 }
932 }
933
934 @Override
935 public Map<String, String> getWorkflowConfig(String mediaPackageId) throws NotFoundException, SchedulerException {
936 notEmpty(mediaPackageId, "mediaPackageId");
937
938 try {
939 Optional<ExtendedEventDto> record = persistence.getEvent(mediaPackageId);
940 if (record.isEmpty())
941 throw new NotFoundException();
942 return deserializeExtendedEventProperties(record.get().getWorkflowProperties());
943 } catch (NotFoundException e) {
944 throw e;
945 } catch (Exception e) {
946 logger.error("Failed to get workflow configuration of event '{}':", mediaPackageId, e);
947 throw new SchedulerException(e);
948 }
949 }
950
951 @Override
952 public Map<String, String> getCaptureAgentConfiguration(String mediaPackageId)
953 throws NotFoundException, SchedulerException {
954 notEmpty(mediaPackageId, "mediaPackageId");
955
956 try {
957 Optional<ExtendedEventDto> record = persistence.getEvent(mediaPackageId);
958 if (record.isEmpty())
959 throw new NotFoundException();
960 return deserializeExtendedEventProperties(record.get().getCaptureAgentProperties());
961 } catch (NotFoundException e) {
962 throw e;
963 } catch (Exception e) {
964 logger.error("Failed to get capture agent contiguration of event '{}':", mediaPackageId, e);
965 throw new SchedulerException(e);
966 }
967 }
968
969 @Override
970 public int getEventCount() throws SchedulerException {
971 try {
972 return persistence.countEvents();
973 } catch (Exception e) {
974 throw new SchedulerException(e);
975 }
976 }
977
978 @Override
979 public List<MediaPackage> search(Optional<String> captureAgentId, Optional<Date> startsFrom, Optional<Date> startsTo,
980 Optional<Date> endFrom, Optional<Date> endTo) throws SchedulerException {
981 try {
982 return persistence.search(captureAgentId, startsFrom, startsTo, endFrom, endTo, Optional.empty()).stream()
983 .map(ExtendedEventDto::getMediaPackageId)
984 .map(this::getEventMediaPackage).collect(Collectors.toList());
985 } catch (Exception e) {
986 throw new SchedulerException(e);
987 }
988 }
989
990 @Override
991 public Optional<MediaPackage> getCurrentRecording(String captureAgentId) throws SchedulerException {
992 try {
993 final Date now = new Date();
994 List<ExtendedEventDto> result = persistence.search(Optional.of(captureAgentId), Optional.empty(), Optional.of(now), Optional.of(now), Optional.empty(), Optional.of(1));
995 if (result.isEmpty()) {
996 return Optional.empty();
997 }
998 return Optional.of(getEventMediaPackage(result.get(0).getMediaPackageId()));
999 } catch (Exception e) {
1000 throw new SchedulerException(e);
1001 }
1002 }
1003
1004 @Override
1005 public Optional<MediaPackage> getUpcomingRecording(String captureAgentId) throws SchedulerException {
1006 try {
1007 final Date now = new Date();
1008 List<ExtendedEventDto> result = persistence.search(Optional.of(captureAgentId), Optional.of(now), Optional.empty(), Optional.empty(), Optional.empty(), Optional.of(1));
1009 if (result.isEmpty()) {
1010 return Optional.empty();
1011 }
1012 return Optional.of(getEventMediaPackage(result.get(0).getMediaPackageId()));
1013 } catch (Exception e) {
1014 throw new SchedulerException(e);
1015 }
1016 }
1017
1018 @Override
1019 public List<MediaPackage> findConflictingEvents(String captureDeviceID, Date startDate, Date endDate)
1020 throws SchedulerException {
1021 try {
1022 final Organization organization = securityService.getOrganization();
1023 final User user = SecurityUtil.createSystemUser(systemUserName, organization);
1024 List<MediaPackage> conflictingEvents = new ArrayList<>();
1025
1026 SecurityUtil.runAs(securityService, organization, user, () -> {
1027 try {
1028 persistence.getEvents(captureDeviceID, startDate, endDate, Util.EVENT_MINIMUM_SEPARATION_MILLISECONDS)
1029 .stream()
1030 .map(id -> getEventMediaPackage(id, false))
1031 .forEach(conflictingEvents::add);
1032 } catch (SchedulerServiceDatabaseException e) {
1033 logger.error("Failed to get conflicting events", e);
1034 }
1035 });
1036
1037 return conflictingEvents;
1038
1039 } catch (Exception e) {
1040 throw new SchedulerException(e);
1041 }
1042 }
1043
1044 @Override
1045 public List<MediaPackage> findConflictingEvents(String captureAgentId, RRule rrule, Date start, Date end,
1046 long duration, TimeZone tz) throws SchedulerException {
1047 notEmpty(captureAgentId, "captureAgentId");
1048 notNull(rrule, "rrule");
1049 notNull(start, "start");
1050 notNull(end, "end");
1051 notNull(tz, "timeZone");
1052
1053 Util.adjustRrule(rrule, start, tz);
1054 final List<Period> periods = Util.calculatePeriods(start, end, duration, rrule, tz);
1055
1056 if (periods.isEmpty()) {
1057 return Collections.emptyList();
1058 }
1059
1060 return findConflictingEvents(periods, captureAgentId, tz);
1061 }
1062
1063 private boolean checkPeriodOverlap(final List<Period> periods) {
1064 final List<Period> sortedPeriods = new ArrayList<>(periods);
1065 sortedPeriods.sort(Comparator.comparing(Period::getStart));
1066 Period prior = periods.get(0);
1067 for (Period current : periods.subList(1, periods.size())) {
1068 if (current.getStart().compareTo(prior.getEnd()) < 0) {
1069 return true;
1070 }
1071 prior = current;
1072 }
1073 return false;
1074 }
1075
1076 private List<MediaPackage> findConflictingEvents(List<Period> periods, String captureAgentId, TimeZone tz)
1077 throws SchedulerException {
1078 notEmpty(captureAgentId, "captureAgentId");
1079 notNull(periods, "periods");
1080 requireTrue(periods.size() > 0, "periods");
1081
1082
1083
1084
1085 if (checkPeriodOverlap(periods)) {
1086 throw new IllegalArgumentException("RRULE periods overlap");
1087 }
1088
1089 try {
1090 TimeZoneRegistry registry = TimeZoneRegistryFactory.getInstance().createRegistry();
1091
1092 Set<MediaPackage> events = new HashSet<>();
1093
1094 for (Period event : periods) {
1095 event.setTimeZone(registry.getTimeZone(tz.getID()));
1096 final Date startDate = event.getStart();
1097 final Date endDate = event.getEnd();
1098
1099 events.addAll(findConflictingEvents(captureAgentId, startDate, endDate));
1100 }
1101
1102 return new ArrayList<>(events);
1103 } catch (Exception e) {
1104 throw new SchedulerException(e);
1105 }
1106 }
1107
1108 @Override
1109 public String getCalendar(Optional<String> captureAgentId, Optional<String> seriesId, Optional<Date> cutoff)
1110 throws SchedulerException {
1111
1112 try {
1113 final Map<String, ExtendedEventDto> searchResult = persistence.search(captureAgentId, Optional.empty(), cutoff,
1114 Optional.of(DateTime.now().minusHours(1).toDate()), Optional.empty(), Optional.empty()).stream()
1115 .collect(Collectors.toMap(ExtendedEventDto::getMediaPackageId, Function.identity()));
1116 var mpIds = searchResult.keySet();
1117 List<Snapshot> snapshots = assetManager.getLatestSnapshots(mpIds);
1118
1119 final CalendarGenerator cal = new CalendarGenerator(seriesService);
1120 for (String mpId : mpIds) {
1121 final Optional<Snapshot> optSnapshot = snapshots.stream()
1122 .filter(mp -> mp.getMediaPackage().getIdentifier().toString().equals(mpId))
1123 .findFirst();
1124
1125
1126 if (optSnapshot.isEmpty()) {
1127 logger.warn("Mediapackage for event '{}' can't be found, event is not recorded", mpId);
1128 continue;
1129 }
1130
1131 Snapshot snapshot = optSnapshot.get();
1132
1133 if (seriesId.isPresent() && !seriesId.get().equals(snapshot.getMediaPackage().getSeries())) {
1134 continue;
1135 }
1136
1137 Optional<DublinCoreCatalog> catalogOpt = loadEpisodeDublinCoreFromAsset(snapshot);
1138 if (catalogOpt.isEmpty()) {
1139 logger.warn("No episode catalog available, skipping!");
1140 continue;
1141 }
1142
1143 final Map<String, String> caMetadata = deserializeExtendedEventProperties(searchResult.get(mpId).getCaptureAgentProperties());
1144
1145
1146 if (caMetadata.isEmpty()) {
1147 logger.warn("Properties for event '{}' can't be found, event is not recorded", mpId);
1148 continue;
1149 }
1150
1151 final String agentId = searchResult.get(mpId).getCaptureAgentId();
1152 final Date start = searchResult.get(mpId).getStartDate();
1153 final Date end = searchResult.get(mpId).getEndDate();
1154 final Date lastModified = snapshot.getArchivalDate();
1155
1156
1157 try {
1158 cal.addEvent(snapshot.getMediaPackage(), catalogOpt.get(), agentId, start, end, lastModified, toPropertyString(caMetadata));
1159 } catch (Exception e) {
1160 logger.warn("Error adding event '{}' to calendar, event is not recorded", mpId, e);
1161 }
1162 }
1163
1164
1165 if (cal.getCalendar().getComponents().size() > 0) {
1166 cal.getCalendar().validate();
1167 }
1168
1169 return cal.getCalendar().toString();
1170
1171 } catch (Exception e) {
1172 throw new SchedulerException(e);
1173 }
1174 }
1175
1176 @Override
1177 public String getScheduleLastModified(String captureAgentId) throws SchedulerException {
1178 notEmpty(captureAgentId, "captureAgentId");
1179
1180 try {
1181 String lastModified = lastModifiedCache.getIfPresent(captureAgentId);
1182 if (lastModified != null)
1183 return lastModified;
1184
1185 populateLastModifiedCache();
1186
1187 lastModified = lastModifiedCache.getIfPresent(captureAgentId);
1188
1189
1190 if (lastModified == null) {
1191 lastModified = EMPTY_CALENDAR_ETAG;
1192 lastModifiedCache.put(captureAgentId, lastModified);
1193 }
1194 return lastModified;
1195 } catch (Exception e) {
1196 throw new SchedulerException(e);
1197 }
1198 }
1199
1200 @Override
1201 public void removeScheduledRecordingsBeforeBuffer(long buffer) throws SchedulerException {
1202 DateTime end = new DateTime(DateTimeZone.UTC).minus(buffer * 1000);
1203
1204 logger.info("Starting to look for scheduled recordings that have finished before {}.",
1205 DateTimeSupport.toUTC(end.getMillis()));
1206
1207 List<ExtendedEventDto> finishedEvents;
1208 try {
1209 finishedEvents = persistence.search(Optional.empty(), Optional.empty(), Optional.empty(), Optional.empty(),
1210 Optional.of(end.toDate()), Optional.empty());
1211 logger.debug("Found {} events from search.", finishedEvents.size());
1212 } catch (Exception e) {
1213 throw new SchedulerException(e);
1214 }
1215
1216 int recordingsRemoved = 0;
1217 for (ExtendedEventDto extEvt : finishedEvents) {
1218 final String eventId = extEvt.getMediaPackageId();
1219 try {
1220 removeEvent(eventId);
1221 logger.debug("Sucessfully removed scheduled event with id " + eventId);
1222 recordingsRemoved++;
1223 } catch (NotFoundException e) {
1224 logger.debug("Skipping event with id {} because it is not found", eventId);
1225 } catch (Exception e) {
1226 logger.warn("Unable to delete event with id '{}':", eventId, e);
1227 }
1228 }
1229
1230 logger.info("Found {} to remove that ended before {}.", recordingsRemoved, DateTimeSupport.toUTC(end.getMillis()));
1231 }
1232
1233 @Override
1234 public boolean updateRecordingState(String id, String state) throws NotFoundException, SchedulerException {
1235 notEmpty(id, "id");
1236 notEmpty(state, "state");
1237
1238 if (!RecordingState.KNOWN_STATES.contains(state)) {
1239 logger.warn("Invalid recording state: {}.", state);
1240 return false;
1241 }
1242
1243 try {
1244 final Optional<ExtendedEventDto> optExtEvt = persistence.getEvent(id);
1245
1246 if (optExtEvt.isEmpty())
1247 throw new NotFoundException();
1248
1249 final String prevRecordingState = optExtEvt.get().getRecordingState();
1250 final Recording r = new RecordingImpl(id, state);
1251 if (!state.equals(prevRecordingState)) {
1252 logger.debug("Setting Recording {} to state {}.", id, state);
1253
1254
1255 sendSchedulerUpdate(new SchedulerItemList(r.getID(), Collections.singletonList(SchedulerItem
1256 .updateRecordingStatus(r.getState(), r.getLastCheckinTime()))));
1257
1258
1259 updateEventInIndex(r.getID(), Optional.empty(), Optional.empty(), Optional.empty(), Optional.empty(), Optional.empty(),
1260 Optional.empty(), Optional.empty(), Optional.of(r.getState()));
1261 } else {
1262 logger.debug("Recording state not changed");
1263 }
1264
1265 persistence.storeEvent(
1266 id,
1267 securityService.getOrganization().getId(),
1268 Optional.empty(),
1269 Optional.empty(),
1270 Optional.empty(),
1271 Optional.empty(),
1272 Optional.of(r.getState()),
1273 Optional.of(r.getLastCheckinTime()),
1274 Optional.empty(),
1275 Optional.empty(),
1276 Optional.empty(),
1277 Optional.empty(),
1278 Optional.empty()
1279 );
1280 return true;
1281 } catch (NotFoundException e) {
1282 throw e;
1283 } catch (Exception e) {
1284 throw new SchedulerException(e);
1285 }
1286 }
1287
1288 @Override
1289 public Recording getRecordingState(String id) throws NotFoundException, SchedulerException {
1290
1291 notEmpty(id, "id");
1292
1293 try {
1294 Optional<ExtendedEventDto> extEvt = persistence.getEvent(id);
1295
1296 if (extEvt.isEmpty() || extEvt.get().getRecordingState() == null) {
1297 throw new NotFoundException();
1298 }
1299
1300 return new RecordingImpl(id, extEvt.get().getRecordingState(), extEvt.get().getRecordingLastHeard());
1301 } catch (NotFoundException e) {
1302 throw e;
1303 } catch (Exception e) {
1304 throw new SchedulerException(e);
1305 }
1306 }
1307
1308 @Override
1309 public void removeRecording(String id) throws NotFoundException, SchedulerException {
1310 notEmpty(id, "id");
1311
1312 try {
1313 persistence.resetRecordingState(id);
1314
1315
1316 sendSchedulerUpdate(new SchedulerItemList(id, SchedulerItem.deleteRecordingState()));
1317
1318
1319 removeRecordingStatusFromIndex(id);
1320 } catch (NotFoundException e) {
1321 throw e;
1322 } catch (Exception e) {
1323 throw new SchedulerException(e);
1324 }
1325 }
1326
1327 @Override
1328 public Map<String, Recording> getKnownRecordings() throws SchedulerException {
1329 try {
1330 return persistence.getKnownRecordings().parallelStream()
1331 .collect(
1332 Collectors.toMap(ExtendedEventDto::getMediaPackageId,
1333 dto -> new RecordingImpl(dto.getMediaPackageId(), dto.getRecordingState(), dto.getRecordingLastHeard()))
1334 );
1335 } catch (Exception e) {
1336 throw new SchedulerException(e);
1337 }
1338 }
1339
1340 private synchronized void persistEvent(final String mpId, final String checksum,
1341 final Optional<Date> startDateTime, final Optional<Date> endDateTime, final Optional<String> captureAgentId,
1342 final Optional<Set<String>> userIds, final Optional<MediaPackage> mediaPackage,
1343 final Optional<Map<String, String>> wfProperties, final Optional<Map<String, String>> caProperties,
1344 final Optional<String> schedulingSource) throws SchedulerServiceDatabaseException {
1345
1346 if (mediaPackage.isPresent()) {
1347 assetManager.takeSnapshot(SNAPSHOT_OWNER, mediaPackage.get());
1348 }
1349
1350
1351 persistence.storeEvent(
1352 mpId,
1353 securityService.getOrganization().getId(),
1354 captureAgentId,
1355 startDateTime,
1356 endDateTime,
1357 schedulingSource,
1358 Optional.empty(),
1359 Optional.empty(),
1360 userIds.isPresent() ? Optional.of(String.join(",", userIds.get())) : Optional.empty(),
1361 Optional.of(new Date()),
1362 Optional.of(checksum),
1363 wfProperties,
1364 caProperties
1365 );
1366 }
1367
1368
1369
1370
1371
1372
1373
1374
1375
1376
1377
1378
1379
1380
1381 private void updateEventInIndex(String mediaPackageId, Optional<AccessControlList> acl,
1382 Optional<DublinCoreCatalog> dublinCore, Optional<Date> startTime, Optional<Date> endTime, Optional<Set<String>> presenters,
1383 Optional<String> agentId, Optional<Map<String, String>> properties, Optional<String> recordingStatus) {
1384
1385 String organization = getSecurityService().getOrganization().getId();
1386 User user = getSecurityService().getUser();
1387
1388 Function<Optional<Event>, Optional<Event>> updateFunction = getEventUpdateFunction(mediaPackageId, acl, dublinCore,
1389 startTime, endTime, presenters, agentId, properties, recordingStatus, organization, user);
1390
1391 try {
1392 index.addOrUpdateEvent(mediaPackageId, updateFunction, organization, user);
1393 logger.debug("Scheduled event {} updated in the {} index.", mediaPackageId, index.getIndexName());
1394 } catch (SearchIndexException e) {
1395 logger.error("Error updating the scheduled event {} in the {} index.", mediaPackageId, index.getIndexName(), e);
1396 }
1397 }
1398
1399
1400
1401
1402
1403
1404 private void removeRecordingStatusFromIndex(String mediaPackageId) {
1405 String organization = getSecurityService().getOrganization().getId();
1406 User user = getSecurityService().getUser();
1407
1408 Function<Optional<Event>, Optional<Event>> updateFunction = (Optional<Event> eventOpt) -> {
1409 Event event = eventOpt.orElse(new Event(mediaPackageId, organization));
1410 event.setRecordingStatus(null);
1411 return Optional.of(event);
1412 };
1413
1414 try {
1415 index.addOrUpdateEvent(mediaPackageId, updateFunction, organization, user);
1416 logger.debug("Recording state of event {} removed from the {} index.", mediaPackageId, index.getIndexName());
1417 } catch (SearchIndexException e) {
1418 logger.error("Failed to remove the recording state of event {} from the {} index.", mediaPackageId,
1419 index.getIndexName(), e);
1420 }
1421 }
1422
1423
1424
1425
1426
1427
1428 private void removeSchedulingInfoFromIndex(String mediaPackageId) {
1429 String orgId = getSecurityService().getOrganization().getId();
1430
1431 try {
1432 index.deleteEvent(mediaPackageId, orgId);
1433 logger.debug("Scheduling information of event {} removed from the {} index.", mediaPackageId,
1434 index.getIndexName());
1435 } catch (SearchIndexException e) {
1436 logger.error("Failed to delete the scheduling information of event {} from the {} index.", mediaPackageId,
1437 index.getIndexName(), e);
1438 }
1439 }
1440
1441
1442
1443
1444
1445
1446
1447
1448
1449
1450
1451
1452 private void updateLiveEvent(String mpId, Optional<AccessControlList> acl, Optional<DublinCoreCatalog> dublinCore,
1453 Optional<Date> startTime, Optional<Date> endTime, Optional<String> agentId, Optional<Map<String, String>> properties) {
1454 List<SchedulerItem> items = new ArrayList<>();
1455 if (acl.isPresent()) {
1456 items.add(SchedulerItem.updateAcl(acl.get()));
1457 }
1458 if (dublinCore.isPresent()) {
1459 items.add(SchedulerItem.updateCatalog(dublinCore.get()));
1460 }
1461 if (startTime.isPresent()) {
1462 items.add(SchedulerItem.updateStart(startTime.get()));
1463 }
1464 if (endTime.isPresent()) {
1465 items.add(SchedulerItem.updateEnd(endTime.get()));
1466 }
1467 if (agentId.isPresent()) {
1468 items.add(SchedulerItem.updateAgent(agentId.get()));
1469 }
1470 if (properties.isPresent()) {
1471 items.add(SchedulerItem.updateProperties(properties.get()));
1472 }
1473
1474 if (!items.isEmpty()) {
1475 sendSchedulerUpdate(new SchedulerItemList(mpId, items));
1476 }
1477 }
1478
1479 private Map<String, String> getFinalAgentProperties(Map<String, String> caMetadata, Map<String, String> wfProperties,
1480 String captureAgentId, Optional<String> seriesId, Optional<DublinCoreCatalog> dublinCore) {
1481 Map<String, String> properties = new HashMap<>();
1482 for (Entry<String, String> entry : caMetadata.entrySet()) {
1483 if (entry.getKey().startsWith(WORKFLOW_CONFIG_PREFIX))
1484 continue;
1485 properties.put(entry.getKey(), entry.getValue());
1486 }
1487 for (Entry<String, String> entry : wfProperties.entrySet()) {
1488 properties.put(WORKFLOW_CONFIG_PREFIX.concat(entry.getKey()), entry.getValue());
1489 }
1490 if (dublinCore.isPresent()) {
1491 properties.put("event.title", dublinCore.get().getFirst(DublinCore.PROPERTY_TITLE));
1492 }
1493 if (seriesId.isPresent()) {
1494 properties.put("event.series", seriesId.get());
1495 }
1496 properties.put("event.location", captureAgentId);
1497 return properties;
1498 }
1499
1500 private void touchLastEntry(String captureAgentId) throws SchedulerException {
1501
1502 try {
1503 logger.debug("Marking calendar feed for {} as modified", captureAgentId);
1504 persistence.touchLastEntry(captureAgentId);
1505 populateLastModifiedCache();
1506 } catch (SchedulerServiceDatabaseException e) {
1507 logger.error("Failed to update last modified entry of agent '{}':", captureAgentId, e);
1508 }
1509 }
1510
1511 private void populateLastModifiedCache() throws SchedulerException {
1512 try {
1513 Map<String, Date> lastModifiedDates = persistence.getLastModifiedDates();
1514 for (Entry<String, Date> entry : lastModifiedDates.entrySet()) {
1515 Date lastModifiedDate = entry.getValue() != null ? entry.getValue() : new Date();
1516 lastModifiedCache.put(entry.getKey(), generateLastModifiedHash(lastModifiedDate));
1517 }
1518 } catch (Exception e) {
1519 throw new SchedulerException(e);
1520 }
1521 }
1522
1523 private String generateLastModifiedHash(Date lastModifiedDate) {
1524 return "mod" + Long.toString(lastModifiedDate.getTime());
1525 }
1526
1527 private String toPropertyString(Map<String, String> properties) {
1528 StringBuilder wfPropertiesString = new StringBuilder();
1529 for (Map.Entry<String, String> entry : properties.entrySet())
1530 wfPropertiesString.append(entry.getKey() + "=" + entry.getValue() + "\n");
1531 return wfPropertiesString.toString();
1532 }
1533
1534 private MediaPackage getEventMediaPackage(final String mediaPackageId, boolean checkOwner) {
1535 Optional<MediaPackage> mediaPackage = assetManager.getMediaPackage(mediaPackageId);
1536
1537 if (mediaPackage.isEmpty())
1538 throw new RuntimeNotFoundException(new NotFoundException());
1539
1540 return mediaPackage.get();
1541 }
1542
1543 private MediaPackage getEventMediaPackage(final String mediaPackageId) {
1544 return getEventMediaPackage(mediaPackageId, true);
1545 }
1546
1547
1548
1549
1550
1551
1552
1553
1554
1555
1556
1557
1558
1559 private MediaPackage updateDublincCoreCatalog(MediaPackage mp, DublinCoreCatalog dc)
1560 throws IOException, MediaPackageException {
1561 try (InputStream inputStream = IOUtils.toInputStream(dc.toXmlString(), "UTF-8")) {
1562
1563 Catalog[] catalogs = mp.getCatalogs(MediaPackageElements.EPISODE);
1564 if (catalogs.length > 0) {
1565 Catalog catalog = catalogs[0];
1566 URI uri = workspace.put(mp.getIdentifier().toString(), catalog.getIdentifier(), "dublincore.xml", inputStream);
1567 catalog.setURI(uri);
1568
1569 catalog.setChecksum(null);
1570 } else {
1571 throw new MediaPackageException("Unable to find catalog");
1572 }
1573 }
1574 return mp;
1575 }
1576
1577 private TechnicalMetadata getTechnicalMetadata(ExtendedEventDto extEvt) {
1578 final String agentId = extEvt.getCaptureAgentId();
1579 final Date start = extEvt.getStartDate();
1580 final Date end = extEvt.getEndDate();
1581 final Set<String> presenters = getPresenters(Optional.ofNullable(extEvt.getPresenters()).orElse(""));
1582 final Optional<String> recordingStatus = Optional.ofNullable(extEvt.getRecordingState());
1583 final Optional<Long> lastHeard = Optional.ofNullable(extEvt.getRecordingLastHeard());
1584 final Map<String, String> caMetadata = deserializeExtendedEventProperties(extEvt.getCaptureAgentProperties());
1585 final Map<String, String> wfProperties = deserializeExtendedEventProperties(extEvt.getWorkflowProperties());
1586
1587 Recording recording = null;
1588 if (recordingStatus.isPresent() && lastHeard.isPresent())
1589 recording = new RecordingImpl(extEvt.getMediaPackageId(), recordingStatus.get(), lastHeard.get());
1590
1591 return new TechnicalMetadataImpl(extEvt.getMediaPackageId(), agentId, start, end, presenters, wfProperties,
1592 caMetadata, Optional.ofNullable(recording));
1593 }
1594
1595 private Set<String> getPresenters(String presentersString) {
1596 return new HashSet<>(Arrays.asList(StringUtils.split(presentersString, ",")));
1597 }
1598
1599
1600
1601
1602 private List<MediaPackageElementFlavor> getEventCatalogUIAdapterFlavors() {
1603 String organization = securityService.getOrganization().getId();
1604 return eventCatalogUIAdapters.stream()
1605 .filter(adapter -> adapter.getOrganization().equals(organization))
1606 .map(EventCatalogUIAdapter::getFlavor)
1607 .filter(mpe -> !MediaPackageElements.EPISODE.matches(mpe))
1608 .collect(Collectors.toList());
1609 }
1610
1611 @Override
1612 public void repopulate(IndexRebuildService.DataType type) throws IndexRebuildException {
1613 try {
1614 final int total;
1615 try {
1616 total = persistence.countEvents();
1617 } catch (SchedulerServiceDatabaseException e) {
1618 logIndexRebuildError(logger, e);
1619 throw new IndexRebuildException(getService(), e);
1620 }
1621 logIndexRebuildBegin(logger, total, "scheduled events");
1622 final int[] current = {0};
1623 int n = 20;
1624 var updatedEventRange = new ArrayList<Event>();
1625
1626 for (Organization organization: orgDirectoryService.getOrganizations()) {
1627 final User user = SecurityUtil.createSystemUser(systemUserName, organization);
1628 SecurityUtil.runAs(securityService, organization, user,
1629 () -> {
1630 final List<ExtendedEventDto> events;
1631 try {
1632 events = persistence.getEvents();
1633 } catch (SchedulerServiceDatabaseException e) {
1634 logIndexRebuildError(logger, e, organization);
1635 return;
1636 }
1637
1638 for (ExtendedEventDto event : events) {
1639 try {
1640 current[0]++;
1641
1642 var updatedEventData = Optional.of(new Event(event.getMediaPackageId(), organization.getId()));
1643
1644 final Set<String> presenters = getPresenters(
1645 Optional.ofNullable(event.getPresenters()).orElse(""));
1646 final Map<String, String> caMetadata = deserializeExtendedEventProperties(
1647 event.getCaptureAgentProperties());
1648
1649 updatedEventData = getEventUpdateFunction(event.getMediaPackageId(), Optional.empty(), Optional.empty(),
1650 Optional.of(event.getStartDate()), Optional.of(event.getEndDate()), Optional.of(presenters),
1651 Optional.of(event.getCaptureAgentId()), Optional.of(caMetadata),
1652 Optional.ofNullable(event.getRecordingState()), organization.getId(),
1653 securityService.getUser()).apply(updatedEventData);
1654 updatedEventRange.add(updatedEventData.get());
1655
1656 if (updatedEventRange.size() >= n || current[0] >= events.size()) {
1657 index.bulkEventUpdate(updatedEventRange);
1658 logIndexRebuildProgress(logger, total, current[0], n);
1659 updatedEventRange.clear();
1660 }
1661
1662 } catch (SearchIndexException e) {
1663 logger.error("Error while updating event '{}' from search index:", event.getMediaPackageId(), e);
1664 } catch (Exception e) {
1665 throw new RuntimeException("Fatal error while indexing event " + event.getMediaPackageId(), e);
1666 }
1667 }
1668 });
1669 }
1670 } catch (Exception e) {
1671 logIndexRebuildError(logger, e);
1672 throw new IndexRebuildException(getService(), e);
1673 }
1674 }
1675
1676 @Override
1677 public IndexRebuildService.Service getService() {
1678 return IndexRebuildService.Service.Scheduler;
1679 }
1680
1681 public SecurityService getSecurityService() {
1682 return securityService;
1683 }
1684
1685
1686
1687
1688
1689
1690
1691
1692 private Function<Optional<Event>, Optional<Event>> getEventUpdateFunction(String mediaPackageId,
1693 Optional<AccessControlList> acl, Optional<DublinCoreCatalog> dublinCore, Optional<Date> startTime,
1694 Optional<Date> endTime, Optional<Set<String>> presenters, Optional<String> agentId,
1695 Optional<Map<String, String>> properties, Optional<String> recordingStatus, String orgId, User user) {
1696 return (Optional<Event> eventOpt) -> {
1697 Event event = eventOpt.orElse(new Event(mediaPackageId, orgId));
1698
1699 if (acl.isPresent()) {
1700 event.setAccessPolicy(AccessControlParser.toJsonSilent(acl.get()));
1701 }
1702 if (dublinCore.isPresent()) {
1703 EventIndexUtils.updateEvent(event, dublinCore.get());
1704 if (isBlank(event.getCreator()))
1705 event.setCreator(getSecurityService().getUser().getName());
1706
1707
1708 try {
1709 EventIndexUtils.updateSeriesName(event, orgId, user, index);
1710 } catch (SearchIndexException e) {
1711 logger.error("Error updating the series name of the event {} in the {} index.", mediaPackageId,
1712 index.getIndexName(), e);
1713 }
1714 }
1715 if (presenters.isPresent()) {
1716 event.setTechnicalPresenters(new ArrayList<>(presenters.get()));
1717 }
1718 if (agentId.isPresent()) {
1719 event.setAgentId(agentId.get());
1720 }
1721 if (recordingStatus.isPresent() && !recordingStatus.get().equals(RecordingState.UNKNOWN)) {
1722 event.setRecordingStatus(recordingStatus.get());
1723 }
1724 if (properties.isPresent()) {
1725 event.setAgentConfiguration(properties.get());
1726 }
1727 if (startTime.isPresent()) {
1728 String startTimeStr = startTime == null ? null : DateTimeSupport.toUTC(startTime.get().getTime());
1729 event.setTechnicalStartTime(startTimeStr);
1730 }
1731 if (endTime.isPresent()) {
1732 String endTimeStr = endTime == null ? null : DateTimeSupport.toUTC(endTime.get().getTime());
1733 event.setTechnicalEndTime(endTimeStr);
1734 }
1735
1736 return Optional.of(event);
1737 };
1738 }
1739 }