1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21 package org.opencastproject.scheduler.impl;
22
23 import static org.opencastproject.metadata.dublincore.DublinCore.PROPERTY_TEMPORAL;
24
25 import org.opencastproject.mediapackage.Catalog;
26 import org.opencastproject.mediapackage.MediaPackage;
27 import org.opencastproject.mediapackage.MediaPackageElements;
28 import org.opencastproject.metadata.dublincore.DCMIPeriod;
29 import org.opencastproject.metadata.dublincore.DublinCore;
30 import org.opencastproject.metadata.dublincore.DublinCoreCatalog;
31 import org.opencastproject.metadata.dublincore.DublinCoreUtil;
32 import org.opencastproject.metadata.dublincore.EncodingSchemeUtils;
33 import org.opencastproject.metadata.dublincore.Precision;
34 import org.opencastproject.scheduler.api.SchedulerException;
35 import org.opencastproject.scheduler.api.SchedulerService;
36 import org.opencastproject.security.api.Organization;
37 import org.opencastproject.security.api.OrganizationDirectoryService;
38 import org.opencastproject.security.api.SecurityService;
39 import org.opencastproject.security.api.UnauthorizedException;
40 import org.opencastproject.security.api.User;
41 import org.opencastproject.security.util.SecurityUtil;
42 import org.opencastproject.serviceregistry.api.ServiceRegistry;
43 import org.opencastproject.serviceregistry.api.ServiceRegistryException;
44 import org.opencastproject.util.NotFoundException;
45 import org.opencastproject.workspace.api.Workspace;
46
47 import org.apache.commons.io.FilenameUtils;
48 import org.apache.commons.io.IOUtils;
49 import org.apache.commons.lang3.StringUtils;
50 import org.joda.time.DateTime;
51 import org.osgi.service.cm.ConfigurationException;
52 import org.osgi.service.cm.ManagedService;
53 import org.osgi.service.component.ComponentContext;
54 import org.osgi.service.component.annotations.Activate;
55 import org.osgi.service.component.annotations.Component;
56 import org.osgi.service.component.annotations.Deactivate;
57 import org.osgi.service.component.annotations.Reference;
58 import org.quartz.Job;
59 import org.quartz.JobDetail;
60 import org.quartz.JobExecutionContext;
61 import org.quartz.JobExecutionException;
62 import org.quartz.Trigger;
63 import org.quartz.TriggerUtils;
64 import org.quartz.impl.StdSchedulerFactory;
65 import org.slf4j.Logger;
66 import org.slf4j.LoggerFactory;
67
68 import java.io.IOException;
69 import java.net.URI;
70 import java.util.Date;
71 import java.util.Dictionary;
72 import java.util.List;
73 import java.util.Optional;
74
75
76 @Component(
77 immediate = true,
78 service = { ManagedService.class,CaptureNowProlongingService.class },
79 property = {
80 "service.description=Capture Prolonging Service"
81 }
82 )
83 public class CaptureNowProlongingService implements ManagedService {
84
85
86 private static final Logger logger = LoggerFactory.getLogger(CaptureNowProlongingService.class);
87
88 private static final String CFG_KEY_INITIAL_TIME = "initial-time";
89 private static final String CFG_KEY_PROLONGING_TIME = "prolonging-time";
90
91 private static final String JOB_NAME = "mh-capture-prolonging-job";
92 private static final String JOB_GROUP = "mh-capture-prolonging-job-group";
93 private static final String TRIGGER_GROUP = "mh-capture-prolonging-trigger-group";
94 private static final String JOB_PARAM_PARENT = "parent";
95
96
97 private int initialTime = -1;
98
99
100 private int prolongingTime = -1;
101
102
103 private org.quartz.Scheduler quartz;
104
105
106 private SchedulerService schedulerService;
107
108
109 private SecurityService securityService;
110
111
112 private ServiceRegistry serviceRegistry;
113
114
115 private OrganizationDirectoryService orgDirectoryService;
116
117
118 private Workspace workspace;
119
120
121 private ComponentContext componentContext;
122
123
124 @Reference
125 public void setSchedulerService(SchedulerService schedulerService) {
126 this.schedulerService = schedulerService;
127 }
128
129
130 @Reference
131 public void setSecurityService(SecurityService securityService) {
132 this.securityService = securityService;
133 }
134
135
136 @Reference
137 public void setServiceRegistry(ServiceRegistry serviceRegistry) {
138 this.serviceRegistry = serviceRegistry;
139 }
140
141
142 @Reference
143 public void setOrgDirectoryService(OrganizationDirectoryService orgDirectoryService) {
144 this.orgDirectoryService = orgDirectoryService;
145 }
146
147
148 @Reference
149 public void setWorkspace(Workspace workspace) {
150 this.workspace = workspace;
151 }
152
153
154
155
156
157
158
159 @Activate
160 public void activate(ComponentContext cc) {
161 componentContext = cc;
162 try {
163 quartz = new StdSchedulerFactory().getScheduler();
164 quartz.start();
165
166 final JobDetail job = new JobDetail(JOB_NAME, JOB_GROUP, Runner.class);
167 job.setDurability(true);
168 job.setVolatility(true);
169 job.getJobDataMap().put(JOB_PARAM_PARENT, this);
170 quartz.addJob(job, true);
171 } catch (org.quartz.SchedulerException e) {
172 throw new RuntimeException(e);
173 }
174 }
175
176
177
178
179 @Deactivate
180 public void deactivate(ComponentContext cc) {
181 componentContext = null;
182 shutdown();
183 }
184
185 @Override
186 public void updated(Dictionary<String, ?> properties) throws ConfigurationException {
187
188 try {
189 initialTime = Integer.parseInt(StringUtils.defaultIfBlank((String) properties.get(CFG_KEY_INITIAL_TIME), "300"));
190 } catch (NumberFormatException e) {
191 throw new ConfigurationException(CFG_KEY_INITIAL_TIME, "Not an integer", e);
192 }
193 initialTime = Math.max(initialTime, 90) * 1000;
194
195
196 try {
197 prolongingTime = Integer.parseInt(
198 StringUtils.defaultIfBlank((String) properties.get(CFG_KEY_PROLONGING_TIME), "300"));
199 } catch (NumberFormatException e) {
200 throw new ConfigurationException(CFG_KEY_PROLONGING_TIME, "Not an integer", e);
201 }
202 prolongingTime = Math.max(prolongingTime, 90) * 1000;
203 }
204
205
206
207
208 public void schedule(String agentId) throws org.quartz.SchedulerException {
209 logger.debug("Capture prolonging job for agent '{}' is run every minute.", agentId);
210 final Trigger trigger = TriggerUtils.makeMinutelyTrigger();
211 trigger.setStartTime(DateTime.now().plusMinutes(1).toDate());
212 trigger.setName(agentId);
213 trigger.setGroup(TRIGGER_GROUP);
214 trigger.setJobName(JOB_NAME);
215 trigger.setJobGroup(JOB_GROUP);
216 if (quartz.getTrigger(agentId, TRIGGER_GROUP) == null) {
217 quartz.scheduleJob(trigger);
218 } else {
219 quartz.rescheduleJob(agentId, TRIGGER_GROUP, trigger);
220 }
221 }
222
223 public void stop(String agentId) {
224 try {
225 quartz.unscheduleJob(agentId, TRIGGER_GROUP);
226 logger.info("Stopped prolonging capture for agent '{}'", agentId);
227 } catch (Exception e) {
228 logger.error("Error stopping Quartz job for agent '{}'", agentId, e);
229 }
230 }
231
232
233 public void shutdown() {
234 try {
235 quartz.shutdown();
236 } catch (org.quartz.SchedulerException ignore) {
237 }
238 }
239
240
241 @Override
242 protected void finalize() throws Throwable {
243 super.finalize();
244 shutdown();
245 }
246
247
248
249
250
251
252 public int getInitialTime() {
253 return initialTime;
254 }
255
256
257
258
259
260
261 public int getProlongingTime() {
262 return prolongingTime;
263 }
264
265 public SecurityService getSecurityService() {
266 return securityService;
267 }
268
269 public ComponentContext getComponentContext() {
270 return componentContext;
271 }
272
273 public ServiceRegistry getServiceRegistry() {
274 return serviceRegistry;
275 }
276
277 public OrganizationDirectoryService getOrgDirectoryService() {
278 return orgDirectoryService;
279 }
280
281 public Workspace getWorkspace() {
282 return workspace;
283 }
284
285
286
287
/**
 * Quartz job that checks, for the agent named by the firing trigger, whether the current recording ends within
 * the next 90 seconds and, if so, asks the owning service to prolong it. The check is performed once per
 * organization, each time as that organization's system user.
 */
public static class Runner implements Job {

  @Override
  public void execute(JobExecutionContext jobExecutionContext) throws JobExecutionException {
    logger.debug("Starting ad-hoc prolonging job for agent '{}'", jobExecutionContext.getTrigger().getName());
    try {
      final CaptureNowProlongingService parent = (CaptureNowProlongingService) jobExecutionContext.getJobDetail()
              .getJobDataMap().get(JOB_PARAM_PARENT);
      final String agentId = jobExecutionContext.getTrigger().getName();
      runForEachOrganization(parent, agentId);
    } catch (Exception e) {
      throw new JobExecutionException("An error occurred while prolonging ad-hoc recordings", e);
    }
    logger.debug("Finished ad-hoc prolonging job for agent '{}'", jobExecutionContext.getTrigger().getName());
  }

  /** Runs the prolonging check once per organization, as that organization's system user. */
  private void runForEachOrganization(final CaptureNowProlongingService service, final String agentId) {
    for (Organization org : service.getOrgDirectoryService().getOrganizations()) {
      final User systemUser = SecurityUtil.createSystemUser(service.getComponentContext(), org);
      SecurityUtil.runAs(service.getSecurityService(), org, systemUser, () -> {
        prolongIfEndingSoon(service, agentId);
      });
    }
  }

  /** Prolongs the agent's current recording if its end lies less than 90 seconds ahead; never throws. */
  private void prolongIfEndingSoon(final CaptureNowProlongingService service, final String agentId) {
    try {
      final MediaPackage recording = service.getCurrentRecording(agentId);
      final Optional<DublinCoreCatalog> episode = DublinCoreUtil.loadEpisodeDublinCore(service.getWorkspace(),
              recording);

      boolean endingSoon = false;
      if (episode.isPresent()) {
        final Date currentEnd = EncodingSchemeUtils.decodeMandatoryPeriod(episode.get().getFirst(PROPERTY_TEMPORAL))
                .getEnd();
        endingSoon = currentEnd.before(DateTime.now().plusSeconds(90).toDate());
      }

      if (!endingSoon) {
        logger.debug("Wait another minute before extending the ad-hoc recording for agent '{}'", agentId);
        return;
      }
      prolong(service, recording, episode.get(), agentId);
    } catch (NotFoundException e) {
      logger.info("Unable to extend the ad-hoc recording for agent '{}': No ad-hoc recording found", agentId);
    } catch (Exception e) {
      logger.error("Error extending the ad-hoc recording for agent '{}'", agentId, e);
    }
  }

  /** Delegates to the service and turns every failure into a log entry. */
  private void prolong(final CaptureNowProlongingService service, final MediaPackage event,
          final DublinCoreCatalog dublinCore, final String agentId) {
    try {
      logger.info("Extending ad-hoc recording for agent '{}'", agentId);
      service.prolongEvent(event, dublinCore, agentId);
    } catch (UnauthorizedException e) {
      logger.error("Error extending the ad-hoc recording for agent '{}': Permission denied", agentId);
    } catch (NotFoundException e) {
      logger.warn("Error extending the ad-hoc recording for agent '{}': No ad-hoc recording found", agentId);
    } catch (Exception e) {
      logger.error("Error extending the ad-hoc recording for agent '{}'", agentId, e);
    }
  }

}
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357 public MediaPackage getCurrentRecording(String agentId)
358 throws NotFoundException, UnauthorizedException, SchedulerException {
359 Optional<MediaPackage> current = schedulerService.getCurrentRecording(agentId);
360 if (current.isEmpty()) {
361 logger.warn("Unable to load the current recording for agent '{}': no recording found", agentId);
362 throw new NotFoundException("No current recording found for agent '" + agentId + "'");
363 }
364 return current.get();
365 }
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387 public void prolongEvent(MediaPackage event, DublinCoreCatalog dublinCore, String agentId)
388 throws UnauthorizedException, NotFoundException, SchedulerException, IllegalArgumentException, IOException {
389 String eventId = event.getIdentifier().toString();
390
391 DCMIPeriod period = EncodingSchemeUtils.decodeMandatoryPeriod(dublinCore.getFirst(DublinCore.PROPERTY_TEMPORAL));
392
393 Date prolongedEndDate = new DateTime(period.getEnd()).plus(getProlongingTime()).toDate();
394
395 dublinCore.set(PROPERTY_TEMPORAL,
396 EncodingSchemeUtils.encodePeriod(new DCMIPeriod(period.getStart(), prolongedEndDate), Precision.Second));
397
398 List<MediaPackage> events = schedulerService.findConflictingEvents(agentId, period.getStart(), prolongedEndDate);
399 for (MediaPackage conflictMediaPackage : events) {
400 if (eventId.equals(conflictMediaPackage.getIdentifier().toString())) {
401 continue;
402 }
403
404 Optional<DublinCoreCatalog> conflictingDc = DublinCoreUtil.loadEpisodeDublinCore(workspace, conflictMediaPackage);
405 if (conflictingDc.isEmpty()) {
406 continue;
407 }
408
409 Date conflictingStartDate = EncodingSchemeUtils
410 .decodeMandatoryPeriod(conflictingDc.get().getFirst(DublinCore.PROPERTY_TEMPORAL)).getStart();
411
412 prolongedEndDate = new DateTime(conflictingStartDate).minusMinutes(1).toDate();
413
414 dublinCore.set(PROPERTY_TEMPORAL,
415 EncodingSchemeUtils.encodePeriod(new DCMIPeriod(period.getStart(), prolongedEndDate), Precision.Second));
416
417 logger.info(
418 "A scheduled event is preventing the current recording on agent '{}' to be further extended. "
419 + "Extending to one minute before the conflicting event",
420 agentId);
421 stop(agentId);
422 break;
423 }
424
425
426 Catalog[] episodeCatalogs = event.getCatalogs(MediaPackageElements.EPISODE);
427 if (episodeCatalogs.length > 0) {
428 Catalog c = episodeCatalogs[0];
429 String filename = FilenameUtils.getName(c.getURI().toString());
430 URI uri = workspace.put(event.getIdentifier().toString(), c.getIdentifier(), filename,
431 IOUtils.toInputStream(dublinCore.toXmlString(), "UTF-8"));
432 c.setURI(uri);
433
434 c.setChecksum(null);
435 }
436
437 schedulerService.updateEvent(eventId, Optional.empty(), Optional.of(prolongedEndDate), Optional.empty(),
438 Optional.empty(), Optional.of(event), Optional.empty(),
439 Optional.empty());
440 }
441
442 }