1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21 package org.opencastproject.scheduler.impl;
22
23 import static org.apache.commons.lang3.StringUtils.isBlank;
24 import static org.opencastproject.scheduler.impl.SchedulerUtil.calculateChecksum;
25 import static org.opencastproject.security.api.SecurityConstants.GLOBAL_ADMIN_ROLE;
26 import static org.opencastproject.util.EqualsUtil.ne;
27 import static org.opencastproject.util.RequireUtil.notEmpty;
28 import static org.opencastproject.util.RequireUtil.notNull;
29 import static org.opencastproject.util.RequireUtil.requireTrue;
30 import static org.opencastproject.util.data.Monadics.mlist;
31
32 import org.opencastproject.assetmanager.api.Asset;
33 import org.opencastproject.assetmanager.api.AssetManager;
34 import org.opencastproject.assetmanager.api.Availability;
35 import org.opencastproject.assetmanager.api.Snapshot;
36 import org.opencastproject.assetmanager.api.query.AQueryBuilder;
37 import org.opencastproject.assetmanager.api.query.ARecord;
38 import org.opencastproject.assetmanager.api.query.AResult;
39 import org.opencastproject.assetmanager.api.query.ASelectQuery;
40 import org.opencastproject.assetmanager.api.query.Predicate;
41 import org.opencastproject.elasticsearch.api.SearchIndexException;
42 import org.opencastproject.elasticsearch.index.ElasticsearchIndex;
43 import org.opencastproject.elasticsearch.index.objects.event.Event;
44 import org.opencastproject.elasticsearch.index.objects.event.EventIndexUtils;
45 import org.opencastproject.elasticsearch.index.rebuild.AbstractIndexProducer;
46 import org.opencastproject.elasticsearch.index.rebuild.IndexProducer;
47 import org.opencastproject.elasticsearch.index.rebuild.IndexRebuildException;
48 import org.opencastproject.elasticsearch.index.rebuild.IndexRebuildService;
49 import org.opencastproject.mediapackage.Catalog;
50 import org.opencastproject.mediapackage.MediaPackage;
51 import org.opencastproject.mediapackage.MediaPackageElement;
52 import org.opencastproject.mediapackage.MediaPackageElementFlavor;
53 import org.opencastproject.mediapackage.MediaPackageElements;
54 import org.opencastproject.mediapackage.MediaPackageException;
55 import org.opencastproject.mediapackage.MediaPackageSupport;
56 import org.opencastproject.mediapackage.identifier.Id;
57 import org.opencastproject.mediapackage.identifier.IdImpl;
58 import org.opencastproject.message.broker.api.scheduler.SchedulerItem;
59 import org.opencastproject.message.broker.api.scheduler.SchedulerItemList;
60 import org.opencastproject.message.broker.api.update.SchedulerUpdateHandler;
61 import org.opencastproject.metadata.dublincore.DCMIPeriod;
62 import org.opencastproject.metadata.dublincore.DublinCore;
63 import org.opencastproject.metadata.dublincore.DublinCoreCatalog;
64 import org.opencastproject.metadata.dublincore.DublinCoreUtil;
65 import org.opencastproject.metadata.dublincore.DublinCoreValue;
66 import org.opencastproject.metadata.dublincore.DublinCores;
67 import org.opencastproject.metadata.dublincore.EncodingSchemeUtils;
68 import org.opencastproject.metadata.dublincore.EventCatalogUIAdapter;
69 import org.opencastproject.metadata.dublincore.Precision;
70 import org.opencastproject.scheduler.api.Recording;
71 import org.opencastproject.scheduler.api.RecordingImpl;
72 import org.opencastproject.scheduler.api.RecordingState;
73 import org.opencastproject.scheduler.api.SchedulerConflictException;
74 import org.opencastproject.scheduler.api.SchedulerException;
75 import org.opencastproject.scheduler.api.SchedulerService;
76 import org.opencastproject.scheduler.api.TechnicalMetadata;
77 import org.opencastproject.scheduler.api.TechnicalMetadataImpl;
78 import org.opencastproject.scheduler.api.Util;
79 import org.opencastproject.scheduler.impl.persistence.ExtendedEventDto;
80 import org.opencastproject.security.api.AccessControlList;
81 import org.opencastproject.security.api.AccessControlParser;
82 import org.opencastproject.security.api.AccessControlUtil;
83 import org.opencastproject.security.api.AuthorizationService;
84 import org.opencastproject.security.api.Organization;
85 import org.opencastproject.security.api.OrganizationDirectoryService;
86 import org.opencastproject.security.api.SecurityService;
87 import org.opencastproject.security.api.UnauthorizedException;
88 import org.opencastproject.security.api.User;
89 import org.opencastproject.security.util.SecurityUtil;
90 import org.opencastproject.series.api.SeriesService;
91 import org.opencastproject.util.DateTimeSupport;
92 import org.opencastproject.util.NotFoundException;
93 import org.opencastproject.util.OsgiUtil;
94 import org.opencastproject.util.XmlNamespaceBinding;
95 import org.opencastproject.util.XmlNamespaceContext;
96 import org.opencastproject.util.data.Option;
97 import org.opencastproject.util.data.functions.Misc;
98 import org.opencastproject.util.data.functions.Strings;
99 import org.opencastproject.workspace.api.Workspace;
100
101 import com.google.common.cache.Cache;
102 import com.google.common.cache.CacheBuilder;
103 import com.google.gson.Gson;
104 import com.google.gson.reflect.TypeToken;
105
106 import net.fortuna.ical4j.model.Period;
107 import net.fortuna.ical4j.model.TimeZoneRegistry;
108 import net.fortuna.ical4j.model.TimeZoneRegistryFactory;
109 import net.fortuna.ical4j.model.property.RRule;
110
111 import org.apache.commons.io.IOUtils;
112 import org.apache.commons.lang3.StringUtils;
113 import org.joda.time.DateTime;
114 import org.joda.time.DateTimeZone;
115 import org.osgi.service.cm.ConfigurationException;
116 import org.osgi.service.cm.ManagedService;
117 import org.osgi.service.component.ComponentContext;
118 import org.osgi.service.component.annotations.Activate;
119 import org.osgi.service.component.annotations.Component;
120 import org.osgi.service.component.annotations.Reference;
121 import org.osgi.service.component.annotations.ReferenceCardinality;
122 import org.osgi.service.component.annotations.ReferencePolicy;
123 import org.osgi.service.component.annotations.ReferencePolicyOption;
124 import org.slf4j.Logger;
125 import org.slf4j.LoggerFactory;
126
127 import java.io.IOException;
128 import java.io.InputStream;
129 import java.lang.reflect.Type;
130 import java.net.URI;
131 import java.util.ArrayList;
132 import java.util.Arrays;
133 import java.util.Calendar;
134 import java.util.Collections;
135 import java.util.Comparator;
136 import java.util.Date;
137 import java.util.Dictionary;
138 import java.util.HashMap;
139 import java.util.HashSet;
140 import java.util.LinkedList;
141 import java.util.List;
142 import java.util.Map;
143 import java.util.Map.Entry;
144 import java.util.Optional;
145 import java.util.Set;
146 import java.util.TimeZone;
147 import java.util.UUID;
148 import java.util.concurrent.ConcurrentHashMap;
149 import java.util.concurrent.CopyOnWriteArrayList;
150 import java.util.concurrent.TimeUnit;
151 import java.util.function.Function;
152 import java.util.stream.Collectors;
153
154
155
156
157 @Component(
158 service = { ManagedService.class, SchedulerService.class, IndexProducer.class },
159 property = {
160 "service.description=Scheduler Service"
161 }
162 )
163 public class SchedulerServiceImpl extends AbstractIndexProducer implements SchedulerService, ManagedService {
164
165
166 private static final Logger logger = LoggerFactory.getLogger(SchedulerServiceImpl.class);
167
168
169 private static final String CFG_KEY_LAST_MODIFIED_CACHE_EXPIRE = "last_modified_cache_expire";
170
171
172 private static final String CFG_KEY_MAINTENANCE = "maintenance";
173
174
175 private static final int DEFAULT_CACHE_EXPIRE = 60;
176
177
178 private static final String EMPTY_CALENDAR_ETAG = "mod0";
179
180 private static final String SNAPSHOT_OWNER = SchedulerService.JOB_TYPE;
181
182 private static final Gson gson = new Gson();
183
184
185
186
187
188 private static Map<String,String> deserializeExtendedEventProperties(String props) {
189 if (props == null || props.trim().isEmpty()) {
190 return new HashMap<>();
191 }
192 Type type = new TypeToken<Map<String, String>>() { }.getType();
193 return gson.fromJson(props, type);
194 }
195
196
197 protected Cache<String, String> lastModifiedCache = CacheBuilder.newBuilder()
198 .expireAfterWrite(DEFAULT_CACHE_EXPIRE, TimeUnit.SECONDS).build();
199
200
201 private SchedulerServiceDatabase persistence;
202
203
204 private SeriesService seriesService;
205
206
207 private SecurityService securityService;
208
209
210 private AssetManager assetManager;
211
212
213 private Workspace workspace;
214
215
216 private AuthorizationService authorizationService;
217
218
219 private OrganizationDirectoryService orgDirectoryService;
220
221
222 private ElasticsearchIndex index;
223
224
225 private List<EventCatalogUIAdapter> eventCatalogUIAdapters = new ArrayList<>();
226 private final List<SchedulerUpdateHandler> schedulerUpdateHandlers = new CopyOnWriteArrayList<>();
227
228
229 private String systemUserName;
230
231 private ComponentContext componentContext;
232
233
234
235
236
237
238 @Reference(
239 cardinality = ReferenceCardinality.MULTIPLE,
240 policy = ReferencePolicy.DYNAMIC,
241 policyOption = ReferencePolicyOption.GREEDY,
242 unbind = "removeSchedulerUpdateHandler"
243 )
244 public void addSchedulerUpdateHandler(SchedulerUpdateHandler handler) {
245 this.schedulerUpdateHandlers.add(handler);
246 }
247
248 public void removeSchedulerUpdateHandler(SchedulerUpdateHandler handler) {
249 this.schedulerUpdateHandlers.remove(handler);
250 }
251
252
253
254
255
256
257 @Reference
258 public void setPersistence(SchedulerServiceDatabase persistence) {
259 this.persistence = persistence;
260 }
261
262
263
264
265
266
267 @Reference
268 public void setSeriesService(SeriesService seriesService) {
269 this.seriesService = seriesService;
270 }
271
272
273
274
275
276
277 @Reference
278 public void setSecurityService(SecurityService securityService) {
279 this.securityService = securityService;
280 }
281
282
283
284
285
286
287 @Reference
288 public void setAssetManager(AssetManager assetManager) {
289 this.assetManager = assetManager;
290 }
291
292
293
294
295
296
297 @Reference
298 public void setWorkspace(Workspace workspace) {
299 this.workspace = workspace;
300 }
301
302
303
304
305
306
307 @Reference
308 public void setAuthorizationService(AuthorizationService authorizationService) {
309 this.authorizationService = authorizationService;
310 }
311
312
313
314
315
316
317 private void sendSchedulerUpdate(SchedulerItemList list) {
318 while (schedulerUpdateHandlers.size() != 1) {
319 logger.warn("Expecting 1 handler, but {} are registered. Waiting 10s then retrying...",
320 schedulerUpdateHandlers.size());
321 try {
322 Thread.sleep(10000L);
323 } catch (InterruptedException e) { }
324 }
325 String mpid = list.getId();
326 for (SchedulerItem item : list.getItems()) {
327 for (SchedulerUpdateHandler handler : this.schedulerUpdateHandlers) {
328 handler.execute(mpid, item);
329 }
330 }
331 }
332
333
334
335
336
337
338 @Reference
339 public void setOrgDirectoryService(OrganizationDirectoryService orgDirectoryService) {
340 this.orgDirectoryService = orgDirectoryService;
341 }
342
343
344
345
346
347
348
349 @Reference
350 public void setIndex(ElasticsearchIndex index) {
351 this.index = index;
352 }
353
354
355 @Reference(
356 cardinality = ReferenceCardinality.MULTIPLE,
357 policy = ReferencePolicy.DYNAMIC,
358 policyOption = ReferencePolicyOption.GREEDY,
359 unbind = "removeCatalogUIAdapter"
360 )
361 public void addCatalogUIAdapter(EventCatalogUIAdapter catalogUIAdapter) {
362 eventCatalogUIAdapters.add(catalogUIAdapter);
363 }
364
365
366 public void removeCatalogUIAdapter(EventCatalogUIAdapter catalogUIAdapter) {
367 eventCatalogUIAdapters.remove(catalogUIAdapter);
368 }
369
370
371
372
373
374
375
376
377 @Activate
378 public void activate(ComponentContext cc) throws Exception {
379 this.componentContext = cc;
380 systemUserName = SecurityUtil.getSystemUserName(cc);
381 logger.info("Activating Scheduler Service");
382 }
383
384 @Override
385 public void updated(Dictionary<String, ?> properties) throws ConfigurationException {
386 if (properties != null) {
387 final Option<Integer> cacheExpireDuration = OsgiUtil.getOptCfg(properties, CFG_KEY_LAST_MODIFIED_CACHE_EXPIRE)
388 .bind(Strings.toInt);
389 if (cacheExpireDuration.isSome()) {
390 lastModifiedCache = CacheBuilder.newBuilder().expireAfterWrite(cacheExpireDuration.get(), TimeUnit.SECONDS)
391 .build();
392 logger.info("Set last modified cache to {}", DateTimeSupport.humanReadableTime(cacheExpireDuration.get()));
393 } else {
394 logger.info("Set last modified cache to default {}", DateTimeSupport.humanReadableTime(DEFAULT_CACHE_EXPIRE));
395 }
396 final Option<Boolean> maintenance = OsgiUtil.getOptCfgAsBoolean(properties, CFG_KEY_MAINTENANCE);
397 if (maintenance.getOrElse(false)) {
398 final String name = SchedulerServiceImpl.class.getName();
399 logger.warn("Putting scheduler into maintenance mode. This only makes sense when migrating data. If this is not"
400 + " intended, edit the config file '{}.cfg' accordingly and restart opencast.", name);
401 componentContext.disableComponent(name);
402 }
403 }
404 }
405
406 @Override
407 public void addEvent(Date startDateTime, Date endDateTime, String captureAgentId, Set<String> userIds,
408 MediaPackage mediaPackage, Map<String, String> wfProperties, Map<String, String> caMetadata,
409 Optional<String> schedulingSource)
410 throws UnauthorizedException, SchedulerException {
411 addEventInternal(startDateTime, endDateTime, captureAgentId, userIds, mediaPackage, wfProperties, caMetadata,
412 schedulingSource);
413 }
414
415 private void addEventInternal(Date startDateTime, Date endDateTime, String captureAgentId, Set<String> userIds,
416 MediaPackage mediaPackage, Map<String, String> wfProperties, Map<String, String> caMetadata,
417 Optional<String> schedulingSource)
418 throws SchedulerException {
419 notNull(startDateTime, "startDateTime");
420 notNull(endDateTime, "endDateTime");
421 notEmpty(captureAgentId, "captureAgentId");
422 notNull(userIds, "userIds");
423 notNull(mediaPackage, "mediaPackage");
424 notNull(wfProperties, "wfProperties");
425 notNull(caMetadata, "caMetadata");
426 notNull(schedulingSource, "schedulingSource");
427 if (endDateTime.before(startDateTime)) {
428 throw new IllegalArgumentException("The end date is before the start date");
429 }
430
431 final String mediaPackageId = mediaPackage.getIdentifier().toString();
432
433 try {
434 AQueryBuilder query = assetManager.createQuery();
435 AResult result = query.select(query.nothing())
436 .where(withOrganization(query).and(query.mediaPackageId(mediaPackageId).and(query.version().isLatest())))
437 .run();
438 Optional<ARecord> record = result.getRecords().stream().findFirst();
439 if (record.isPresent()) {
440 logger.warn("Mediapackage with id '{}' already exists!", mediaPackageId);
441 throw new SchedulerConflictException("Mediapackage with id '" + mediaPackageId + "' already exists!");
442 }
443
444 Optional<String> seriesId = Optional.ofNullable(StringUtils.trimToNull(mediaPackage.getSeries()));
445
446 List<MediaPackage> conflictingEvents = findConflictingEvents(captureAgentId, startDateTime, endDateTime);
447 if (conflictingEvents.size() > 0) {
448 logger.info("Unable to add event {}, conflicting events found: {}", mediaPackageId, conflictingEvents);
449 throw new SchedulerConflictException(
450 "Unable to add event, conflicting events found for event " + mediaPackageId);
451 }
452
453
454 Optional<DublinCoreCatalog> dublinCore = DublinCoreUtil.loadEpisodeDublinCore(workspace, mediaPackage);
455 AccessControlList acl = authorizationService.getActiveAcl(mediaPackage).getA();
456
457
458 Map<String, String> finalCaProperties = getFinalAgentProperties(caMetadata, wfProperties, captureAgentId,
459 seriesId, dublinCore);
460
461
462 String checksum = calculateChecksum(workspace, getEventCatalogUIAdapterFlavors(), startDateTime, endDateTime,
463 captureAgentId, userIds, mediaPackage, dublinCore, wfProperties,
464 finalCaProperties, acl);
465 persistEvent(mediaPackageId, checksum, Optional.of(startDateTime), Optional.of(endDateTime),
466 Optional.of(captureAgentId), Optional.of(userIds), Optional.of(mediaPackage), Optional.of(wfProperties),
467 Optional.of(finalCaProperties), schedulingSource);
468
469
470 updateLiveEvent(mediaPackageId, Optional.of(acl), dublinCore, Optional.of(startDateTime),
471 Optional.of(endDateTime), Optional.of(captureAgentId), Optional.of(finalCaProperties));
472
473
474 updateEventInIndex(mediaPackageId, Optional.of(acl), dublinCore, Optional.of(startDateTime),
475 Optional.of(endDateTime), Optional.of(userIds), Optional.of(captureAgentId), Optional.of(finalCaProperties),
476 Optional.empty());
477
478
479 touchLastEntry(captureAgentId);
480 } catch (SchedulerException e) {
481 throw e;
482 } catch (Exception e) {
483 logger.error("Failed to create event with id '{}':", mediaPackageId, e);
484 throw new SchedulerException(e);
485 }
486 }
487
488 @Override
489 public Map<String, Period> addMultipleEvents(RRule rRule, Date start, Date end, Long duration, TimeZone tz,
490 String captureAgentId, Set<String> userIds, MediaPackage templateMp, Map<String, String> wfProperties,
491 Map<String, String> caMetadata, Optional<String> schedulingSource)
492 throws UnauthorizedException, SchedulerConflictException, SchedulerException {
493
494 Util.adjustRrule(rRule, start, tz);
495 List<Period> periods = Util.calculatePeriods(start, end, duration, rRule, tz);
496 if (periods.isEmpty()) {
497 return Collections.emptyMap();
498 }
499 return addMultipleEventInternal(periods, captureAgentId, userIds, templateMp, wfProperties, caMetadata,
500 schedulingSource);
501 }
502
503 private Map<String, Period> addMultipleEventInternal(List<Period> periods, String captureAgentId,
504 Set<String> userIds, MediaPackage templateMp, Map<String, String> wfProperties,
505 Map<String, String> caMetadata, Optional<String> schedulingSource) throws SchedulerException {
506 notNull(periods, "periods");
507 requireTrue(periods.size() > 0, "periods");
508 notEmpty(captureAgentId, "captureAgentId");
509 notNull(userIds, "userIds");
510 notNull(templateMp, "mediaPackages");
511 notNull(wfProperties, "wfProperties");
512 notNull(caMetadata, "caMetadata");
513 notNull(schedulingSource, "schedulingSource");
514
515 Map<String, Period> scheduledEvents = new ConcurrentHashMap<>();
516
517 try {
518 LinkedList<Id> ids = new LinkedList<>();
519 AQueryBuilder qb = assetManager.createQuery();
520 Predicate p = null;
521
522 while (ids.size() <= periods.size()) {
523
524 while (ids.size() <= periods.size()) {
525 Id id = new IdImpl(UUID.randomUUID().toString());
526 ids.add(id);
527 Predicate np = qb.mediaPackageId(id.toString());
528
529 if (null == p) {
530 p = np;
531 } else {
532 p = p.or(np);
533 }
534 }
535
536 AResult result = qb.select(qb.nothing()).where(withOrganization(qb).and(p).and(qb.version().isLatest())).run();
537
538 if (result.getTotalSize() > 0) {
539 ids.clear();
540 }
541 }
542
543 Optional<String> seriesId = Optional.ofNullable(StringUtils.trimToNull(templateMp.getSeries()));
544
545 List<MediaPackage> conflictingEvents = findConflictingEvents(periods, captureAgentId, TimeZone.getDefault());
546 if (conflictingEvents.size() > 0) {
547 logger.info("Unable to add events, conflicting events found: {}", conflictingEvents);
548 throw new SchedulerConflictException("Unable to add event, conflicting events found");
549 }
550
551 final Organization org = securityService.getOrganization();
552 final User user = securityService.getUser();
553 periods.parallelStream().forEach(event -> SecurityUtil.runAs(securityService, org, user, () -> {
554 final int currentCounter = periods.indexOf(event);
555 MediaPackage mediaPackage = (MediaPackage) templateMp.clone();
556 Date startDate = new Date(event.getStart().getTime());
557 Date endDate = new Date(event.getEnd().getTime());
558 Id id = ids.get(currentCounter);
559
560
561 DublinCoreCatalog dc;
562 Optional<DublinCoreCatalog> dcOpt = DublinCoreUtil.loadEpisodeDublinCore(workspace, templateMp);
563 if (dcOpt.isPresent()) {
564 dc = dcOpt.get();
565 dc = (DublinCoreCatalog) dc.clone();
566
567 dc.addBindings(XmlNamespaceContext
568 .mk(XmlNamespaceBinding.mk(DublinCores.OC_PROPERTY_NS_PREFIX, DublinCores.OC_PROPERTY_NS_URI)));
569 } else {
570 dc = DublinCores.mkOpencastEpisode().getCatalog();
571 }
572
573
574 mediaPackage.setIdentifier(id);
575
576
577 String newTitle = dc.getFirst(DublinCore.PROPERTY_TITLE)
578 + String.format(" %0" + Integer.toString(periods.size()).length() + "d", currentCounter + 1);
579 dc.set(DublinCore.PROPERTY_TITLE, newTitle);
580 DublinCoreValue eventTime = EncodingSchemeUtils.encodePeriod(new DCMIPeriod(startDate, endDate),
581 Precision.Second);
582 dc.set(DublinCore.PROPERTY_TEMPORAL, eventTime);
583 dc.set(DublinCore.PROPERTY_CREATED, EncodingSchemeUtils.encodeDate(startDate, Precision.Second));
584 try {
585 mediaPackage = updateDublincCoreCatalog(mediaPackage, dc);
586 } catch (Exception e) {
587 Misc.chuck(e);
588 }
589 mediaPackage.setTitle(newTitle);
590
591 String mediaPackageId = mediaPackage.getIdentifier().toString();
592
593 Calendar cal = Calendar.getInstance(TimeZone.getTimeZone("UTC"));
594 cal.setTime(event.getStart());
595 Date startDateTime = cal.getTime();
596 cal.setTime(event.getEnd());
597 Date endDateTime = cal.getTime();
598
599 Optional<DublinCoreCatalog> dublinCore = DublinCoreUtil.loadEpisodeDublinCore(workspace, mediaPackage);
600 AccessControlList acl = authorizationService.getActiveAcl(mediaPackage).getA();
601
602
603 Map<String, String> finalCaProperties = getFinalAgentProperties(caMetadata, wfProperties, captureAgentId,
604 seriesId, dublinCore);
605
606
607 String checksum = calculateChecksum(workspace, getEventCatalogUIAdapterFlavors(), startDateTime, endDateTime,
608 captureAgentId, userIds, mediaPackage, dublinCore, wfProperties, finalCaProperties, acl);
609 try {
610 persistEvent(mediaPackageId, checksum, Optional.of(startDateTime), Optional.of(endDateTime),
611 Optional.of(captureAgentId), Optional.of(userIds), Optional.of(mediaPackage), Optional.of(wfProperties),
612 Optional.of(finalCaProperties), schedulingSource);
613 } catch (Exception e) {
614 Misc.chuck(e);
615 }
616
617
618 updateLiveEvent(mediaPackageId, Optional.of(acl), dublinCore, Optional.of(startDateTime),
619 Optional.of(endDateTime), Optional.of(captureAgentId), Optional.of(finalCaProperties));
620
621
622 updateEventInIndex(mediaPackageId, Optional.of(acl), dublinCore, Optional.of(startDateTime),
623 Optional.of(endDateTime), Optional.of(userIds), Optional.of(captureAgentId),
624 Optional.of(finalCaProperties), Optional.empty());
625
626 scheduledEvents.put(mediaPackageId, event);
627 for (MediaPackageElement mediaPackageElement : mediaPackage.getElements()) {
628 try {
629 workspace.delete(mediaPackage.getIdentifier().toString(), mediaPackageElement.getIdentifier());
630 } catch (NotFoundException | IOException e) {
631 logger.warn("Failed to delete media package element", e);
632 }
633 }
634 }));
635 return scheduledEvents;
636 } catch (SchedulerException e) {
637 throw e;
638 } catch (Exception e) {
639 throw new SchedulerException(e);
640 } finally {
641
642 if (!scheduledEvents.isEmpty()) {
643 touchLastEntry(captureAgentId);
644 }
645 }
646 }
647
648 @Override
649 public void updateEvent(final String mpId, Optional<Date> startDateTime, Optional<Date> endDateTime,
650 Optional<String> captureAgentId, Optional<Set<String>> userIds, Optional<MediaPackage> mediaPackage,
651 Optional<Map<String, String>> wfProperties, Optional<Map<String, String>> caMetadata)
652 throws NotFoundException, UnauthorizedException, SchedulerException {
653 updateEventInternal(mpId, startDateTime, endDateTime, captureAgentId, userIds, mediaPackage,
654 wfProperties, caMetadata, false);
655 }
656
657 @Override
658 public void updateEvent(final String mpId, Optional<Date> startDateTime, Optional<Date> endDateTime,
659 Optional<String> captureAgentId, Optional<Set<String>> userIds, Optional<MediaPackage> mediaPackage,
660 Optional<Map<String, String>> wfProperties, Optional<Map<String, String>> caMetadata, boolean allowConflict)
661 throws NotFoundException, UnauthorizedException, SchedulerException {
662 updateEventInternal(mpId, startDateTime, endDateTime, captureAgentId, userIds, mediaPackage,
663 wfProperties, caMetadata, allowConflict);
664 }
665
666 private void updateEventInternal(final String mpId, Optional<Date> startDateTime,
667 Optional<Date> endDateTime, Optional<String> captureAgentId, Optional<Set<String>> userIds,
668 Optional<MediaPackage> mediaPackageOpt, Optional<Map<String, String>> wfProperties,
669 Optional<Map<String, String>> caMetadata, boolean allowConflict)
670 throws NotFoundException, SchedulerException {
671 notEmpty(mpId, "mpId");
672 notNull(startDateTime, "startDateTime");
673 notNull(endDateTime, "endDateTime");
674 notNull(captureAgentId, "captureAgentId");
675 notNull(userIds, "userIds");
676 notNull(mediaPackageOpt, "mediaPackageOpt");
677 notNull(wfProperties, "wfProperties");
678 notNull(caMetadata, "caMetadata");
679
680 try {
681 AQueryBuilder query = assetManager.createQuery();
682
683 ASelectQuery select = query
684 .select(query.snapshot())
685 .where(withOrganization(query).and(query.mediaPackageId(mpId).and(query.version().isLatest())
686 .and(withOwner(query))));
687 Optional<ARecord> optEvent = select.run().getRecords().stream().findFirst();
688 Optional<ExtendedEventDto> optExtEvent = persistence.getEvent(mpId);
689 if (optEvent.isEmpty() || optExtEvent.isEmpty()) {
690 throw new NotFoundException("No event found while updating event " + mpId);
691 }
692
693 ARecord record = optEvent.get();
694 if (record.getSnapshot().isEmpty()) {
695 throw new NotFoundException("No mediapackage found while updating event " + mpId);
696 }
697 Snapshot snapshot = record.getSnapshot().get();
698 MediaPackage archivedMediaPackage = snapshot.getMediaPackage();
699
700 Optional<DublinCoreCatalog> archivedDublinCoreOpt = loadEpisodeDublinCoreFromAsset(snapshot);
701 if (archivedDublinCoreOpt.isEmpty()) {
702 throw new NotFoundException("No dublincore found while updating event " + mpId);
703 }
704 DublinCoreCatalog archivedDublinCore = archivedDublinCoreOpt.get();
705 AccessControlList archivedAcl = authorizationService.getActiveAcl(archivedMediaPackage).getA();
706
707 final ExtendedEventDto extendedEventDto = optExtEvent.get();
708 Date start = extendedEventDto.getStartDate();
709 Date end = extendedEventDto.getEndDate();
710
711 if ((startDateTime.isPresent() || endDateTime.isPresent())
712 && endDateTime.orElse(end).before(startDateTime.orElse(start))) {
713 throw new SchedulerException("The end date is before the start date");
714 }
715
716 String agentId = extendedEventDto.getCaptureAgentId();
717 Optional<String> seriesId = Optional.ofNullable(archivedMediaPackage.getSeries());
718
719
720
721 if ((captureAgentId.isPresent() || startDateTime.isPresent() || endDateTime.isPresent())
722 && (!allowConflict || !isAdmin())) {
723 List<MediaPackage> conflictingEvents = findConflictingEvents(
724 captureAgentId.orElse(agentId),
725 startDateTime.orElse(start),
726 endDateTime.orElse(end)
727 ).stream()
728 .filter(mp -> !mpId.equals(mp.getIdentifier().toString()))
729 .collect(Collectors.toList());
730 if (conflictingEvents.size() > 0) {
731 logger.info("Unable to update event {}, conflicting events found: {}", mpId, conflictingEvents);
732 throw new SchedulerConflictException("Unable to update event, conflicting events found for event " + mpId);
733 }
734 }
735
736 Set<String> presenters = getPresenters(Optional.ofNullable(extendedEventDto.getPresenters()).orElse(""));
737 Map<String, String> wfProps = deserializeExtendedEventProperties(extendedEventDto.getWorkflowProperties());
738 Map<String, String> caProperties = deserializeExtendedEventProperties(
739 extendedEventDto.getCaptureAgentProperties());
740
741 boolean propertiesChanged = false;
742 boolean dublinCoreChanged = false;
743
744
745 if (wfProperties.isPresent()) {
746 propertiesChanged = true;
747 wfProps = wfProperties.get();
748 }
749
750
751 if (caMetadata.isPresent()) {
752 propertiesChanged = true;
753 caProperties = caMetadata.get();
754 }
755
756 if (captureAgentId.isPresent()) {
757 propertiesChanged = true;
758 }
759
760 Optional<AccessControlList> changedAclOpt = Optional.empty();
761 Optional<DublinCoreCatalog> changedDublinCoreOpt = Optional.empty();
762 if (mediaPackageOpt.isPresent()) {
763 MediaPackage mediaPackage = mediaPackageOpt.get();
764
765 if (ne(archivedMediaPackage.getSeries(), mediaPackage.getSeries())) {
766 propertiesChanged = true;
767 seriesId = Optional.ofNullable(mediaPackage.getSeries());
768 }
769
770
771 AccessControlList acl = authorizationService.getActiveAcl(mediaPackage).getA();
772 if (!AccessControlUtil.equals(acl, archivedAcl)) {
773 changedAclOpt = Optional.of(acl);
774 }
775
776
777 Optional<DublinCoreCatalog> dublinCoreOpt = DublinCoreUtil.loadEpisodeDublinCore(workspace,
778 mediaPackage);
779 if (dublinCoreOpt.isPresent() && !DublinCoreUtil.equals(archivedDublinCore, dublinCoreOpt.get())) {
780 dublinCoreChanged = true;
781 propertiesChanged = true;
782 changedDublinCoreOpt = dublinCoreOpt;
783 }
784 }
785
786
787 DublinCoreCatalog dublinCore = changedDublinCoreOpt.orElse(archivedDublinCore);
788 DublinCoreCatalog dublinCoreCopy = (DublinCoreCatalog) dublinCore.clone();
789 if (startDateTime.isPresent() && endDateTime.isPresent()) {
790 DublinCoreValue eventTime = EncodingSchemeUtils.encodePeriod(
791 new DCMIPeriod(startDateTime.get(), endDateTime.get()), Precision.Second);
792 dublinCore.set(DublinCore.PROPERTY_TEMPORAL, eventTime);
793 }
794 if (captureAgentId.isPresent()) {
795 dublinCore.set(DublinCore.PROPERTY_SPATIAL, captureAgentId.get());
796 }
797 if (!DublinCoreUtil.equals(dublinCore, dublinCoreCopy)) {
798 dublinCoreChanged = true;
799 changedDublinCoreOpt = Optional.of(dublinCore);
800 mediaPackageOpt = Optional.of(updateDublincCoreCatalog(mediaPackageOpt.orElse(archivedMediaPackage),
801 changedDublinCoreOpt.get()));
802 }
803
804 Optional<Map<String, String>> finalCaProperties = Optional.empty();
805 if (propertiesChanged) {
806 finalCaProperties = Optional.of(getFinalAgentProperties(caProperties, wfProps, captureAgentId.orElse(agentId),
807 seriesId, Optional.of(changedDublinCoreOpt.orElse(
808 archivedDublinCore))));
809 }
810
811 String checksum = calculateChecksum(workspace, getEventCatalogUIAdapterFlavors(), startDateTime.orElse(start),
812 endDateTime.orElse(end), captureAgentId.orElse(agentId), userIds.orElse(presenters),
813 mediaPackageOpt.orElse(archivedMediaPackage),
814 Optional.of(changedDublinCoreOpt.orElse(archivedDublinCore)), wfProperties.orElse(wfProps),
815 finalCaProperties.orElse(caProperties), changedAclOpt.orElse(new AccessControlList()));
816
817 String oldChecksum = extendedEventDto.getChecksum();
818 if (checksum.equals(oldChecksum)) {
819 logger.debug("Updated event {} has same checksum, ignore update", mpId);
820 return;
821 }
822
823
824 persistEvent(mpId, checksum, startDateTime, endDateTime, captureAgentId, userIds,
825 mediaPackageOpt, wfProperties, finalCaProperties, Optional.empty());
826
827
828 updateLiveEvent(mpId, changedAclOpt, changedDublinCoreOpt, startDateTime, endDateTime, Optional.of(agentId),
829 finalCaProperties);
830
831
832 updateEventInIndex(mpId, changedAclOpt, changedDublinCoreOpt, startDateTime, endDateTime, userIds,
833 Optional.of(agentId), finalCaProperties, Optional.empty());
834
835
836 if (propertiesChanged || dublinCoreChanged || startDateTime.isPresent() || endDateTime.isPresent()) {
837 touchLastEntry(agentId);
838 if (captureAgentId.isPresent()) {
839 touchLastEntry(captureAgentId.get());
840 }
841 }
842 } catch (NotFoundException | SchedulerException e) {
843 throw e;
844 } catch (Exception e) {
845 throw new SchedulerException(e);
846 }
847 }
848
849 private boolean isAdmin() {
850 return (securityService.getUser().hasRole(GLOBAL_ADMIN_ROLE)
851 || securityService.getUser().hasRole(securityService.getOrganization().getAdminRole()));
852 }
853
854 private Optional<DublinCoreCatalog> loadEpisodeDublinCoreFromAsset(Snapshot snapshot) {
855 Option<MediaPackageElement> dcCatalog = mlist(snapshot.getMediaPackage().getElements())
856 .filter(MediaPackageSupport.Filters.isEpisodeDublinCore).headOpt();
857 if (dcCatalog.isNone()) {
858 return Optional.empty();
859 }
860
861 Optional<Asset> asset = assetManager.getAsset(snapshot.getVersion(),
862 snapshot.getMediaPackage().getIdentifier().toString(), dcCatalog.get().getIdentifier());
863 if (asset.isEmpty()) {
864 return Optional.empty();
865 }
866
867 if (Availability.OFFLINE.equals(asset.get().getAvailability())) {
868 return Optional.empty();
869 }
870
871 InputStream inputStream = null;
872 try {
873 inputStream = asset.get().getInputStream();
874 return Optional.of(DublinCores.read(inputStream));
875 } finally {
876 IOUtils.closeQuietly(inputStream);
877 }
878 }
879
880 @Override
881 public synchronized void removeEvent(String mediaPackageId)
882 throws NotFoundException, SchedulerException {
883 notEmpty(mediaPackageId, "mediaPackageId");
884
885 boolean notFoundInDatabase = false;
886 boolean notFoundInAssetManager;
887 try {
888
889 try {
890 Optional<ExtendedEventDto> extEvtOpt = persistence.getEvent(mediaPackageId);
891 if (extEvtOpt.isPresent()) {
892 String agentId = extEvtOpt.get().getCaptureAgentId();
893 persistence.deleteEvent(mediaPackageId);
894 if (StringUtils.isNotEmpty(agentId)) {
895 touchLastEntry(agentId);
896 }
897 } else {
898 notFoundInDatabase = true;
899 }
900 } catch (NotFoundException e) {
901 notFoundInDatabase = true;
902 }
903
904
905 AQueryBuilder query = assetManager.createQuery();
906 long deletedSnapshots = query.delete(SNAPSHOT_OWNER, query.snapshot())
907 .where(withOrganization(query).and(query.mediaPackageId(mediaPackageId)))
908 .name("delete episode").run();
909 notFoundInAssetManager = deletedSnapshots == 0;
910
911
912 sendSchedulerUpdate(new SchedulerItemList(mediaPackageId, SchedulerItem.delete()));
913
914
915 removeSchedulingInfoFromIndex(mediaPackageId);
916 } catch (Exception e) {
917 logger.error("Could not remove event '{}' from persistent storage", mediaPackageId, e);
918 throw new SchedulerException(e);
919 }
920
921 if (notFoundInDatabase && notFoundInAssetManager) {
922 throw new NotFoundException();
923 }
924 }
925
926 @Override
927 public MediaPackage getMediaPackage(String mediaPackageId) throws NotFoundException, SchedulerException {
928 notEmpty(mediaPackageId, "mediaPackageId");
929
930 try {
931 return getEventMediaPackage(mediaPackageId);
932 } catch (RuntimeNotFoundException e) {
933 throw e.getWrappedException();
934 } catch (Exception e) {
935 logger.error("Failed to get mediapackage of event '{}':", mediaPackageId, e);
936 throw new SchedulerException(e);
937 }
938 }
939
940 @Override
941 public DublinCoreCatalog getDublinCore(String mediaPackageId) throws NotFoundException, SchedulerException {
942 notEmpty(mediaPackageId, "mediaPackageId");
943
944 try {
945 AQueryBuilder query = assetManager.createQuery();
946 AResult result = query.select(query.snapshot())
947 .where(withOrganization(query).and(query.mediaPackageId(mediaPackageId)).and(withOwner(query))
948 .and(query.version().isLatest()))
949 .run();
950 Optional<ARecord> record = result.getRecords().stream().findFirst();
951 if (record.isEmpty()) {
952 throw new NotFoundException();
953 }
954
955 Optional<DublinCoreCatalog> dublinCore = loadEpisodeDublinCoreFromAsset(record.get().getSnapshot().get());
956 if (dublinCore.isEmpty()) {
957 throw new NotFoundException("No dublincore catalog found " + mediaPackageId);
958 }
959
960 return dublinCore.get();
961 } catch (NotFoundException e) {
962 throw e;
963 } catch (Exception e) {
964 logger.error("Failed to get dublin core catalog of event '{}':", mediaPackageId, e);
965 throw new SchedulerException(e);
966 }
967 }
968
969 @Override
970 public TechnicalMetadata getTechnicalMetadata(String mediaPackageId)
971 throws NotFoundException, UnauthorizedException, SchedulerException {
972 notEmpty(mediaPackageId, "mediaPackageId");
973
974 try {
975 final Optional<ExtendedEventDto> extEvt = persistence.getEvent(mediaPackageId);
976 if (extEvt.isEmpty()) {
977 throw new NotFoundException();
978 }
979
980 return getTechnicalMetadata(extEvt.get());
981 } catch (NotFoundException e) {
982 throw e;
983 } catch (Exception e) {
984 logger.error("Failed to get technical metadata of event '{}':", mediaPackageId, e);
985 throw new SchedulerException(e);
986 }
987 }
988
989 @Override
990 public Map<String, String> getWorkflowConfig(String mediaPackageId) throws NotFoundException, SchedulerException {
991 notEmpty(mediaPackageId, "mediaPackageId");
992
993 try {
994 Optional<ExtendedEventDto> record = persistence.getEvent(mediaPackageId);
995 if (record.isEmpty()) {
996 throw new NotFoundException();
997 }
998 return deserializeExtendedEventProperties(record.get().getWorkflowProperties());
999 } catch (NotFoundException e) {
1000 throw e;
1001 } catch (Exception e) {
1002 logger.error("Failed to get workflow configuration of event '{}':", mediaPackageId, e);
1003 throw new SchedulerException(e);
1004 }
1005 }
1006
1007 @Override
1008 public Map<String, String> getCaptureAgentConfiguration(String mediaPackageId)
1009 throws NotFoundException, SchedulerException {
1010 notEmpty(mediaPackageId, "mediaPackageId");
1011
1012 try {
1013 Optional<ExtendedEventDto> record = persistence.getEvent(mediaPackageId);
1014 if (record.isEmpty()) {
1015 throw new NotFoundException();
1016 }
1017 return deserializeExtendedEventProperties(record.get().getCaptureAgentProperties());
1018 } catch (NotFoundException e) {
1019 throw e;
1020 } catch (Exception e) {
1021 logger.error("Failed to get capture agent contiguration of event '{}':", mediaPackageId, e);
1022 throw new SchedulerException(e);
1023 }
1024 }
1025
1026 @Override
1027 public int getEventCount() throws SchedulerException {
1028 try {
1029 return persistence.countEvents();
1030 } catch (Exception e) {
1031 throw new SchedulerException(e);
1032 }
1033 }
1034
1035 @Override
1036 public List<MediaPackage> search(Optional<String> captureAgentId, Optional<Date> startsFrom, Optional<Date> startsTo,
1037 Optional<Date> endFrom, Optional<Date> endTo) throws SchedulerException {
1038 try {
1039 return persistence.search(captureAgentId, startsFrom, startsTo, endFrom, endTo, Optional.empty()).stream()
1040 .map(ExtendedEventDto::getMediaPackageId)
1041 .map(this::getEventMediaPackage).collect(Collectors.toList());
1042 } catch (Exception e) {
1043 throw new SchedulerException(e);
1044 }
1045 }
1046
1047 @Override
1048 public Optional<MediaPackage> getCurrentRecording(String captureAgentId) throws SchedulerException {
1049 try {
1050 final Date now = new Date();
1051 List<ExtendedEventDto> result = persistence.search(Optional.of(captureAgentId), Optional.empty(),
1052 Optional.of(now), Optional.of(now), Optional.empty(), Optional.of(1));
1053 if (result.isEmpty()) {
1054 return Optional.empty();
1055 }
1056 return Optional.of(getEventMediaPackage(result.get(0).getMediaPackageId()));
1057 } catch (Exception e) {
1058 throw new SchedulerException(e);
1059 }
1060 }
1061
1062 @Override
1063 public Optional<MediaPackage> getUpcomingRecording(String captureAgentId) throws SchedulerException {
1064 try {
1065 final Date now = new Date();
1066 List<ExtendedEventDto> result = persistence.search(Optional.of(captureAgentId), Optional.of(now),
1067 Optional.empty(), Optional.empty(), Optional.empty(), Optional.of(1));
1068 if (result.isEmpty()) {
1069 return Optional.empty();
1070 }
1071 return Optional.of(getEventMediaPackage(result.get(0).getMediaPackageId()));
1072 } catch (Exception e) {
1073 throw new SchedulerException(e);
1074 }
1075 }
1076
1077 @Override
1078 public List<MediaPackage> findConflictingEvents(String captureDeviceID, Date startDate, Date endDate)
1079 throws SchedulerException {
1080 try {
1081 final Organization organization = securityService.getOrganization();
1082 final User user = SecurityUtil.createSystemUser(systemUserName, organization);
1083 List<MediaPackage> conflictingEvents = new ArrayList<>();
1084
1085 SecurityUtil.runAs(securityService, organization, user, () -> {
1086 try {
1087 persistence.getEvents(captureDeviceID, startDate, endDate, Util.EVENT_MINIMUM_SEPARATION_MILLISECONDS)
1088 .stream()
1089 .map(id -> getEventMediaPackage(id, false))
1090 .forEach(conflictingEvents::add);
1091 } catch (SchedulerServiceDatabaseException e) {
1092 logger.error("Failed to get conflicting events", e);
1093 }
1094 });
1095
1096 return conflictingEvents;
1097
1098 } catch (Exception e) {
1099 throw new SchedulerException(e);
1100 }
1101 }
1102
1103 @Override
1104 public List<MediaPackage> findConflictingEvents(String captureAgentId, RRule rrule, Date start, Date end,
1105 long duration, TimeZone tz) throws SchedulerException {
1106 notEmpty(captureAgentId, "captureAgentId");
1107 notNull(rrule, "rrule");
1108 notNull(start, "start");
1109 notNull(end, "end");
1110 notNull(tz, "timeZone");
1111
1112 Util.adjustRrule(rrule, start, tz);
1113 final List<Period> periods = Util.calculatePeriods(start, end, duration, rrule, tz);
1114
1115 if (periods.isEmpty()) {
1116 return Collections.emptyList();
1117 }
1118
1119 return findConflictingEvents(periods, captureAgentId, tz);
1120 }
1121
1122 private boolean checkPeriodOverlap(final List<Period> periods) {
1123 final List<Period> sortedPeriods = new ArrayList<>(periods);
1124 sortedPeriods.sort(Comparator.comparing(Period::getStart));
1125 Period prior = periods.get(0);
1126 for (Period current : periods.subList(1, periods.size())) {
1127 if (current.getStart().compareTo(prior.getEnd()) < 0) {
1128 return true;
1129 }
1130 prior = current;
1131 }
1132 return false;
1133 }
1134
1135 private List<MediaPackage> findConflictingEvents(List<Period> periods, String captureAgentId, TimeZone tz)
1136 throws SchedulerException {
1137 notEmpty(captureAgentId, "captureAgentId");
1138 notNull(periods, "periods");
1139 requireTrue(periods.size() > 0, "periods");
1140
1141
1142
1143
1144 if (checkPeriodOverlap(periods)) {
1145 throw new IllegalArgumentException("RRULE periods overlap");
1146 }
1147
1148 try {
1149 TimeZoneRegistry registry = TimeZoneRegistryFactory.getInstance().createRegistry();
1150
1151 Set<MediaPackage> events = new HashSet<>();
1152
1153 for (Period event : periods) {
1154 event.setTimeZone(registry.getTimeZone(tz.getID()));
1155 final Date startDate = event.getStart();
1156 final Date endDate = event.getEnd();
1157
1158 events.addAll(findConflictingEvents(captureAgentId, startDate, endDate));
1159 }
1160
1161 return new ArrayList<>(events);
1162 } catch (Exception e) {
1163 throw new SchedulerException(e);
1164 }
1165 }
1166
1167 @Override
1168 public String getCalendar(Optional<String> captureAgentId, Optional<String> seriesId, Optional<Date> cutoff)
1169 throws SchedulerException {
1170
1171 try {
1172 final Map<String, ExtendedEventDto> searchResult = persistence.search(captureAgentId, Optional.empty(), cutoff,
1173 Optional.of(DateTime.now().minusHours(1).toDate()), Optional.empty(), Optional.empty()).stream()
1174 .collect(Collectors.toMap(ExtendedEventDto::getMediaPackageId, Function.identity()));
1175 final AQueryBuilder query = assetManager.createQuery();
1176 final AResult result = query.select(query.snapshot())
1177 .where(withOrganization(query).and(query.mediaPackageIds(searchResult.keySet().toArray(new String[0])))
1178 .and(withOwner(query)).and(query.version().isLatest()))
1179 .run();
1180
1181 final CalendarGenerator cal = new CalendarGenerator(seriesService);
1182 for (final ARecord record : result.getRecords()) {
1183 final Optional<MediaPackage> optMp = record.getSnapshot().isPresent()
1184 ? Optional.of(record.getSnapshot().get().getMediaPackage())
1185 : Optional.empty();
1186
1187
1188 if (optMp.isEmpty()) {
1189 logger.warn("Mediapackage for event '{}' can't be found, event is not recorded", record.getMediaPackageId());
1190 continue;
1191 }
1192
1193 if (seriesId.isPresent() && !seriesId.get().equals(optMp.get().getSeries())) {
1194 continue;
1195 }
1196
1197 Optional<DublinCoreCatalog> catalogOpt = loadEpisodeDublinCoreFromAsset(record.getSnapshot().get());
1198 if (catalogOpt.isEmpty()) {
1199 logger.warn("No episode catalog available, skipping!");
1200 continue;
1201 }
1202
1203 final Map<String, String> caMetadata = deserializeExtendedEventProperties(
1204 searchResult.get(record.getMediaPackageId()).getCaptureAgentProperties());
1205
1206
1207 if (caMetadata.isEmpty()) {
1208 logger.warn("Properties for event '{}' can't be found, event is not recorded", record.getMediaPackageId());
1209 continue;
1210 }
1211
1212 final String agentId = searchResult.get(record.getMediaPackageId()).getCaptureAgentId();
1213 final Date start = searchResult.get(record.getMediaPackageId()).getStartDate();
1214 final Date end = searchResult.get(record.getMediaPackageId()).getEndDate();
1215 final Date lastModified = record.getSnapshot().get().getArchivalDate();
1216
1217
1218 try {
1219 cal.addEvent(optMp.get(), catalogOpt.get(), agentId, start, end, lastModified, toPropertyString(caMetadata));
1220 } catch (Exception e) {
1221 logger.warn("Error adding event '{}' to calendar, event is not recorded", record.getMediaPackageId(), e);
1222 }
1223 }
1224
1225
1226 if (cal.getCalendar().getComponents().size() > 0) {
1227 cal.getCalendar().validate();
1228 }
1229
1230 return cal.getCalendar().toString();
1231
1232 } catch (Exception e) {
1233 throw new SchedulerException(e);
1234 }
1235 }
1236
1237 @Override
1238 public String getScheduleLastModified(String captureAgentId) throws SchedulerException {
1239 notEmpty(captureAgentId, "captureAgentId");
1240
1241 try {
1242 String lastModified = lastModifiedCache.getIfPresent(captureAgentId);
1243 if (lastModified != null) {
1244 return lastModified;
1245 }
1246
1247 populateLastModifiedCache();
1248
1249 lastModified = lastModifiedCache.getIfPresent(captureAgentId);
1250
1251
1252 if (lastModified == null) {
1253 lastModified = EMPTY_CALENDAR_ETAG;
1254 lastModifiedCache.put(captureAgentId, lastModified);
1255 }
1256 return lastModified;
1257 } catch (Exception e) {
1258 throw new SchedulerException(e);
1259 }
1260 }
1261
1262 @Override
1263 public void removeScheduledRecordingsBeforeBuffer(long buffer) throws SchedulerException {
1264 DateTime end = new DateTime(DateTimeZone.UTC).minus(buffer * 1000);
1265
1266 logger.info("Starting to look for scheduled recordings that have finished before {}.",
1267 DateTimeSupport.toUTC(end.getMillis()));
1268
1269 List<ExtendedEventDto> finishedEvents;
1270 try {
1271 finishedEvents = persistence.search(Optional.empty(), Optional.empty(), Optional.empty(), Optional.empty(),
1272 Optional.of(end.toDate()), Optional.empty());
1273 logger.debug("Found {} events from search.", finishedEvents.size());
1274 } catch (Exception e) {
1275 throw new SchedulerException(e);
1276 }
1277
1278 int recordingsRemoved = 0;
1279 for (ExtendedEventDto extEvt : finishedEvents) {
1280 final String eventId = extEvt.getMediaPackageId();
1281 try {
1282 removeEvent(eventId);
1283 logger.debug("Sucessfully removed scheduled event with id " + eventId);
1284 recordingsRemoved++;
1285 } catch (NotFoundException e) {
1286 logger.debug("Skipping event with id {} because it is not found", eventId);
1287 } catch (Exception e) {
1288 logger.warn("Unable to delete event with id '{}':", eventId, e);
1289 }
1290 }
1291
1292 logger.info("Found {} to remove that ended before {}.", recordingsRemoved, DateTimeSupport.toUTC(end.getMillis()));
1293 }
1294
1295 @Override
1296 public boolean updateRecordingState(String id, String state) throws NotFoundException, SchedulerException {
1297 notEmpty(id, "id");
1298 notEmpty(state, "state");
1299
1300 if (!RecordingState.KNOWN_STATES.contains(state)) {
1301 logger.warn("Invalid recording state: {}.", state);
1302 return false;
1303 }
1304
1305 try {
1306 final Optional<ExtendedEventDto> optExtEvt = persistence.getEvent(id);
1307
1308 if (optExtEvt.isEmpty()) {
1309 throw new NotFoundException();
1310 }
1311
1312 final String prevRecordingState = optExtEvt.get().getRecordingState();
1313 final Recording r = new RecordingImpl(id, state);
1314 if (!state.equals(prevRecordingState)) {
1315 logger.debug("Setting Recording {} to state {}.", id, state);
1316
1317
1318 sendSchedulerUpdate(new SchedulerItemList(r.getID(), Collections.singletonList(SchedulerItem
1319 .updateRecordingStatus(r.getState(), r.getLastCheckinTime()))));
1320
1321
1322 updateEventInIndex(r.getID(), Optional.empty(), Optional.empty(), Optional.empty(), Optional.empty(),
1323 Optional.empty(), Optional.empty(), Optional.empty(), Optional.of(r.getState()));
1324 } else {
1325 logger.debug("Recording state not changed");
1326 }
1327
1328 persistence.storeEvent(
1329 id,
1330 securityService.getOrganization().getId(),
1331 Optional.empty(),
1332 Optional.empty(),
1333 Optional.empty(),
1334 Optional.empty(),
1335 Optional.of(r.getState()),
1336 Optional.of(r.getLastCheckinTime()),
1337 Optional.empty(),
1338 Optional.empty(),
1339 Optional.empty(),
1340 Optional.empty(),
1341 Optional.empty()
1342 );
1343 return true;
1344 } catch (NotFoundException e) {
1345 throw e;
1346 } catch (Exception e) {
1347 throw new SchedulerException(e);
1348 }
1349 }
1350
1351 @Override
1352 public Recording getRecordingState(String id) throws NotFoundException, SchedulerException {
1353
1354 notEmpty(id, "id");
1355
1356 try {
1357 Optional<ExtendedEventDto> extEvt = persistence.getEvent(id);
1358
1359 if (extEvt.isEmpty() || extEvt.get().getRecordingState() == null) {
1360 throw new NotFoundException();
1361 }
1362
1363 return new RecordingImpl(id, extEvt.get().getRecordingState(), extEvt.get().getRecordingLastHeard());
1364 } catch (NotFoundException e) {
1365 throw e;
1366 } catch (Exception e) {
1367 throw new SchedulerException(e);
1368 }
1369 }
1370
1371 @Override
1372 public void removeRecording(String id) throws NotFoundException, SchedulerException {
1373 notEmpty(id, "id");
1374
1375 try {
1376 persistence.resetRecordingState(id);
1377
1378
1379 sendSchedulerUpdate(new SchedulerItemList(id, SchedulerItem.deleteRecordingState()));
1380
1381
1382 removeRecordingStatusFromIndex(id);
1383 } catch (NotFoundException e) {
1384 throw e;
1385 } catch (Exception e) {
1386 throw new SchedulerException(e);
1387 }
1388 }
1389
1390 @Override
1391 public Map<String, Recording> getKnownRecordings() throws SchedulerException {
1392 try {
1393 return persistence.getKnownRecordings().parallelStream()
1394 .collect(
1395 Collectors.toMap(ExtendedEventDto::getMediaPackageId,
1396 dto -> new RecordingImpl(dto.getMediaPackageId(), dto.getRecordingState(), dto.getRecordingLastHeard()))
1397 );
1398 } catch (Exception e) {
1399 throw new SchedulerException(e);
1400 }
1401 }
1402
1403 private synchronized void persistEvent(final String mpId, final String checksum,
1404 final Optional<Date> startDateTime, final Optional<Date> endDateTime, final Optional<String> captureAgentId,
1405 final Optional<Set<String>> userIds, final Optional<MediaPackage> mediaPackage,
1406 final Optional<Map<String, String>> wfProperties, final Optional<Map<String, String>> caProperties,
1407 final Optional<String> schedulingSource) throws SchedulerServiceDatabaseException {
1408
1409 if (mediaPackage.isPresent()) {
1410 assetManager.takeSnapshot(SNAPSHOT_OWNER, mediaPackage.get());
1411 }
1412
1413
1414 persistence.storeEvent(
1415 mpId,
1416 securityService.getOrganization().getId(),
1417 captureAgentId,
1418 startDateTime,
1419 endDateTime,
1420 schedulingSource,
1421 Optional.empty(),
1422 Optional.empty(),
1423 userIds.isPresent() ? Optional.of(String.join(",", userIds.get())) : Optional.empty(),
1424 Optional.of(new Date()),
1425 Optional.of(checksum),
1426 wfProperties,
1427 caProperties
1428 );
1429 }
1430
1431
1432
1433
1434
1435
1436
1437
1438
1439
1440
1441
1442
1443
1444 private void updateEventInIndex(String mediaPackageId, Optional<AccessControlList> acl,
1445 Optional<DublinCoreCatalog> dublinCore, Optional<Date> startTime, Optional<Date> endTime,
1446 Optional<Set<String>> presenters, Optional<String> agentId, Optional<Map<String, String>> properties,
1447 Optional<String> recordingStatus) {
1448
1449 String organization = getSecurityService().getOrganization().getId();
1450 User user = getSecurityService().getUser();
1451
1452 Function<Optional<Event>, Optional<Event>> updateFunction = getEventUpdateFunction(mediaPackageId, acl, dublinCore,
1453 startTime, endTime, presenters, agentId, properties, recordingStatus, organization, user);
1454
1455 try {
1456 index.addOrUpdateEvent(mediaPackageId, updateFunction, organization, user);
1457 logger.debug("Scheduled event {} updated in the {} index.", mediaPackageId, index.getIndexName());
1458 } catch (SearchIndexException e) {
1459 logger.error("Error updating the scheduled event {} in the {} index.", mediaPackageId, index.getIndexName(), e);
1460 }
1461 }
1462
1463
1464
1465
1466
1467
1468 private void removeRecordingStatusFromIndex(String mediaPackageId) {
1469 String organization = getSecurityService().getOrganization().getId();
1470 User user = getSecurityService().getUser();
1471
1472 Function<Optional<Event>, Optional<Event>> updateFunction = (Optional<Event> eventOpt) -> {
1473 Event event = eventOpt.orElse(new Event(mediaPackageId, organization));
1474 event.setRecordingStatus(null);
1475 return Optional.of(event);
1476 };
1477
1478 try {
1479 index.addOrUpdateEvent(mediaPackageId, updateFunction, organization, user);
1480 logger.debug("Recording state of event {} removed from the {} index.", mediaPackageId, index.getIndexName());
1481 } catch (SearchIndexException e) {
1482 logger.error("Failed to remove the recording state of event {} from the {} index.", mediaPackageId,
1483 index.getIndexName(), e);
1484 }
1485 }
1486
1487
1488
1489
1490
1491
1492 private void removeSchedulingInfoFromIndex(String mediaPackageId) {
1493 String orgId = getSecurityService().getOrganization().getId();
1494
1495 try {
1496 index.deleteEvent(mediaPackageId, orgId);
1497 logger.debug("Scheduling information of event {} removed from the {} index.", mediaPackageId,
1498 index.getIndexName());
1499 } catch (SearchIndexException e) {
1500 logger.error("Failed to delete the scheduling information of event {} from the {} index.", mediaPackageId,
1501 index.getIndexName(), e);
1502 }
1503 }
1504
1505
1506
1507
1508
1509
1510
1511
1512
1513
1514
1515
1516 private void updateLiveEvent(String mpId, Optional<AccessControlList> acl, Optional<DublinCoreCatalog> dublinCore,
1517 Optional<Date> startTime, Optional<Date> endTime, Optional<String> agentId,
1518 Optional<Map<String, String>> properties) {
1519 List<SchedulerItem> items = new ArrayList<>();
1520 if (acl.isPresent()) {
1521 items.add(SchedulerItem.updateAcl(acl.get()));
1522 }
1523 if (dublinCore.isPresent()) {
1524 items.add(SchedulerItem.updateCatalog(dublinCore.get()));
1525 }
1526 if (startTime.isPresent()) {
1527 items.add(SchedulerItem.updateStart(startTime.get()));
1528 }
1529 if (endTime.isPresent()) {
1530 items.add(SchedulerItem.updateEnd(endTime.get()));
1531 }
1532 if (agentId.isPresent()) {
1533 items.add(SchedulerItem.updateAgent(agentId.get()));
1534 }
1535 if (properties.isPresent()) {
1536 items.add(SchedulerItem.updateProperties(properties.get()));
1537 }
1538
1539 if (!items.isEmpty()) {
1540 sendSchedulerUpdate(new SchedulerItemList(mpId, items));
1541 }
1542 }
1543
1544 private Map<String, String> getFinalAgentProperties(Map<String, String> caMetadata, Map<String, String> wfProperties,
1545 String captureAgentId, Optional<String> seriesId, Optional<DublinCoreCatalog> dublinCore) {
1546 Map<String, String> properties = new HashMap<>();
1547 for (Entry<String, String> entry : caMetadata.entrySet()) {
1548 if (entry.getKey().startsWith(WORKFLOW_CONFIG_PREFIX)) {
1549 continue;
1550 }
1551 properties.put(entry.getKey(), entry.getValue());
1552 }
1553 for (Entry<String, String> entry : wfProperties.entrySet()) {
1554 properties.put(WORKFLOW_CONFIG_PREFIX.concat(entry.getKey()), entry.getValue());
1555 }
1556 if (dublinCore.isPresent()) {
1557 properties.put("event.title", dublinCore.get().getFirst(DublinCore.PROPERTY_TITLE));
1558 }
1559 if (seriesId.isPresent()) {
1560 properties.put("event.series", seriesId.get());
1561 }
1562 properties.put("event.location", captureAgentId);
1563 return properties;
1564 }
1565
1566 private void touchLastEntry(String captureAgentId) throws SchedulerException {
1567
1568 try {
1569 logger.debug("Marking calendar feed for {} as modified", captureAgentId);
1570 persistence.touchLastEntry(captureAgentId);
1571 populateLastModifiedCache();
1572 } catch (SchedulerServiceDatabaseException e) {
1573 logger.error("Failed to update last modified entry of agent '{}':", captureAgentId, e);
1574 }
1575 }
1576
1577 private void populateLastModifiedCache() throws SchedulerException {
1578 try {
1579 Map<String, Date> lastModifiedDates = persistence.getLastModifiedDates();
1580 for (Entry<String, Date> entry : lastModifiedDates.entrySet()) {
1581 Date lastModifiedDate = entry.getValue() != null ? entry.getValue() : new Date();
1582 lastModifiedCache.put(entry.getKey(), generateLastModifiedHash(lastModifiedDate));
1583 }
1584 } catch (Exception e) {
1585 throw new SchedulerException(e);
1586 }
1587 }
1588
1589 private String generateLastModifiedHash(Date lastModifiedDate) {
1590 return "mod" + Long.toString(lastModifiedDate.getTime());
1591 }
1592
1593 private String toPropertyString(Map<String, String> properties) {
1594 StringBuilder wfPropertiesString = new StringBuilder();
1595 for (Map.Entry<String, String> entry : properties.entrySet()) {
1596 wfPropertiesString.append(entry.getKey() + "=" + entry.getValue() + "\n");
1597 }
1598 return wfPropertiesString.toString();
1599 }
1600
1601 private MediaPackage getEventMediaPackage(final String mediaPackageId, boolean checkOwner) {
1602 AQueryBuilder query = assetManager.createQuery();
1603 var predicate = withOrganization(query)
1604 .and(query.mediaPackageId(mediaPackageId))
1605 .and(query.version().isLatest());
1606 if (checkOwner) {
1607 predicate = predicate.and(withOwner(query));
1608 }
1609
1610 Optional<ARecord> record = query.select(query.snapshot()).where(predicate).run().getRecords().stream().findFirst();
1611 if (record.isEmpty()) {
1612 throw new RuntimeNotFoundException(new NotFoundException());
1613 }
1614
1615 return record.get().getSnapshot().get().getMediaPackage();
1616 }
1617
1618 private MediaPackage getEventMediaPackage(final String mediaPackageId) {
1619 return getEventMediaPackage(mediaPackageId, true);
1620 }
1621
1622
1623
1624
1625
1626
1627
1628
1629
1630
1631
1632
1633
1634 private MediaPackage updateDublincCoreCatalog(MediaPackage mp, DublinCoreCatalog dc)
1635 throws IOException, MediaPackageException {
1636 try (InputStream inputStream = IOUtils.toInputStream(dc.toXmlString(), "UTF-8")) {
1637
1638 Catalog[] catalogs = mp.getCatalogs(MediaPackageElements.EPISODE);
1639 if (catalogs.length > 0) {
1640 Catalog catalog = catalogs[0];
1641 URI uri = workspace.put(mp.getIdentifier().toString(), catalog.getIdentifier(), "dublincore.xml", inputStream);
1642 catalog.setURI(uri);
1643
1644 catalog.setChecksum(null);
1645 } else {
1646 throw new MediaPackageException("Unable to find catalog");
1647 }
1648 }
1649 return mp;
1650 }
1651
1652 private TechnicalMetadata getTechnicalMetadata(ExtendedEventDto extEvt) {
1653 final String agentId = extEvt.getCaptureAgentId();
1654 final Date start = extEvt.getStartDate();
1655 final Date end = extEvt.getEndDate();
1656 final Set<String> presenters = getPresenters(Optional.ofNullable(extEvt.getPresenters()).orElse(""));
1657 final Optional<String> recordingStatus = Optional.ofNullable(extEvt.getRecordingState());
1658 final Optional<Long> lastHeard = Optional.ofNullable(extEvt.getRecordingLastHeard());
1659 final Map<String, String> caMetadata = deserializeExtendedEventProperties(extEvt.getCaptureAgentProperties());
1660 final Map<String, String> wfProperties = deserializeExtendedEventProperties(extEvt.getWorkflowProperties());
1661
1662 Recording recording = null;
1663 if (recordingStatus.isPresent() && lastHeard.isPresent()) {
1664 recording = new RecordingImpl(extEvt.getMediaPackageId(), recordingStatus.get(), lastHeard.get());
1665 }
1666
1667 return new TechnicalMetadataImpl(extEvt.getMediaPackageId(), agentId, start, end, presenters, wfProperties,
1668 caMetadata, Optional.ofNullable(recording));
1669 }
1670
1671 private Predicate withOrganization(AQueryBuilder query) {
1672 return query.organizationId().eq(securityService.getOrganization().getId());
1673 }
1674
1675 private Predicate withOwner(AQueryBuilder query) {
1676 return query.owner().eq(SNAPSHOT_OWNER);
1677 }
1678
1679 private Set<String> getPresenters(String presentersString) {
1680 return new HashSet<>(Arrays.asList(StringUtils.split(presentersString, ",")));
1681 }
1682
1683
1684
1685
1686 private List<MediaPackageElementFlavor> getEventCatalogUIAdapterFlavors() {
1687 String organization = securityService.getOrganization().getId();
1688 return eventCatalogUIAdapters.stream()
1689 .filter(adapter -> adapter.getOrganization().equals(organization))
1690 .map(EventCatalogUIAdapter::getFlavor)
1691 .filter(mpe -> !MediaPackageElements.EPISODE.matches(mpe))
1692 .collect(Collectors.toList());
1693 }
1694
1695 @Override
1696 public void repopulate(IndexRebuildService.DataType type) throws IndexRebuildException {
1697 try {
1698 final int total;
1699 try {
1700 total = persistence.countEvents();
1701 } catch (SchedulerServiceDatabaseException e) {
1702 logIndexRebuildError(logger, e);
1703 throw new IndexRebuildException(getService(), e);
1704 }
1705 logIndexRebuildBegin(logger, total, "scheduled events");
1706 final int[] current = {0};
1707 int n = 20;
1708 var updatedEventRange = new ArrayList<Event>();
1709
1710 for (Organization organization: orgDirectoryService.getOrganizations()) {
1711 final User user = SecurityUtil.createSystemUser(systemUserName, organization);
1712 SecurityUtil.runAs(securityService, organization, user,
1713 () -> {
1714 final List<ExtendedEventDto> events;
1715 try {
1716 events = persistence.getEvents();
1717 } catch (SchedulerServiceDatabaseException e) {
1718 logIndexRebuildError(logger, e, organization);
1719 return;
1720 }
1721
1722 for (ExtendedEventDto event : events) {
1723 try {
1724 current[0]++;
1725
1726 var updatedEventData = Optional.of(new Event(event.getMediaPackageId(), organization.getId()));
1727
1728 final Set<String> presenters = getPresenters(
1729 Optional.ofNullable(event.getPresenters()).orElse(""));
1730 final Map<String, String> caMetadata = deserializeExtendedEventProperties(
1731 event.getCaptureAgentProperties());
1732
1733 updatedEventData = getEventUpdateFunction(event.getMediaPackageId(), Optional.empty(),
1734 Optional.empty(), Optional.of(event.getStartDate()), Optional.of(event.getEndDate()),
1735 Optional.of(presenters), Optional.of(event.getCaptureAgentId()), Optional.of(caMetadata),
1736 Optional.ofNullable(event.getRecordingState()), organization.getId(),
1737 securityService.getUser()).apply(updatedEventData);
1738 updatedEventRange.add(updatedEventData.get());
1739
1740 if (updatedEventRange.size() >= n || current[0] >= events.size()) {
1741 index.bulkEventUpdate(updatedEventRange);
1742 logIndexRebuildProgress(logger, total, current[0], n);
1743 updatedEventRange.clear();
1744 }
1745
1746 } catch (SearchIndexException e) {
1747 logger.error("Error while updating event '{}' from search index:", event.getMediaPackageId(), e);
1748 } catch (Exception e) {
1749 throw new RuntimeException("Fatal error while indexing event " + event.getMediaPackageId(), e);
1750 }
1751 }
1752 });
1753 }
1754 } catch (Exception e) {
1755 logIndexRebuildError(logger, e);
1756 throw new IndexRebuildException(getService(), e);
1757 }
1758 }
1759
1760 @Override
1761 public IndexRebuildService.Service getService() {
1762 return IndexRebuildService.Service.Scheduler;
1763 }
1764
1765 public SecurityService getSecurityService() {
1766 return securityService;
1767 }
1768
1769
1770
1771
1772
1773
1774
1775
1776 private Function<Optional<Event>, Optional<Event>> getEventUpdateFunction(String mediaPackageId,
1777 Optional<AccessControlList> acl, Optional<DublinCoreCatalog> dublinCore, Optional<Date> startTime,
1778 Optional<Date> endTime, Optional<Set<String>> presenters, Optional<String> agentId,
1779 Optional<Map<String, String>> properties, Optional<String> recordingStatus, String orgId, User user) {
1780 return (Optional<Event> eventOpt) -> {
1781 Event event = eventOpt.orElse(new Event(mediaPackageId, orgId));
1782
1783 if (acl.isPresent()) {
1784 event.setAccessPolicy(AccessControlParser.toJsonSilent(acl.get()));
1785 }
1786 if (dublinCore.isPresent()) {
1787 EventIndexUtils.updateEvent(event, dublinCore.get());
1788 if (isBlank(event.getCreator())) {
1789 event.setCreator(getSecurityService().getUser().getName());
1790 }
1791
1792
1793 try {
1794 EventIndexUtils.updateSeriesName(event, orgId, user, index);
1795 } catch (SearchIndexException e) {
1796 logger.error("Error updating the series name of the event {} in the {} index.", mediaPackageId,
1797 index.getIndexName(), e);
1798 }
1799 }
1800 if (presenters.isPresent()) {
1801 event.setTechnicalPresenters(new ArrayList<>(presenters.get()));
1802 }
1803 if (agentId.isPresent()) {
1804 event.setAgentId(agentId.get());
1805 }
1806 if (recordingStatus.isPresent() && !recordingStatus.get().equals(RecordingState.UNKNOWN)) {
1807 event.setRecordingStatus(recordingStatus.get());
1808 }
1809 if (properties.isPresent()) {
1810 event.setAgentConfiguration(properties.get());
1811 }
1812 if (startTime.isPresent()) {
1813 String startTimeStr = startTime == null ? null : DateTimeSupport.toUTC(startTime.get().getTime());
1814 event.setTechnicalStartTime(startTimeStr);
1815 }
1816 if (endTime.isPresent()) {
1817 String endTimeStr = endTime == null ? null : DateTimeSupport.toUTC(endTime.get().getTime());
1818 event.setTechnicalEndTime(endTimeStr);
1819 }
1820
1821 return Optional.of(event);
1822 };
1823 }
1824 }