1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21 package org.opencastproject.scheduler.impl;
22
23 import static org.apache.commons.lang3.StringUtils.isBlank;
24 import static org.opencastproject.scheduler.impl.SchedulerUtil.calculateChecksum;
25 import static org.opencastproject.security.api.SecurityConstants.GLOBAL_ADMIN_ROLE;
26 import static org.opencastproject.util.EqualsUtil.ne;
27 import static org.opencastproject.util.RequireUtil.notEmpty;
28 import static org.opencastproject.util.RequireUtil.notNull;
29 import static org.opencastproject.util.RequireUtil.requireTrue;
30
31 import org.opencastproject.assetmanager.api.Asset;
32 import org.opencastproject.assetmanager.api.AssetManager;
33 import org.opencastproject.assetmanager.api.Availability;
34 import org.opencastproject.assetmanager.api.Snapshot;
35 import org.opencastproject.elasticsearch.api.SearchIndexException;
36 import org.opencastproject.elasticsearch.index.ElasticsearchIndex;
37 import org.opencastproject.elasticsearch.index.objects.event.Event;
38 import org.opencastproject.elasticsearch.index.objects.event.EventIndexUtils;
39 import org.opencastproject.elasticsearch.index.rebuild.AbstractIndexProducer;
40 import org.opencastproject.elasticsearch.index.rebuild.IndexProducer;
41 import org.opencastproject.elasticsearch.index.rebuild.IndexRebuildException;
42 import org.opencastproject.elasticsearch.index.rebuild.IndexRebuildService;
43 import org.opencastproject.mediapackage.Catalog;
44 import org.opencastproject.mediapackage.MediaPackage;
45 import org.opencastproject.mediapackage.MediaPackageElement;
46 import org.opencastproject.mediapackage.MediaPackageElementFlavor;
47 import org.opencastproject.mediapackage.MediaPackageElements;
48 import org.opencastproject.mediapackage.MediaPackageException;
49 import org.opencastproject.mediapackage.MediaPackageSupport;
50 import org.opencastproject.mediapackage.identifier.Id;
51 import org.opencastproject.mediapackage.identifier.IdImpl;
52 import org.opencastproject.message.broker.api.scheduler.SchedulerItem;
53 import org.opencastproject.message.broker.api.scheduler.SchedulerItemList;
54 import org.opencastproject.message.broker.api.update.SchedulerUpdateHandler;
55 import org.opencastproject.metadata.dublincore.DCMIPeriod;
56 import org.opencastproject.metadata.dublincore.DublinCore;
57 import org.opencastproject.metadata.dublincore.DublinCoreCatalog;
58 import org.opencastproject.metadata.dublincore.DublinCoreUtil;
59 import org.opencastproject.metadata.dublincore.DublinCoreValue;
60 import org.opencastproject.metadata.dublincore.DublinCores;
61 import org.opencastproject.metadata.dublincore.EncodingSchemeUtils;
62 import org.opencastproject.metadata.dublincore.EventCatalogUIAdapter;
63 import org.opencastproject.metadata.dublincore.Precision;
64 import org.opencastproject.scheduler.api.Recording;
65 import org.opencastproject.scheduler.api.RecordingImpl;
66 import org.opencastproject.scheduler.api.RecordingState;
67 import org.opencastproject.scheduler.api.SchedulerConflictException;
68 import org.opencastproject.scheduler.api.SchedulerException;
69 import org.opencastproject.scheduler.api.SchedulerService;
70 import org.opencastproject.scheduler.api.TechnicalMetadata;
71 import org.opencastproject.scheduler.api.TechnicalMetadataImpl;
72 import org.opencastproject.scheduler.api.Util;
73 import org.opencastproject.scheduler.impl.persistence.ExtendedEventDto;
74 import org.opencastproject.security.api.AccessControlList;
75 import org.opencastproject.security.api.AccessControlParser;
76 import org.opencastproject.security.api.AccessControlUtil;
77 import org.opencastproject.security.api.AuthorizationService;
78 import org.opencastproject.security.api.Organization;
79 import org.opencastproject.security.api.OrganizationDirectoryService;
80 import org.opencastproject.security.api.SecurityService;
81 import org.opencastproject.security.api.UnauthorizedException;
82 import org.opencastproject.security.api.User;
83 import org.opencastproject.security.util.SecurityUtil;
84 import org.opencastproject.series.api.SeriesService;
85 import org.opencastproject.util.DateTimeSupport;
86 import org.opencastproject.util.NotFoundException;
87 import org.opencastproject.util.OsgiUtil;
88 import org.opencastproject.util.XmlNamespaceBinding;
89 import org.opencastproject.util.XmlNamespaceContext;
90 import org.opencastproject.util.data.functions.Misc;
91 import org.opencastproject.workspace.api.Workspace;
92
93 import com.google.common.cache.Cache;
94 import com.google.common.cache.CacheBuilder;
95 import com.google.gson.Gson;
96 import com.google.gson.reflect.TypeToken;
97
98 import net.fortuna.ical4j.model.Period;
99 import net.fortuna.ical4j.model.TimeZoneRegistry;
100 import net.fortuna.ical4j.model.TimeZoneRegistryFactory;
101 import net.fortuna.ical4j.model.property.RRule;
102
103 import org.apache.commons.io.IOUtils;
104 import org.apache.commons.lang3.StringUtils;
105 import org.joda.time.DateTime;
106 import org.joda.time.DateTimeZone;
107 import org.osgi.service.cm.ConfigurationException;
108 import org.osgi.service.cm.ManagedService;
109 import org.osgi.service.component.ComponentContext;
110 import org.osgi.service.component.annotations.Activate;
111 import org.osgi.service.component.annotations.Component;
112 import org.osgi.service.component.annotations.Reference;
113 import org.osgi.service.component.annotations.ReferenceCardinality;
114 import org.osgi.service.component.annotations.ReferencePolicy;
115 import org.osgi.service.component.annotations.ReferencePolicyOption;
116 import org.slf4j.Logger;
117 import org.slf4j.LoggerFactory;
118
119 import java.io.IOException;
120 import java.io.InputStream;
121 import java.lang.reflect.Type;
122 import java.net.URI;
123 import java.util.ArrayList;
124 import java.util.Arrays;
125 import java.util.Calendar;
126 import java.util.Collections;
127 import java.util.Comparator;
128 import java.util.Date;
129 import java.util.Dictionary;
130 import java.util.HashMap;
131 import java.util.HashSet;
132 import java.util.LinkedList;
133 import java.util.List;
134 import java.util.Map;
135 import java.util.Map.Entry;
136 import java.util.Optional;
137 import java.util.Set;
138 import java.util.TimeZone;
139 import java.util.UUID;
140 import java.util.concurrent.ConcurrentHashMap;
141 import java.util.concurrent.CopyOnWriteArrayList;
142 import java.util.concurrent.TimeUnit;
143 import java.util.function.Function;
144 import java.util.stream.Collectors;
145
146
147
148
149 @Component(
150 service = { ManagedService.class, SchedulerService.class, IndexProducer.class },
151 property = {
152 "service.description=Scheduler Service"
153 }
154 )
155 public class SchedulerServiceImpl extends AbstractIndexProducer implements SchedulerService, ManagedService {
156
157
158 private static final Logger logger = LoggerFactory.getLogger(SchedulerServiceImpl.class);
159
160
161 private static final String CFG_KEY_LAST_MODIFIED_CACHE_EXPIRE = "last_modified_cache_expire";
162
163
164 private static final String CFG_KEY_MAINTENANCE = "maintenance";
165
166
167 private static final int DEFAULT_CACHE_EXPIRE = 60;
168
169
170 private static final String EMPTY_CALENDAR_ETAG = "mod0";
171
172 private static final String SNAPSHOT_OWNER = SchedulerService.JOB_TYPE;
173
174 private static final Gson gson = new Gson();
175
176
177
178
179
180 private static Map<String,String> deserializeExtendedEventProperties(String props) {
181 if (props == null || props.trim().isEmpty()) {
182 return new HashMap<>();
183 }
184 Type type = new TypeToken<Map<String, String>>() { }.getType();
185 return gson.fromJson(props, type);
186 }
187
188
189 protected Cache<String, String> lastModifiedCache = CacheBuilder.newBuilder()
190 .expireAfterWrite(DEFAULT_CACHE_EXPIRE, TimeUnit.SECONDS).build();
191
192
193 private SchedulerServiceDatabase persistence;
194
195
196 private SeriesService seriesService;
197
198
199 private SecurityService securityService;
200
201
202 private AssetManager assetManager;
203
204
205 private Workspace workspace;
206
207
208 private AuthorizationService authorizationService;
209
210
211 private OrganizationDirectoryService orgDirectoryService;
212
213
214 private ElasticsearchIndex index;
215
216
217 private List<EventCatalogUIAdapter> eventCatalogUIAdapters = new ArrayList<>();
218 private final List<SchedulerUpdateHandler> schedulerUpdateHandlers = new CopyOnWriteArrayList<>();
219
220
221 private String systemUserName;
222
223 private ComponentContext componentContext;
224
225
226
227
228
229
230 @Reference(
231 cardinality = ReferenceCardinality.MULTIPLE,
232 policy = ReferencePolicy.DYNAMIC,
233 policyOption = ReferencePolicyOption.GREEDY,
234 unbind = "removeSchedulerUpdateHandler"
235 )
236 public void addSchedulerUpdateHandler(SchedulerUpdateHandler handler) {
237 this.schedulerUpdateHandlers.add(handler);
238 }
239
240 public void removeSchedulerUpdateHandler(SchedulerUpdateHandler handler) {
241 this.schedulerUpdateHandlers.remove(handler);
242 }
243
244
245
246
247
248
249 @Reference
250 public void setPersistence(SchedulerServiceDatabase persistence) {
251 this.persistence = persistence;
252 }
253
254
255
256
257
258
259 @Reference
260 public void setSeriesService(SeriesService seriesService) {
261 this.seriesService = seriesService;
262 }
263
264
265
266
267
268
269 @Reference
270 public void setSecurityService(SecurityService securityService) {
271 this.securityService = securityService;
272 }
273
274
275
276
277
278
279 @Reference
280 public void setAssetManager(AssetManager assetManager) {
281 this.assetManager = assetManager;
282 }
283
284
285
286
287
288
289 @Reference
290 public void setWorkspace(Workspace workspace) {
291 this.workspace = workspace;
292 }
293
294
295
296
297
298
299 @Reference
300 public void setAuthorizationService(AuthorizationService authorizationService) {
301 this.authorizationService = authorizationService;
302 }
303
304
305
306
307
308
309 private void sendSchedulerUpdate(SchedulerItemList list) {
310 while (schedulerUpdateHandlers.size() != 1) {
311 logger.warn("Expecting 1 handler, but {} are registered. Waiting 10s then retrying...",
312 schedulerUpdateHandlers.size());
313 try {
314 Thread.sleep(10000L);
315 } catch (InterruptedException e) { }
316 }
317 String mpid = list.getId();
318 for (SchedulerItem item : list.getItems()) {
319 for (SchedulerUpdateHandler handler : this.schedulerUpdateHandlers) {
320 handler.execute(mpid, item);
321 }
322 }
323 }
324
325
326
327
328
329
330 @Reference
331 public void setOrgDirectoryService(OrganizationDirectoryService orgDirectoryService) {
332 this.orgDirectoryService = orgDirectoryService;
333 }
334
335
336
337
338
339
340
341 @Reference
342 public void setIndex(ElasticsearchIndex index) {
343 this.index = index;
344 }
345
346
347 @Reference(
348 cardinality = ReferenceCardinality.MULTIPLE,
349 policy = ReferencePolicy.DYNAMIC,
350 policyOption = ReferencePolicyOption.GREEDY,
351 unbind = "removeCatalogUIAdapter"
352 )
353 public void addCatalogUIAdapter(EventCatalogUIAdapter catalogUIAdapter) {
354 eventCatalogUIAdapters.add(catalogUIAdapter);
355 }
356
357
358 public void removeCatalogUIAdapter(EventCatalogUIAdapter catalogUIAdapter) {
359 eventCatalogUIAdapters.remove(catalogUIAdapter);
360 }
361
362
363
364
365
366
367
368
369 @Activate
370 public void activate(ComponentContext cc) throws Exception {
371 this.componentContext = cc;
372 systemUserName = SecurityUtil.getSystemUserName(cc);
373 logger.info("Activating Scheduler Service");
374 }
375
376 @Override
377 public void updated(Dictionary<String, ?> properties) throws ConfigurationException {
378 if (properties != null) {
379 final Optional<Integer> cacheExpireDuration =
380 OsgiUtil.getOptCfgAsInt(properties, CFG_KEY_LAST_MODIFIED_CACHE_EXPIRE);
381 if (cacheExpireDuration.isPresent()) {
382 lastModifiedCache = CacheBuilder.newBuilder().expireAfterWrite(cacheExpireDuration.get(), TimeUnit.SECONDS)
383 .build();
384 logger.info("Set last modified cache to {}", DateTimeSupport.humanReadableTime(cacheExpireDuration.get()));
385 } else {
386 logger.info("Set last modified cache to default {}", DateTimeSupport.humanReadableTime(DEFAULT_CACHE_EXPIRE));
387 }
388 final Optional<Boolean> maintenance = OsgiUtil.getOptCfgAsBoolean(properties, CFG_KEY_MAINTENANCE);
389 if (maintenance.orElse(false)) {
390 final String name = SchedulerServiceImpl.class.getName();
391 logger.warn("Putting scheduler into maintenance mode. This only makes sense when migrating data. If this is not"
392 + " intended, edit the config file '{}.cfg' accordingly and restart opencast.", name);
393 componentContext.disableComponent(name);
394 }
395 }
396 }
397
398 @Override
399 public void addEvent(Date startDateTime, Date endDateTime, String captureAgentId, Set<String> userIds,
400 MediaPackage mediaPackage, Map<String, String> wfProperties, Map<String, String> caMetadata,
401 Optional<String> schedulingSource)
402 throws UnauthorizedException, SchedulerException {
403 addEventInternal(startDateTime, endDateTime, captureAgentId, userIds, mediaPackage, wfProperties, caMetadata,
404 schedulingSource);
405 }
406
407 private void addEventInternal(Date startDateTime, Date endDateTime, String captureAgentId, Set<String> userIds,
408 MediaPackage mediaPackage, Map<String, String> wfProperties, Map<String, String> caMetadata,
409 Optional<String> schedulingSource)
410 throws SchedulerException {
411 notNull(startDateTime, "startDateTime");
412 notNull(endDateTime, "endDateTime");
413 notEmpty(captureAgentId, "captureAgentId");
414 notNull(userIds, "userIds");
415 notNull(mediaPackage, "mediaPackage");
416 notNull(wfProperties, "wfProperties");
417 notNull(caMetadata, "caMetadata");
418 notNull(schedulingSource, "schedulingSource");
419 if (endDateTime.before(startDateTime)) {
420 throw new IllegalArgumentException("The end date is before the start date");
421 }
422
423 final String mediaPackageId = mediaPackage.getIdentifier().toString();
424
425 try {
426 Optional<MediaPackage> noMediaPackage = assetManager.getMediaPackage(mediaPackageId);
427 if (noMediaPackage.isPresent()) {
428 logger.warn("Mediapackage with id '{}' already exists!", mediaPackageId);
429 throw new SchedulerConflictException("Mediapackage with id '" + mediaPackageId + "' already exists!");
430 }
431
432 Optional<String> seriesId = Optional.ofNullable(StringUtils.trimToNull(mediaPackage.getSeries()));
433
434 List<MediaPackage> conflictingEvents = findConflictingEvents(captureAgentId, startDateTime, endDateTime);
435 if (conflictingEvents.size() > 0) {
436 logger.info("Unable to add event {}, conflicting events found: {}", mediaPackageId, conflictingEvents);
437 throw new SchedulerConflictException(
438 "Unable to add event, conflicting events found for event " + mediaPackageId);
439 }
440
441
442 Optional<DublinCoreCatalog> dublinCore = DublinCoreUtil.loadEpisodeDublinCore(workspace, mediaPackage);
443 AccessControlList acl = authorizationService.getActiveAcl(mediaPackage).getA();
444
445
446 Map<String, String> finalCaProperties = getFinalAgentProperties(caMetadata, wfProperties, captureAgentId,
447 seriesId, dublinCore);
448
449
450 String checksum = calculateChecksum(workspace, getEventCatalogUIAdapterFlavors(), startDateTime, endDateTime,
451 captureAgentId, userIds, mediaPackage, dublinCore, wfProperties,
452 finalCaProperties, acl);
453 persistEvent(mediaPackageId, checksum, Optional.of(startDateTime), Optional.of(endDateTime),
454 Optional.of(captureAgentId), Optional.of(userIds), Optional.of(mediaPackage), Optional.of(wfProperties),
455 Optional.of(finalCaProperties), schedulingSource);
456
457
458 updateLiveEvent(mediaPackageId, Optional.of(acl), dublinCore, Optional.of(startDateTime),
459 Optional.of(endDateTime), Optional.of(captureAgentId), Optional.of(finalCaProperties));
460
461
462 updateEventInIndex(mediaPackageId, Optional.of(acl), dublinCore, Optional.of(startDateTime),
463 Optional.of(endDateTime), Optional.of(userIds), Optional.of(captureAgentId), Optional.of(finalCaProperties),
464 Optional.empty());
465
466
467 touchLastEntry(captureAgentId);
468 } catch (SchedulerException e) {
469 throw e;
470 } catch (Exception e) {
471 logger.error("Failed to create event with id '{}':", mediaPackageId, e);
472 throw new SchedulerException(e);
473 }
474 }
475
476 @Override
477 public Map<String, Period> addMultipleEvents(RRule rRule, Date start, Date end, Long duration, TimeZone tz,
478 String captureAgentId, Set<String> userIds, MediaPackage templateMp, Map<String, String> wfProperties,
479 Map<String, String> caMetadata, Optional<String> schedulingSource)
480 throws UnauthorizedException, SchedulerConflictException, SchedulerException {
481
482 Util.adjustRrule(rRule, start, tz);
483 List<Period> periods = Util.calculatePeriods(start, end, duration, rRule, tz);
484 if (periods.isEmpty()) {
485 return Collections.emptyMap();
486 }
487 return addMultipleEventInternal(periods, captureAgentId, userIds, templateMp, wfProperties, caMetadata,
488 schedulingSource);
489 }
490
491 private Map<String, Period> addMultipleEventInternal(List<Period> periods, String captureAgentId,
492 Set<String> userIds, MediaPackage templateMp, Map<String, String> wfProperties,
493 Map<String, String> caMetadata, Optional<String> schedulingSource) throws SchedulerException {
494 notNull(periods, "periods");
495 requireTrue(periods.size() > 0, "periods");
496 notEmpty(captureAgentId, "captureAgentId");
497 notNull(userIds, "userIds");
498 notNull(templateMp, "mediaPackages");
499 notNull(wfProperties, "wfProperties");
500 notNull(caMetadata, "caMetadata");
501 notNull(schedulingSource, "schedulingSource");
502
503 Map<String, Period> scheduledEvents = new ConcurrentHashMap<>();
504
505 try {
506 LinkedList<Id> ids = new LinkedList<>();
507
508 while (ids.size() <= periods.size()) {
509
510 while (ids.size() <= periods.size()) {
511 Id id = new IdImpl(UUID.randomUUID().toString());
512 ids.add(id);
513 }
514
515 List<Snapshot> snapshots = assetManager.getLatestSnapshots(ids);
516
517
518 if (snapshots.size() > 0) {
519 ids.clear();
520 }
521 }
522
523 Optional<String> seriesId = Optional.ofNullable(StringUtils.trimToNull(templateMp.getSeries()));
524
525 List<MediaPackage> conflictingEvents = findConflictingEvents(periods, captureAgentId, TimeZone.getDefault());
526 if (conflictingEvents.size() > 0) {
527 logger.info("Unable to add events, conflicting events found: {}", conflictingEvents);
528 throw new SchedulerConflictException("Unable to add event, conflicting events found");
529 }
530
531 final Organization org = securityService.getOrganization();
532 final User user = securityService.getUser();
533 periods.parallelStream().forEach(event -> SecurityUtil.runAs(securityService, org, user, () -> {
534 final int currentCounter = periods.indexOf(event);
535 MediaPackage mediaPackage = (MediaPackage) templateMp.clone();
536 Date startDate = new Date(event.getStart().getTime());
537 Date endDate = new Date(event.getEnd().getTime());
538 Id id = ids.get(currentCounter);
539
540
541 DublinCoreCatalog dc;
542 Optional<DublinCoreCatalog> dcOpt = DublinCoreUtil.loadEpisodeDublinCore(workspace, templateMp);
543 if (dcOpt.isPresent()) {
544 dc = dcOpt.get();
545 dc = (DublinCoreCatalog) dc.clone();
546
547 dc.addBindings(XmlNamespaceContext
548 .mk(XmlNamespaceBinding.mk(DublinCores.OC_PROPERTY_NS_PREFIX, DublinCores.OC_PROPERTY_NS_URI)));
549 } else {
550 dc = DublinCores.mkOpencastEpisode().getCatalog();
551 }
552
553
554 mediaPackage.setIdentifier(id);
555
556
557 String newTitle = dc.getFirst(DublinCore.PROPERTY_TITLE)
558 + String.format(" %0" + Integer.toString(periods.size()).length() + "d", currentCounter + 1);
559 dc.set(DublinCore.PROPERTY_TITLE, newTitle);
560 DublinCoreValue eventTime = EncodingSchemeUtils.encodePeriod(new DCMIPeriod(startDate, endDate),
561 Precision.Second);
562 dc.set(DublinCore.PROPERTY_TEMPORAL, eventTime);
563 dc.set(DublinCore.PROPERTY_CREATED, EncodingSchemeUtils.encodeDate(startDate, Precision.Second));
564 try {
565 mediaPackage = updateDublincCoreCatalog(mediaPackage, dc);
566 } catch (Exception e) {
567 Misc.chuck(e);
568 }
569 mediaPackage.setTitle(newTitle);
570
571 String mediaPackageId = mediaPackage.getIdentifier().toString();
572
573 Calendar cal = Calendar.getInstance(TimeZone.getTimeZone("UTC"));
574 cal.setTime(event.getStart());
575 Date startDateTime = cal.getTime();
576 cal.setTime(event.getEnd());
577 Date endDateTime = cal.getTime();
578
579 Optional<DublinCoreCatalog> dublinCore = DublinCoreUtil.loadEpisodeDublinCore(workspace, mediaPackage);
580 AccessControlList acl = authorizationService.getActiveAcl(mediaPackage).getA();
581
582
583 Map<String, String> finalCaProperties = getFinalAgentProperties(caMetadata, wfProperties, captureAgentId,
584 seriesId, dublinCore);
585
586
587 String checksum = calculateChecksum(workspace, getEventCatalogUIAdapterFlavors(), startDateTime, endDateTime,
588 captureAgentId, userIds, mediaPackage, dublinCore, wfProperties, finalCaProperties, acl);
589 try {
590 persistEvent(mediaPackageId, checksum, Optional.of(startDateTime), Optional.of(endDateTime),
591 Optional.of(captureAgentId), Optional.of(userIds), Optional.of(mediaPackage), Optional.of(wfProperties),
592 Optional.of(finalCaProperties), schedulingSource);
593 } catch (Exception e) {
594 Misc.chuck(e);
595 }
596
597
598 updateLiveEvent(mediaPackageId, Optional.of(acl), dublinCore, Optional.of(startDateTime),
599 Optional.of(endDateTime), Optional.of(captureAgentId), Optional.of(finalCaProperties));
600
601
602 updateEventInIndex(mediaPackageId, Optional.of(acl), dublinCore, Optional.of(startDateTime),
603 Optional.of(endDateTime), Optional.of(userIds), Optional.of(captureAgentId),
604 Optional.of(finalCaProperties), Optional.empty());
605
606 scheduledEvents.put(mediaPackageId, event);
607 for (MediaPackageElement mediaPackageElement : mediaPackage.getElements()) {
608 try {
609 workspace.delete(mediaPackage.getIdentifier().toString(), mediaPackageElement.getIdentifier());
610 } catch (NotFoundException | IOException e) {
611 logger.warn("Failed to delete media package element", e);
612 }
613 }
614 }));
615 return scheduledEvents;
616 } catch (SchedulerException e) {
617 throw e;
618 } catch (Exception e) {
619 throw new SchedulerException(e);
620 } finally {
621
622 if (!scheduledEvents.isEmpty()) {
623 touchLastEntry(captureAgentId);
624 }
625 }
626 }
627
628 @Override
629 public void updateEvent(final String mpId, Optional<Date> startDateTime, Optional<Date> endDateTime,
630 Optional<String> captureAgentId, Optional<Set<String>> userIds, Optional<MediaPackage> mediaPackage,
631 Optional<Map<String, String>> wfProperties, Optional<Map<String, String>> caMetadata)
632 throws NotFoundException, UnauthorizedException, SchedulerException {
633 updateEventInternal(mpId, startDateTime, endDateTime, captureAgentId, userIds, mediaPackage,
634 wfProperties, caMetadata, false);
635 }
636
637 @Override
638 public void updateEvent(final String mpId, Optional<Date> startDateTime, Optional<Date> endDateTime,
639 Optional<String> captureAgentId, Optional<Set<String>> userIds, Optional<MediaPackage> mediaPackage,
640 Optional<Map<String, String>> wfProperties, Optional<Map<String, String>> caMetadata, boolean allowConflict)
641 throws NotFoundException, UnauthorizedException, SchedulerException {
642 updateEventInternal(mpId, startDateTime, endDateTime, captureAgentId, userIds, mediaPackage,
643 wfProperties, caMetadata, allowConflict);
644 }
645
646 private void updateEventInternal(final String mpId, Optional<Date> startDateTime,
647 Optional<Date> endDateTime, Optional<String> captureAgentId, Optional<Set<String>> userIds,
648 Optional<MediaPackage> mediaPackageOpt, Optional<Map<String, String>> wfProperties,
649 Optional<Map<String, String>> caMetadata, boolean allowConflict)
650 throws NotFoundException, SchedulerException {
651 notEmpty(mpId, "mpId");
652 notNull(startDateTime, "startDateTime");
653 notNull(endDateTime, "endDateTime");
654 notNull(captureAgentId, "captureAgentId");
655 notNull(userIds, "userIds");
656 notNull(mediaPackageOpt, "mediaPackageOpt");
657 notNull(wfProperties, "wfProperties");
658 notNull(caMetadata, "caMetadata");
659
660 try {
661 Optional<Snapshot> optSnapshot = assetManager.getLatestSnapshot(mpId);
662 Optional<ExtendedEventDto> optExtEvent = persistence.getEvent(mpId);
663 if (optSnapshot.isEmpty() || optExtEvent.isEmpty()) {
664 throw new NotFoundException("No event found while updating event " + mpId);
665 }
666
667 Snapshot snapshot = optSnapshot.get();
668 MediaPackage archivedMediaPackage = snapshot.getMediaPackage();
669
670 Optional<DublinCoreCatalog> archivedDublinCoreOpt = loadEpisodeDublinCoreFromAsset(snapshot);
671 if (archivedDublinCoreOpt.isEmpty()) {
672 throw new NotFoundException("No dublincore found while updating event " + mpId);
673 }
674 DublinCoreCatalog archivedDublinCore = archivedDublinCoreOpt.get();
675 AccessControlList archivedAcl = authorizationService.getActiveAcl(archivedMediaPackage).getA();
676
677 final ExtendedEventDto extendedEventDto = optExtEvent.get();
678 Date start = extendedEventDto.getStartDate();
679 Date end = extendedEventDto.getEndDate();
680
681 if ((startDateTime.isPresent() || endDateTime.isPresent())
682 && endDateTime.orElse(end).before(startDateTime.orElse(start))) {
683 throw new SchedulerException("The end date is before the start date");
684 }
685
686 String agentId = extendedEventDto.getCaptureAgentId();
687 Optional<String> seriesId = Optional.ofNullable(archivedMediaPackage.getSeries());
688
689
690
691 if ((captureAgentId.isPresent() || startDateTime.isPresent() || endDateTime.isPresent())
692 && (!allowConflict || !isAdmin())) {
693 List<MediaPackage> conflictingEvents = findConflictingEvents(
694 captureAgentId.orElse(agentId),
695 startDateTime.orElse(start),
696 endDateTime.orElse(end)
697 ).stream()
698 .filter(mp -> !mpId.equals(mp.getIdentifier().toString()))
699 .collect(Collectors.toList());
700 if (conflictingEvents.size() > 0) {
701 logger.info("Unable to update event {}, conflicting events found: {}", mpId, conflictingEvents);
702 throw new SchedulerConflictException("Unable to update event, conflicting events found for event " + mpId);
703 }
704 }
705
706 Set<String> presenters = getPresenters(Optional.ofNullable(extendedEventDto.getPresenters()).orElse(""));
707 Map<String, String> wfProps = deserializeExtendedEventProperties(extendedEventDto.getWorkflowProperties());
708 Map<String, String> caProperties = deserializeExtendedEventProperties(
709 extendedEventDto.getCaptureAgentProperties());
710
711 boolean propertiesChanged = false;
712 boolean dublinCoreChanged = false;
713
714
715 if (wfProperties.isPresent()) {
716 propertiesChanged = true;
717 wfProps = wfProperties.get();
718 }
719
720
721 if (caMetadata.isPresent()) {
722 propertiesChanged = true;
723 caProperties = caMetadata.get();
724 }
725
726 if (captureAgentId.isPresent()) {
727 propertiesChanged = true;
728 }
729
730 Optional<AccessControlList> changedAclOpt = Optional.empty();
731 Optional<DublinCoreCatalog> changedDublinCoreOpt = Optional.empty();
732 if (mediaPackageOpt.isPresent()) {
733 MediaPackage mediaPackage = mediaPackageOpt.get();
734
735 if (ne(archivedMediaPackage.getSeries(), mediaPackage.getSeries())) {
736 propertiesChanged = true;
737 seriesId = Optional.ofNullable(mediaPackage.getSeries());
738 }
739
740
741 AccessControlList acl = authorizationService.getActiveAcl(mediaPackage).getA();
742 if (!AccessControlUtil.equals(acl, archivedAcl)) {
743 changedAclOpt = Optional.of(acl);
744 }
745
746
747 Optional<DublinCoreCatalog> dublinCoreOpt = DublinCoreUtil.loadEpisodeDublinCore(workspace,
748 mediaPackage);
749 if (dublinCoreOpt.isPresent() && !DublinCoreUtil.equals(archivedDublinCore, dublinCoreOpt.get())) {
750 dublinCoreChanged = true;
751 propertiesChanged = true;
752 changedDublinCoreOpt = dublinCoreOpt;
753 }
754 }
755
756
757 DublinCoreCatalog dublinCore = changedDublinCoreOpt.orElse(archivedDublinCore);
758 DublinCoreCatalog dublinCoreCopy = (DublinCoreCatalog) dublinCore.clone();
759 if (startDateTime.isPresent() && endDateTime.isPresent()) {
760 DublinCoreValue eventTime = EncodingSchemeUtils.encodePeriod(
761 new DCMIPeriod(startDateTime.get(), endDateTime.get()), Precision.Second);
762 dublinCore.set(DublinCore.PROPERTY_TEMPORAL, eventTime);
763 }
764 if (captureAgentId.isPresent()) {
765 dublinCore.set(DublinCore.PROPERTY_SPATIAL, captureAgentId.get());
766 }
767 if (!DublinCoreUtil.equals(dublinCore, dublinCoreCopy)) {
768 dublinCoreChanged = true;
769 changedDublinCoreOpt = Optional.of(dublinCore);
770 mediaPackageOpt = Optional.of(updateDublincCoreCatalog(mediaPackageOpt.orElse(archivedMediaPackage),
771 changedDublinCoreOpt.get()));
772 }
773
774 Optional<Map<String, String>> finalCaProperties = Optional.empty();
775 if (propertiesChanged) {
776 finalCaProperties = Optional.of(getFinalAgentProperties(caProperties, wfProps, captureAgentId.orElse(agentId),
777 seriesId, Optional.of(changedDublinCoreOpt.orElse(
778 archivedDublinCore))));
779 }
780
781 String checksum = calculateChecksum(workspace, getEventCatalogUIAdapterFlavors(), startDateTime.orElse(start),
782 endDateTime.orElse(end), captureAgentId.orElse(agentId), userIds.orElse(presenters),
783 mediaPackageOpt.orElse(archivedMediaPackage),
784 Optional.of(changedDublinCoreOpt.orElse(archivedDublinCore)), wfProperties.orElse(wfProps),
785 finalCaProperties.orElse(caProperties), changedAclOpt.orElse(new AccessControlList()));
786
787 String oldChecksum = extendedEventDto.getChecksum();
788 if (checksum.equals(oldChecksum)) {
789 logger.debug("Updated event {} has same checksum, ignore update", mpId);
790 return;
791 }
792
793
794 persistEvent(mpId, checksum, startDateTime, endDateTime, captureAgentId, userIds,
795 mediaPackageOpt, wfProperties, finalCaProperties, Optional.empty());
796
797
798 updateLiveEvent(mpId, changedAclOpt, changedDublinCoreOpt, startDateTime, endDateTime, Optional.of(agentId),
799 finalCaProperties);
800
801
802 updateEventInIndex(mpId, changedAclOpt, changedDublinCoreOpt, startDateTime, endDateTime, userIds,
803 Optional.of(agentId), finalCaProperties, Optional.empty());
804
805
806 if (propertiesChanged || dublinCoreChanged || startDateTime.isPresent() || endDateTime.isPresent()) {
807 touchLastEntry(agentId);
808 if (captureAgentId.isPresent()) {
809 touchLastEntry(captureAgentId.get());
810 }
811 }
812 } catch (NotFoundException | SchedulerException e) {
813 throw e;
814 } catch (Exception e) {
815 throw new SchedulerException(e);
816 }
817 }
818
819 private boolean isAdmin() {
820 return (securityService.getUser().hasRole(GLOBAL_ADMIN_ROLE)
821 || securityService.getUser().hasRole(securityService.getOrganization().getAdminRole()));
822 }
823
824 private Optional<DublinCoreCatalog> loadEpisodeDublinCoreFromAsset(Snapshot snapshot) {
825 Optional<MediaPackageElement> dcCatalog = Arrays.stream(snapshot.getMediaPackage().getElements())
826 .filter(MediaPackageSupport.Filters::isEpisodeDublinCore)
827 .findFirst();
828 if (dcCatalog.isEmpty()) {
829 return Optional.empty();
830 }
831
832 Optional<Asset> asset = assetManager.getAsset(snapshot.getVersion(),
833 snapshot.getMediaPackage().getIdentifier().toString(), dcCatalog.get().getIdentifier());
834 if (asset.isEmpty()) {
835 return Optional.empty();
836 }
837
838 if (Availability.OFFLINE.equals(asset.get().getAvailability())) {
839 return Optional.empty();
840 }
841
842 InputStream inputStream = null;
843 try {
844 inputStream = asset.get().getInputStream();
845 return Optional.of(DublinCores.read(inputStream));
846 } finally {
847 IOUtils.closeQuietly(inputStream);
848 }
849 }
850
851 @Override
852 public synchronized void removeEvent(String mediaPackageId)
853 throws NotFoundException, SchedulerException {
854 notEmpty(mediaPackageId, "mediaPackageId");
855
856 boolean notFoundInDatabase = false;
857 boolean notFoundInAssetManager;
858 try {
859
860 try {
861 Optional<ExtendedEventDto> extEvtOpt = persistence.getEvent(mediaPackageId);
862 if (extEvtOpt.isPresent()) {
863 String agentId = extEvtOpt.get().getCaptureAgentId();
864 persistence.deleteEvent(mediaPackageId);
865 if (StringUtils.isNotEmpty(agentId)) {
866 touchLastEntry(agentId);
867 }
868 } else {
869 notFoundInDatabase = true;
870 }
871 } catch (NotFoundException e) {
872 notFoundInDatabase = true;
873 }
874
875
876 long deletedSnapshots = assetManager.deleteSnapshots(mediaPackageId);
877 notFoundInAssetManager = deletedSnapshots == 0;
878
879
880 sendSchedulerUpdate(new SchedulerItemList(mediaPackageId, SchedulerItem.delete()));
881
882
883 removeSchedulingInfoFromIndex(mediaPackageId);
884 } catch (Exception e) {
885 logger.error("Could not remove event '{}' from persistent storage", mediaPackageId, e);
886 throw new SchedulerException(e);
887 }
888
889 if (notFoundInDatabase && notFoundInAssetManager) {
890 throw new NotFoundException();
891 }
892 }
893
894 @Override
895 public MediaPackage getMediaPackage(String mediaPackageId) throws NotFoundException, SchedulerException {
896 notEmpty(mediaPackageId, "mediaPackageId");
897
898 try {
899 return getEventMediaPackage(mediaPackageId);
900 } catch (RuntimeNotFoundException e) {
901 throw e.getWrappedException();
902 } catch (Exception e) {
903 logger.error("Failed to get mediapackage of event '{}':", mediaPackageId, e);
904 throw new SchedulerException(e);
905 }
906 }
907
908 @Override
909 public DublinCoreCatalog getDublinCore(String mediaPackageId) throws NotFoundException, SchedulerException {
910 notEmpty(mediaPackageId, "mediaPackageId");
911
912 try {
913 Optional<Snapshot> optSnapshot = assetManager.getLatestSnapshot(mediaPackageId);
914 if (optSnapshot.isEmpty()) {
915 throw new NotFoundException();
916 }
917
918 Optional<DublinCoreCatalog> dublinCore = loadEpisodeDublinCoreFromAsset(optSnapshot.get());
919 if (dublinCore.isEmpty()) {
920 throw new NotFoundException("No dublincore catalog found " + mediaPackageId);
921 }
922
923 return dublinCore.get();
924 } catch (NotFoundException e) {
925 throw e;
926 } catch (Exception e) {
927 logger.error("Failed to get dublin core catalog of event '{}':", mediaPackageId, e);
928 throw new SchedulerException(e);
929 }
930 }
931
932 @Override
933 public TechnicalMetadata getTechnicalMetadata(String mediaPackageId)
934 throws NotFoundException, UnauthorizedException, SchedulerException {
935 notEmpty(mediaPackageId, "mediaPackageId");
936
937 try {
938 final Optional<ExtendedEventDto> extEvt = persistence.getEvent(mediaPackageId);
939 if (extEvt.isEmpty()) {
940 throw new NotFoundException();
941 }
942
943 return getTechnicalMetadata(extEvt.get());
944 } catch (NotFoundException e) {
945 throw e;
946 } catch (Exception e) {
947 logger.error("Failed to get technical metadata of event '{}':", mediaPackageId, e);
948 throw new SchedulerException(e);
949 }
950 }
951
952 @Override
953 public Map<String, String> getWorkflowConfig(String mediaPackageId) throws NotFoundException, SchedulerException {
954 notEmpty(mediaPackageId, "mediaPackageId");
955
956 try {
957 Optional<ExtendedEventDto> record = persistence.getEvent(mediaPackageId);
958 if (record.isEmpty()) {
959 throw new NotFoundException();
960 }
961 return deserializeExtendedEventProperties(record.get().getWorkflowProperties());
962 } catch (NotFoundException e) {
963 throw e;
964 } catch (Exception e) {
965 logger.error("Failed to get workflow configuration of event '{}':", mediaPackageId, e);
966 throw new SchedulerException(e);
967 }
968 }
969
970 @Override
971 public Map<String, String> getCaptureAgentConfiguration(String mediaPackageId)
972 throws NotFoundException, SchedulerException {
973 notEmpty(mediaPackageId, "mediaPackageId");
974
975 try {
976 Optional<ExtendedEventDto> record = persistence.getEvent(mediaPackageId);
977 if (record.isEmpty()) {
978 throw new NotFoundException();
979 }
980 return deserializeExtendedEventProperties(record.get().getCaptureAgentProperties());
981 } catch (NotFoundException e) {
982 throw e;
983 } catch (Exception e) {
984 logger.error("Failed to get capture agent contiguration of event '{}':", mediaPackageId, e);
985 throw new SchedulerException(e);
986 }
987 }
988
989 @Override
990 public int getEventCount() throws SchedulerException {
991 try {
992 return persistence.countEvents();
993 } catch (Exception e) {
994 throw new SchedulerException(e);
995 }
996 }
997
998 @Override
999 public List<MediaPackage> search(Optional<String> captureAgentId, Optional<Date> startsFrom, Optional<Date> startsTo,
1000 Optional<Date> endFrom, Optional<Date> endTo) throws SchedulerException {
1001 try {
1002 return persistence.search(captureAgentId, startsFrom, startsTo, endFrom, endTo, Optional.empty()).stream()
1003 .map(ExtendedEventDto::getMediaPackageId)
1004 .map(this::getEventMediaPackage).collect(Collectors.toList());
1005 } catch (Exception e) {
1006 throw new SchedulerException(e);
1007 }
1008 }
1009
1010 @Override
1011 public Optional<MediaPackage> getCurrentRecording(String captureAgentId) throws SchedulerException {
1012 try {
1013 final Date now = new Date();
1014 List<ExtendedEventDto> result = persistence.search(Optional.of(captureAgentId), Optional.empty(),
1015 Optional.of(now), Optional.of(now), Optional.empty(), Optional.of(1));
1016 if (result.isEmpty()) {
1017 return Optional.empty();
1018 }
1019 return Optional.of(getEventMediaPackage(result.get(0).getMediaPackageId()));
1020 } catch (Exception e) {
1021 throw new SchedulerException(e);
1022 }
1023 }
1024
1025 @Override
1026 public Optional<MediaPackage> getUpcomingRecording(String captureAgentId) throws SchedulerException {
1027 try {
1028 final Date now = new Date();
1029 List<ExtendedEventDto> result = persistence.search(Optional.of(captureAgentId), Optional.of(now),
1030 Optional.empty(), Optional.empty(), Optional.empty(), Optional.of(1));
1031 if (result.isEmpty()) {
1032 return Optional.empty();
1033 }
1034 return Optional.of(getEventMediaPackage(result.get(0).getMediaPackageId()));
1035 } catch (Exception e) {
1036 throw new SchedulerException(e);
1037 }
1038 }
1039
1040 @Override
1041 public List<MediaPackage> findConflictingEvents(String captureDeviceID, Date startDate, Date endDate)
1042 throws SchedulerException {
1043 try {
1044 final Organization organization = securityService.getOrganization();
1045 final User user = SecurityUtil.createSystemUser(systemUserName, organization);
1046 List<MediaPackage> conflictingEvents = new ArrayList<>();
1047
1048 SecurityUtil.runAs(securityService, organization, user, () -> {
1049 try {
1050 persistence.getEvents(captureDeviceID, startDate, endDate, Util.EVENT_MINIMUM_SEPARATION_MILLISECONDS)
1051 .stream()
1052 .map(id -> getEventMediaPackage(id, false))
1053 .forEach(conflictingEvents::add);
1054 } catch (SchedulerServiceDatabaseException e) {
1055 logger.error("Failed to get conflicting events", e);
1056 }
1057 });
1058
1059 return conflictingEvents;
1060
1061 } catch (Exception e) {
1062 throw new SchedulerException(e);
1063 }
1064 }
1065
1066 @Override
1067 public List<MediaPackage> findConflictingEvents(String captureAgentId, RRule rrule, Date start, Date end,
1068 long duration, TimeZone tz) throws SchedulerException {
1069 notEmpty(captureAgentId, "captureAgentId");
1070 notNull(rrule, "rrule");
1071 notNull(start, "start");
1072 notNull(end, "end");
1073 notNull(tz, "timeZone");
1074
1075 Util.adjustRrule(rrule, start, tz);
1076 final List<Period> periods = Util.calculatePeriods(start, end, duration, rrule, tz);
1077
1078 if (periods.isEmpty()) {
1079 return Collections.emptyList();
1080 }
1081
1082 return findConflictingEvents(periods, captureAgentId, tz);
1083 }
1084
1085 private boolean checkPeriodOverlap(final List<Period> periods) {
1086 final List<Period> sortedPeriods = new ArrayList<>(periods);
1087 sortedPeriods.sort(Comparator.comparing(Period::getStart));
1088 Period prior = periods.get(0);
1089 for (Period current : periods.subList(1, periods.size())) {
1090 if (current.getStart().compareTo(prior.getEnd()) < 0) {
1091 return true;
1092 }
1093 prior = current;
1094 }
1095 return false;
1096 }
1097
1098 private List<MediaPackage> findConflictingEvents(List<Period> periods, String captureAgentId, TimeZone tz)
1099 throws SchedulerException {
1100 notEmpty(captureAgentId, "captureAgentId");
1101 notNull(periods, "periods");
1102 requireTrue(periods.size() > 0, "periods");
1103
1104
1105
1106
1107 if (checkPeriodOverlap(periods)) {
1108 throw new IllegalArgumentException("RRULE periods overlap");
1109 }
1110
1111 try {
1112 TimeZoneRegistry registry = TimeZoneRegistryFactory.getInstance().createRegistry();
1113
1114 Set<MediaPackage> events = new HashSet<>();
1115
1116 for (Period event : periods) {
1117 event.setTimeZone(registry.getTimeZone(tz.getID()));
1118 final Date startDate = event.getStart();
1119 final Date endDate = event.getEnd();
1120
1121 events.addAll(findConflictingEvents(captureAgentId, startDate, endDate));
1122 }
1123
1124 return new ArrayList<>(events);
1125 } catch (Exception e) {
1126 throw new SchedulerException(e);
1127 }
1128 }
1129
1130 @Override
1131 public String getCalendar(Optional<String> captureAgentId, Optional<String> seriesId, Optional<Date> cutoff)
1132 throws SchedulerException {
1133
1134 try {
1135 final Map<String, ExtendedEventDto> searchResult = persistence.search(captureAgentId, Optional.empty(), cutoff,
1136 Optional.of(DateTime.now().minusHours(1).toDate()), Optional.empty(), Optional.empty()).stream()
1137 .collect(Collectors.toMap(ExtendedEventDto::getMediaPackageId, Function.identity()));
1138 var mpIds = searchResult.keySet();
1139 List<Snapshot> snapshots = assetManager.getLatestSnapshots(mpIds);
1140
1141 final CalendarGenerator cal = new CalendarGenerator(seriesService);
1142 for (String mpId : mpIds) {
1143 final Optional<Snapshot> optSnapshot = snapshots.stream()
1144 .filter(mp -> mp.getMediaPackage().getIdentifier().toString().equals(mpId))
1145 .findFirst();
1146
1147
1148 if (optSnapshot.isEmpty()) {
1149 logger.warn("Mediapackage for event '{}' can't be found, event is not recorded", mpId);
1150 continue;
1151 }
1152
1153 Snapshot snapshot = optSnapshot.get();
1154
1155 if (seriesId.isPresent() && !seriesId.get().equals(snapshot.getMediaPackage().getSeries())) {
1156 continue;
1157 }
1158
1159 Optional<DublinCoreCatalog> catalogOpt = loadEpisodeDublinCoreFromAsset(snapshot);
1160 if (catalogOpt.isEmpty()) {
1161 logger.warn("No episode catalog available, skipping!");
1162 continue;
1163 }
1164
1165 final Map<String, String> caMetadata = deserializeExtendedEventProperties(
1166 searchResult.get(mpId).getCaptureAgentProperties());
1167
1168
1169 if (caMetadata.isEmpty()) {
1170 logger.warn("Properties for event '{}' can't be found, event is not recorded", mpId);
1171 continue;
1172 }
1173
1174 final String agentId = searchResult.get(mpId).getCaptureAgentId();
1175 final Date start = searchResult.get(mpId).getStartDate();
1176 final Date end = searchResult.get(mpId).getEndDate();
1177 final Date lastModified = snapshot.getArchivalDate();
1178
1179
1180 try {
1181 cal.addEvent(
1182 snapshot.getMediaPackage(),
1183 catalogOpt.get(),
1184 agentId,
1185 start,
1186 end,
1187 lastModified,
1188 toPropertyString(caMetadata));
1189 } catch (Exception e) {
1190 logger.warn("Error adding event '{}' to calendar, event is not recorded", mpId, e);
1191 }
1192 }
1193
1194
1195 if (cal.getCalendar().getComponents().size() > 0) {
1196 cal.getCalendar().validate();
1197 }
1198
1199 return cal.getCalendar().toString();
1200
1201 } catch (Exception e) {
1202 throw new SchedulerException(e);
1203 }
1204 }
1205
1206 @Override
1207 public String getScheduleLastModified(String captureAgentId) throws SchedulerException {
1208 notEmpty(captureAgentId, "captureAgentId");
1209
1210 try {
1211 String lastModified = lastModifiedCache.getIfPresent(captureAgentId);
1212 if (lastModified != null) {
1213 return lastModified;
1214 }
1215
1216 populateLastModifiedCache();
1217
1218 lastModified = lastModifiedCache.getIfPresent(captureAgentId);
1219
1220
1221 if (lastModified == null) {
1222 lastModified = EMPTY_CALENDAR_ETAG;
1223 lastModifiedCache.put(captureAgentId, lastModified);
1224 }
1225 return lastModified;
1226 } catch (Exception e) {
1227 throw new SchedulerException(e);
1228 }
1229 }
1230
1231 @Override
1232 public void removeScheduledRecordingsBeforeBuffer(long buffer) throws SchedulerException {
1233 DateTime end = new DateTime(DateTimeZone.UTC).minus(buffer * 1000);
1234
1235 logger.info("Starting to look for scheduled recordings that have finished before {}.",
1236 DateTimeSupport.toUTC(end.getMillis()));
1237
1238 List<ExtendedEventDto> finishedEvents;
1239 try {
1240 finishedEvents = persistence.search(Optional.empty(), Optional.empty(), Optional.empty(), Optional.empty(),
1241 Optional.of(end.toDate()), Optional.empty());
1242 logger.debug("Found {} events from search.", finishedEvents.size());
1243 } catch (Exception e) {
1244 throw new SchedulerException(e);
1245 }
1246
1247 int recordingsRemoved = 0;
1248 for (ExtendedEventDto extEvt : finishedEvents) {
1249 final String eventId = extEvt.getMediaPackageId();
1250 try {
1251 removeEvent(eventId);
1252 logger.debug("Sucessfully removed scheduled event with id " + eventId);
1253 recordingsRemoved++;
1254 } catch (NotFoundException e) {
1255 logger.debug("Skipping event with id {} because it is not found", eventId);
1256 } catch (Exception e) {
1257 logger.warn("Unable to delete event with id '{}':", eventId, e);
1258 }
1259 }
1260
1261 logger.info("Found {} to remove that ended before {}.", recordingsRemoved, DateTimeSupport.toUTC(end.getMillis()));
1262 }
1263
1264 @Override
1265 public boolean updateRecordingState(String id, String state) throws NotFoundException, SchedulerException {
1266 notEmpty(id, "id");
1267 notEmpty(state, "state");
1268
1269 if (!RecordingState.KNOWN_STATES.contains(state)) {
1270 logger.warn("Invalid recording state: {}.", state);
1271 return false;
1272 }
1273
1274 try {
1275 final Optional<ExtendedEventDto> optExtEvt = persistence.getEvent(id);
1276
1277 if (optExtEvt.isEmpty()) {
1278 throw new NotFoundException();
1279 }
1280
1281 final String prevRecordingState = optExtEvt.get().getRecordingState();
1282 final Recording r = new RecordingImpl(id, state);
1283 if (!state.equals(prevRecordingState)) {
1284 logger.debug("Setting Recording {} to state {}.", id, state);
1285
1286
1287 sendSchedulerUpdate(new SchedulerItemList(r.getID(), Collections.singletonList(SchedulerItem
1288 .updateRecordingStatus(r.getState(), r.getLastCheckinTime()))));
1289
1290
1291 updateEventInIndex(r.getID(), Optional.empty(), Optional.empty(), Optional.empty(), Optional.empty(),
1292 Optional.empty(), Optional.empty(), Optional.empty(), Optional.of(r.getState()));
1293 } else {
1294 logger.debug("Recording state not changed");
1295 }
1296
1297 persistence.storeEvent(
1298 id,
1299 securityService.getOrganization().getId(),
1300 Optional.empty(),
1301 Optional.empty(),
1302 Optional.empty(),
1303 Optional.empty(),
1304 Optional.of(r.getState()),
1305 Optional.of(r.getLastCheckinTime()),
1306 Optional.empty(),
1307 Optional.empty(),
1308 Optional.empty(),
1309 Optional.empty(),
1310 Optional.empty()
1311 );
1312 return true;
1313 } catch (NotFoundException e) {
1314 throw e;
1315 } catch (Exception e) {
1316 throw new SchedulerException(e);
1317 }
1318 }
1319
1320 @Override
1321 public Recording getRecordingState(String id) throws NotFoundException, SchedulerException {
1322
1323 notEmpty(id, "id");
1324
1325 try {
1326 Optional<ExtendedEventDto> extEvt = persistence.getEvent(id);
1327
1328 if (extEvt.isEmpty() || extEvt.get().getRecordingState() == null) {
1329 throw new NotFoundException();
1330 }
1331
1332 return new RecordingImpl(id, extEvt.get().getRecordingState(), extEvt.get().getRecordingLastHeard());
1333 } catch (NotFoundException e) {
1334 throw e;
1335 } catch (Exception e) {
1336 throw new SchedulerException(e);
1337 }
1338 }
1339
1340 @Override
1341 public void removeRecording(String id) throws NotFoundException, SchedulerException {
1342 notEmpty(id, "id");
1343
1344 try {
1345 persistence.resetRecordingState(id);
1346
1347
1348 sendSchedulerUpdate(new SchedulerItemList(id, SchedulerItem.deleteRecordingState()));
1349
1350
1351 removeRecordingStatusFromIndex(id);
1352 } catch (NotFoundException e) {
1353 throw e;
1354 } catch (Exception e) {
1355 throw new SchedulerException(e);
1356 }
1357 }
1358
1359 @Override
1360 public Map<String, Recording> getKnownRecordings() throws SchedulerException {
1361 try {
1362 return persistence.getKnownRecordings().parallelStream()
1363 .collect(
1364 Collectors.toMap(ExtendedEventDto::getMediaPackageId,
1365 dto -> new RecordingImpl(dto.getMediaPackageId(), dto.getRecordingState(), dto.getRecordingLastHeard()))
1366 );
1367 } catch (Exception e) {
1368 throw new SchedulerException(e);
1369 }
1370 }
1371
1372 private synchronized void persistEvent(final String mpId, final String checksum,
1373 final Optional<Date> startDateTime, final Optional<Date> endDateTime, final Optional<String> captureAgentId,
1374 final Optional<Set<String>> userIds, final Optional<MediaPackage> mediaPackage,
1375 final Optional<Map<String, String>> wfProperties, final Optional<Map<String, String>> caProperties,
1376 final Optional<String> schedulingSource) throws SchedulerServiceDatabaseException {
1377
1378 if (mediaPackage.isPresent()) {
1379 assetManager.takeSnapshot(SNAPSHOT_OWNER, mediaPackage.get());
1380 }
1381
1382
1383 persistence.storeEvent(
1384 mpId,
1385 securityService.getOrganization().getId(),
1386 captureAgentId,
1387 startDateTime,
1388 endDateTime,
1389 schedulingSource,
1390 Optional.empty(),
1391 Optional.empty(),
1392 userIds.isPresent() ? Optional.of(String.join(",", userIds.get())) : Optional.empty(),
1393 Optional.of(new Date()),
1394 Optional.of(checksum),
1395 wfProperties,
1396 caProperties
1397 );
1398 }
1399
1400
1401
1402
1403
1404
1405
1406
1407
1408
1409
1410
1411
1412
1413 private void updateEventInIndex(String mediaPackageId, Optional<AccessControlList> acl,
1414 Optional<DublinCoreCatalog> dublinCore, Optional<Date> startTime, Optional<Date> endTime,
1415 Optional<Set<String>> presenters, Optional<String> agentId, Optional<Map<String, String>> properties,
1416 Optional<String> recordingStatus) {
1417
1418 String organization = getSecurityService().getOrganization().getId();
1419 User user = getSecurityService().getUser();
1420
1421 Function<Optional<Event>, Optional<Event>> updateFunction = getEventUpdateFunction(mediaPackageId, acl, dublinCore,
1422 startTime, endTime, presenters, agentId, properties, recordingStatus, organization, user);
1423
1424 try {
1425 index.addOrUpdateEvent(mediaPackageId, updateFunction, organization, user);
1426 logger.debug("Scheduled event {} updated in the {} index.", mediaPackageId, index.getIndexName());
1427 } catch (SearchIndexException e) {
1428 logger.error("Error updating the scheduled event {} in the {} index.", mediaPackageId, index.getIndexName(), e);
1429 }
1430 }
1431
1432
1433
1434
1435
1436
1437 private void removeRecordingStatusFromIndex(String mediaPackageId) {
1438 String organization = getSecurityService().getOrganization().getId();
1439 User user = getSecurityService().getUser();
1440
1441 Function<Optional<Event>, Optional<Event>> updateFunction = (Optional<Event> eventOpt) -> {
1442 Event event = eventOpt.orElse(new Event(mediaPackageId, organization));
1443 event.setRecordingStatus(null);
1444 return Optional.of(event);
1445 };
1446
1447 try {
1448 index.addOrUpdateEvent(mediaPackageId, updateFunction, organization, user);
1449 logger.debug("Recording state of event {} removed from the {} index.", mediaPackageId, index.getIndexName());
1450 } catch (SearchIndexException e) {
1451 logger.error("Failed to remove the recording state of event {} from the {} index.", mediaPackageId,
1452 index.getIndexName(), e);
1453 }
1454 }
1455
1456
1457
1458
1459
1460
1461 private void removeSchedulingInfoFromIndex(String mediaPackageId) {
1462 String orgId = getSecurityService().getOrganization().getId();
1463
1464 try {
1465 index.deleteEvent(mediaPackageId, orgId);
1466 logger.debug("Scheduling information of event {} removed from the {} index.", mediaPackageId,
1467 index.getIndexName());
1468 } catch (SearchIndexException e) {
1469 logger.error("Failed to delete the scheduling information of event {} from the {} index.", mediaPackageId,
1470 index.getIndexName(), e);
1471 }
1472 }
1473
1474
1475
1476
1477
1478
1479
1480
1481
1482
1483
1484
1485 private void updateLiveEvent(String mpId, Optional<AccessControlList> acl, Optional<DublinCoreCatalog> dublinCore,
1486 Optional<Date> startTime, Optional<Date> endTime, Optional<String> agentId,
1487 Optional<Map<String, String>> properties) {
1488 List<SchedulerItem> items = new ArrayList<>();
1489 if (acl.isPresent()) {
1490 items.add(SchedulerItem.updateAcl(acl.get()));
1491 }
1492 if (dublinCore.isPresent()) {
1493 items.add(SchedulerItem.updateCatalog(dublinCore.get()));
1494 }
1495 if (startTime.isPresent()) {
1496 items.add(SchedulerItem.updateStart(startTime.get()));
1497 }
1498 if (endTime.isPresent()) {
1499 items.add(SchedulerItem.updateEnd(endTime.get()));
1500 }
1501 if (agentId.isPresent()) {
1502 items.add(SchedulerItem.updateAgent(agentId.get()));
1503 }
1504 if (properties.isPresent()) {
1505 items.add(SchedulerItem.updateProperties(properties.get()));
1506 }
1507
1508 if (!items.isEmpty()) {
1509 sendSchedulerUpdate(new SchedulerItemList(mpId, items));
1510 }
1511 }
1512
1513 private Map<String, String> getFinalAgentProperties(Map<String, String> caMetadata, Map<String, String> wfProperties,
1514 String captureAgentId, Optional<String> seriesId, Optional<DublinCoreCatalog> dublinCore) {
1515 Map<String, String> properties = new HashMap<>();
1516 for (Entry<String, String> entry : caMetadata.entrySet()) {
1517 if (entry.getKey().startsWith(WORKFLOW_CONFIG_PREFIX)) {
1518 continue;
1519 }
1520 properties.put(entry.getKey(), entry.getValue());
1521 }
1522 for (Entry<String, String> entry : wfProperties.entrySet()) {
1523 properties.put(WORKFLOW_CONFIG_PREFIX.concat(entry.getKey()), entry.getValue());
1524 }
1525 if (dublinCore.isPresent()) {
1526 properties.put("event.title", dublinCore.get().getFirst(DublinCore.PROPERTY_TITLE));
1527 }
1528 if (seriesId.isPresent()) {
1529 properties.put("event.series", seriesId.get());
1530 }
1531 properties.put("event.location", captureAgentId);
1532 return properties;
1533 }
1534
1535 private void touchLastEntry(String captureAgentId) throws SchedulerException {
1536
1537 try {
1538 logger.debug("Marking calendar feed for {} as modified", captureAgentId);
1539 persistence.touchLastEntry(captureAgentId);
1540 populateLastModifiedCache();
1541 } catch (SchedulerServiceDatabaseException e) {
1542 logger.error("Failed to update last modified entry of agent '{}':", captureAgentId, e);
1543 }
1544 }
1545
1546 private void populateLastModifiedCache() throws SchedulerException {
1547 try {
1548 Map<String, Date> lastModifiedDates = persistence.getLastModifiedDates();
1549 for (Entry<String, Date> entry : lastModifiedDates.entrySet()) {
1550 Date lastModifiedDate = entry.getValue() != null ? entry.getValue() : new Date();
1551 lastModifiedCache.put(entry.getKey(), generateLastModifiedHash(lastModifiedDate));
1552 }
1553 } catch (Exception e) {
1554 throw new SchedulerException(e);
1555 }
1556 }
1557
1558 private String generateLastModifiedHash(Date lastModifiedDate) {
1559 return "mod" + Long.toString(lastModifiedDate.getTime());
1560 }
1561
1562 private String toPropertyString(Map<String, String> properties) {
1563 StringBuilder wfPropertiesString = new StringBuilder();
1564 for (Map.Entry<String, String> entry : properties.entrySet()) {
1565 wfPropertiesString.append(entry.getKey() + "=" + entry.getValue() + "\n");
1566 }
1567 return wfPropertiesString.toString();
1568 }
1569
1570 private MediaPackage getEventMediaPackage(final String mediaPackageId, boolean checkOwner) {
1571 Optional<MediaPackage> mediaPackage = assetManager.getMediaPackage(mediaPackageId);
1572
1573 if (mediaPackage.isEmpty()) {
1574 throw new RuntimeNotFoundException(new NotFoundException());
1575 }
1576
1577 return mediaPackage.get();
1578 }
1579
1580 private MediaPackage getEventMediaPackage(final String mediaPackageId) {
1581 return getEventMediaPackage(mediaPackageId, true);
1582 }
1583
1584
1585
1586
1587
1588
1589
1590
1591
1592
1593
1594
1595
1596 private MediaPackage updateDublincCoreCatalog(MediaPackage mp, DublinCoreCatalog dc)
1597 throws IOException, MediaPackageException {
1598 try (InputStream inputStream = IOUtils.toInputStream(dc.toXmlString(), "UTF-8")) {
1599
1600 Catalog[] catalogs = mp.getCatalogs(MediaPackageElements.EPISODE);
1601 if (catalogs.length > 0) {
1602 Catalog catalog = catalogs[0];
1603 URI uri = workspace.put(mp.getIdentifier().toString(), catalog.getIdentifier(), "dublincore.xml", inputStream);
1604 catalog.setURI(uri);
1605
1606 catalog.setChecksum(null);
1607 } else {
1608 throw new MediaPackageException("Unable to find catalog");
1609 }
1610 }
1611 return mp;
1612 }
1613
1614 private TechnicalMetadata getTechnicalMetadata(ExtendedEventDto extEvt) {
1615 final String agentId = extEvt.getCaptureAgentId();
1616 final Date start = extEvt.getStartDate();
1617 final Date end = extEvt.getEndDate();
1618 final Set<String> presenters = getPresenters(Optional.ofNullable(extEvt.getPresenters()).orElse(""));
1619 final Optional<String> recordingStatus = Optional.ofNullable(extEvt.getRecordingState());
1620 final Optional<Long> lastHeard = Optional.ofNullable(extEvt.getRecordingLastHeard());
1621 final Map<String, String> caMetadata = deserializeExtendedEventProperties(extEvt.getCaptureAgentProperties());
1622 final Map<String, String> wfProperties = deserializeExtendedEventProperties(extEvt.getWorkflowProperties());
1623
1624 Recording recording = null;
1625 if (recordingStatus.isPresent() && lastHeard.isPresent()) {
1626 recording = new RecordingImpl(extEvt.getMediaPackageId(), recordingStatus.get(), lastHeard.get());
1627 }
1628
1629 return new TechnicalMetadataImpl(extEvt.getMediaPackageId(), agentId, start, end, presenters, wfProperties,
1630 caMetadata, Optional.ofNullable(recording));
1631 }
1632
1633 private Set<String> getPresenters(String presentersString) {
1634 return new HashSet<>(Arrays.asList(StringUtils.split(presentersString, ",")));
1635 }
1636
1637
1638
1639
1640 private List<MediaPackageElementFlavor> getEventCatalogUIAdapterFlavors() {
1641 String organization = securityService.getOrganization().getId();
1642 return eventCatalogUIAdapters.stream()
1643 .filter(adapter -> adapter.getOrganization().equals(organization))
1644 .map(EventCatalogUIAdapter::getFlavor)
1645 .filter(mpe -> !MediaPackageElements.EPISODE.matches(mpe))
1646 .collect(Collectors.toList());
1647 }
1648
1649 @Override
1650 public void repopulate(IndexRebuildService.DataType type) throws IndexRebuildException {
1651 try {
1652 final int total;
1653 try {
1654 total = persistence.countEvents();
1655 } catch (SchedulerServiceDatabaseException e) {
1656 logIndexRebuildError(logger, e);
1657 throw new IndexRebuildException(getService(), e);
1658 }
1659 logIndexRebuildBegin(logger, total, "scheduled events");
1660 final int[] current = {0};
1661 int n = 20;
1662 var updatedEventRange = new ArrayList<Event>();
1663
1664 for (Organization organization: orgDirectoryService.getOrganizations()) {
1665 final User user = SecurityUtil.createSystemUser(systemUserName, organization);
1666 SecurityUtil.runAs(securityService, organization, user,
1667 () -> {
1668 final List<ExtendedEventDto> events;
1669 try {
1670 events = persistence.getEvents();
1671 } catch (SchedulerServiceDatabaseException e) {
1672 logIndexRebuildError(logger, e, organization);
1673 return;
1674 }
1675
1676 for (ExtendedEventDto event : events) {
1677 try {
1678 current[0]++;
1679
1680 var updatedEventData = Optional.of(new Event(event.getMediaPackageId(), organization.getId()));
1681
1682 final Set<String> presenters = getPresenters(
1683 Optional.ofNullable(event.getPresenters()).orElse(""));
1684 final Map<String, String> caMetadata = deserializeExtendedEventProperties(
1685 event.getCaptureAgentProperties());
1686
1687 updatedEventData = getEventUpdateFunction(event.getMediaPackageId(), Optional.empty(),
1688 Optional.empty(), Optional.of(event.getStartDate()), Optional.of(event.getEndDate()),
1689 Optional.of(presenters), Optional.of(event.getCaptureAgentId()), Optional.of(caMetadata),
1690 Optional.ofNullable(event.getRecordingState()), organization.getId(),
1691 securityService.getUser()).apply(updatedEventData);
1692 updatedEventRange.add(updatedEventData.get());
1693
1694 if (updatedEventRange.size() >= n || current[0] >= events.size()) {
1695 index.bulkEventUpdate(updatedEventRange);
1696 logIndexRebuildProgress(logger, total, current[0], n);
1697 updatedEventRange.clear();
1698 }
1699
1700 } catch (SearchIndexException e) {
1701 logger.error("Error while updating event '{}' from search index:", event.getMediaPackageId(), e);
1702 } catch (Exception e) {
1703 throw new RuntimeException("Fatal error while indexing event " + event.getMediaPackageId(), e);
1704 }
1705 }
1706 });
1707 }
1708 } catch (Exception e) {
1709 logIndexRebuildError(logger, e);
1710 throw new IndexRebuildException(getService(), e);
1711 }
1712 }
1713
1714 @Override
1715 public IndexRebuildService.Service getService() {
1716 return IndexRebuildService.Service.Scheduler;
1717 }
1718
1719 public SecurityService getSecurityService() {
1720 return securityService;
1721 }
1722
1723
1724
1725
1726
1727
1728
1729
1730 private Function<Optional<Event>, Optional<Event>> getEventUpdateFunction(String mediaPackageId,
1731 Optional<AccessControlList> acl, Optional<DublinCoreCatalog> dublinCore, Optional<Date> startTime,
1732 Optional<Date> endTime, Optional<Set<String>> presenters, Optional<String> agentId,
1733 Optional<Map<String, String>> properties, Optional<String> recordingStatus, String orgId, User user) {
1734 return (Optional<Event> eventOpt) -> {
1735 Event event = eventOpt.orElse(new Event(mediaPackageId, orgId));
1736
1737 if (acl.isPresent()) {
1738 event.setAccessPolicy(AccessControlParser.toJsonSilent(acl.get()));
1739 }
1740 if (dublinCore.isPresent()) {
1741 EventIndexUtils.updateEvent(event, dublinCore.get());
1742 if (isBlank(event.getCreator())) {
1743 event.setCreator(getSecurityService().getUser().getName());
1744 }
1745
1746
1747 try {
1748 EventIndexUtils.updateSeriesName(event, orgId, user, index);
1749 } catch (SearchIndexException e) {
1750 logger.error("Error updating the series name of the event {} in the {} index.", mediaPackageId,
1751 index.getIndexName(), e);
1752 }
1753 }
1754 if (presenters.isPresent()) {
1755 event.setTechnicalPresenters(new ArrayList<>(presenters.get()));
1756 }
1757 if (agentId.isPresent()) {
1758 event.setAgentId(agentId.get());
1759 }
1760 if (recordingStatus.isPresent() && !recordingStatus.get().equals(RecordingState.UNKNOWN)) {
1761 event.setRecordingStatus(recordingStatus.get());
1762 }
1763 if (properties.isPresent()) {
1764 event.setAgentConfiguration(properties.get());
1765 }
1766 if (startTime.isPresent()) {
1767 String startTimeStr = startTime == null ? null : DateTimeSupport.toUTC(startTime.get().getTime());
1768 event.setTechnicalStartTime(startTimeStr);
1769 }
1770 if (endTime.isPresent()) {
1771 String endTimeStr = endTime == null ? null : DateTimeSupport.toUTC(endTime.get().getTime());
1772 event.setTechnicalEndTime(endTimeStr);
1773 }
1774
1775 return Optional.of(event);
1776 };
1777 }
1778 }