1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21 package org.opencastproject.scheduler.impl;
22
23 import static org.opencastproject.metadata.dublincore.DublinCore.PROPERTY_TEMPORAL;
24
25 import org.opencastproject.mediapackage.Catalog;
26 import org.opencastproject.mediapackage.MediaPackage;
27 import org.opencastproject.mediapackage.MediaPackageElements;
28 import org.opencastproject.metadata.dublincore.DCMIPeriod;
29 import org.opencastproject.metadata.dublincore.DublinCore;
30 import org.opencastproject.metadata.dublincore.DublinCoreCatalog;
31 import org.opencastproject.metadata.dublincore.DublinCoreUtil;
32 import org.opencastproject.metadata.dublincore.EncodingSchemeUtils;
33 import org.opencastproject.metadata.dublincore.Precision;
34 import org.opencastproject.scheduler.api.SchedulerException;
35 import org.opencastproject.scheduler.api.SchedulerService;
36 import org.opencastproject.security.api.Organization;
37 import org.opencastproject.security.api.OrganizationDirectoryService;
38 import org.opencastproject.security.api.SecurityService;
39 import org.opencastproject.security.api.UnauthorizedException;
40 import org.opencastproject.security.api.User;
41 import org.opencastproject.security.util.SecurityUtil;
42 import org.opencastproject.serviceregistry.api.ServiceRegistry;
43 import org.opencastproject.serviceregistry.api.ServiceRegistryException;
44 import org.opencastproject.util.NotFoundException;
45 import org.opencastproject.workspace.api.Workspace;
46
47 import com.entwinemedia.fn.data.Opt;
48
49 import org.apache.commons.io.FilenameUtils;
50 import org.apache.commons.io.IOUtils;
51 import org.apache.commons.lang3.StringUtils;
52 import org.joda.time.DateTime;
53 import org.osgi.service.cm.ConfigurationException;
54 import org.osgi.service.cm.ManagedService;
55 import org.osgi.service.component.ComponentContext;
56 import org.osgi.service.component.annotations.Activate;
57 import org.osgi.service.component.annotations.Component;
58 import org.osgi.service.component.annotations.Deactivate;
59 import org.osgi.service.component.annotations.Reference;
60 import org.quartz.Job;
61 import org.quartz.JobDetail;
62 import org.quartz.JobExecutionContext;
63 import org.quartz.JobExecutionException;
64 import org.quartz.Trigger;
65 import org.quartz.TriggerUtils;
66 import org.quartz.impl.StdSchedulerFactory;
67 import org.slf4j.Logger;
68 import org.slf4j.LoggerFactory;
69
70 import java.io.IOException;
71 import java.net.URI;
72 import java.util.Date;
73 import java.util.Dictionary;
74 import java.util.List;
75 import java.util.Map;
76 import java.util.Optional;
77 import java.util.Set;
78
79
80 @Component(
81 immediate = true,
82 service = { ManagedService.class,CaptureNowProlongingService.class },
83 property = {
84 "service.description=Capture Prolonging Service"
85 }
86 )
87 public class CaptureNowProlongingService implements ManagedService {
88
89
90 private static final Logger logger = LoggerFactory.getLogger(CaptureNowProlongingService.class);
91
92 private static final String CFG_KEY_INITIAL_TIME = "initial-time";
93 private static final String CFG_KEY_PROLONGING_TIME = "prolonging-time";
94
95 private static final String JOB_NAME = "mh-capture-prolonging-job";
96 private static final String JOB_GROUP = "mh-capture-prolonging-job-group";
97 private static final String TRIGGER_GROUP = "mh-capture-prolonging-trigger-group";
98 private static final String JOB_PARAM_PARENT = "parent";
99
100
101 private int initialTime = -1;
102
103
104 private int prolongingTime = -1;
105
106
107 private org.quartz.Scheduler quartz;
108
109
110 private SchedulerService schedulerService;
111
112
113 private SecurityService securityService;
114
115
116 private ServiceRegistry serviceRegistry;
117
118
119 private OrganizationDirectoryService orgDirectoryService;
120
121
122 private Workspace workspace;
123
124
125 private ComponentContext componentContext;
126
127
128 @Reference
129 public void setSchedulerService(SchedulerService schedulerService) {
130 this.schedulerService = schedulerService;
131 }
132
133
134 @Reference
135 public void setSecurityService(SecurityService securityService) {
136 this.securityService = securityService;
137 }
138
139
140 @Reference
141 public void setServiceRegistry(ServiceRegistry serviceRegistry) {
142 this.serviceRegistry = serviceRegistry;
143 }
144
145
146 @Reference
147 public void setOrgDirectoryService(OrganizationDirectoryService orgDirectoryService) {
148 this.orgDirectoryService = orgDirectoryService;
149 }
150
151
152 @Reference
153 public void setWorkspace(Workspace workspace) {
154 this.workspace = workspace;
155 }
156
157
158
159
160
161
162
163 @Activate
164 public void activate(ComponentContext cc) {
165 componentContext = cc;
166 try {
167 quartz = new StdSchedulerFactory().getScheduler();
168 quartz.start();
169
170 final JobDetail job = new JobDetail(JOB_NAME, JOB_GROUP, Runner.class);
171 job.setDurability(true);
172 job.setVolatility(true);
173 job.getJobDataMap().put(JOB_PARAM_PARENT, this);
174 quartz.addJob(job, true);
175 } catch (org.quartz.SchedulerException e) {
176 throw new RuntimeException(e);
177 }
178 }
179
180
181
182
183 @Deactivate
184 public void deactivate(ComponentContext cc) {
185 componentContext = null;
186 shutdown();
187 }
188
189 @Override
190 public void updated(Dictionary<String, ?> properties) throws ConfigurationException {
191
192 try {
193 initialTime = Integer.parseInt(StringUtils.defaultIfBlank((String) properties.get(CFG_KEY_INITIAL_TIME), "300"));
194 } catch (NumberFormatException e) {
195 throw new ConfigurationException(CFG_KEY_INITIAL_TIME, "Not an integer", e);
196 }
197 initialTime = Math.max(initialTime, 90) * 1000;
198
199
200 try {
201 prolongingTime = Integer.parseInt(
202 StringUtils.defaultIfBlank((String) properties.get(CFG_KEY_PROLONGING_TIME), "300"));
203 } catch (NumberFormatException e) {
204 throw new ConfigurationException(CFG_KEY_PROLONGING_TIME, "Not an integer", e);
205 }
206 prolongingTime = Math.max(prolongingTime, 90) * 1000;
207 }
208
209
210
211
212 public void schedule(String agentId) throws org.quartz.SchedulerException {
213 logger.debug("Capture prolonging job for agent '{}' is run every minute.", agentId);
214 final Trigger trigger = TriggerUtils.makeMinutelyTrigger();
215 trigger.setStartTime(DateTime.now().plusMinutes(1).toDate());
216 trigger.setName(agentId);
217 trigger.setGroup(TRIGGER_GROUP);
218 trigger.setJobName(JOB_NAME);
219 trigger.setJobGroup(JOB_GROUP);
220 if (quartz.getTrigger(agentId, TRIGGER_GROUP) == null) {
221 quartz.scheduleJob(trigger);
222 } else {
223 quartz.rescheduleJob(agentId, TRIGGER_GROUP, trigger);
224 }
225 }
226
227 public void stop(String agentId) {
228 try {
229 quartz.unscheduleJob(agentId, TRIGGER_GROUP);
230 logger.info("Stopped prolonging capture for agent '{}'", agentId);
231 } catch (Exception e) {
232 logger.error("Error stopping Quartz job for agent '{}'", agentId, e);
233 }
234 }
235
236
237 public void shutdown() {
238 try {
239 quartz.shutdown();
240 } catch (org.quartz.SchedulerException ignore) {
241 }
242 }
243
244
245 @Override
246 protected void finalize() throws Throwable {
247 super.finalize();
248 shutdown();
249 }
250
251
252
253
254
255
256 public int getInitialTime() {
257 return initialTime;
258 }
259
260
261
262
263
264
265 public int getProlongingTime() {
266 return prolongingTime;
267 }
268
269 public SecurityService getSecurityService() {
270 return securityService;
271 }
272
273 public ComponentContext getComponentContext() {
274 return componentContext;
275 }
276
277 public ServiceRegistry getServiceRegistry() {
278 return serviceRegistry;
279 }
280
281 public OrganizationDirectoryService getOrgDirectoryService() {
282 return orgDirectoryService;
283 }
284
285 public Workspace getWorkspace() {
286 return workspace;
287 }
288
289
290
291
292 public static class Runner implements Job {
293
294 @Override
295 public void execute(JobExecutionContext jobExecutionContext) throws JobExecutionException {
296 logger.debug("Starting ad-hoc prolonging job for agent '{}'", jobExecutionContext.getTrigger().getName());
297 try {
298 execute((CaptureNowProlongingService) jobExecutionContext.getJobDetail().getJobDataMap().get(JOB_PARAM_PARENT),
299 jobExecutionContext.getTrigger().getName());
300 } catch (Exception e) {
301 throw new JobExecutionException("An error occurred while prolonging ad-hoc recordings", e);
302 }
303 logger.debug("Finished ad-hoc prolonging job for agent '{}'", jobExecutionContext.getTrigger().getName());
304 }
305
306 private void execute(final CaptureNowProlongingService prolongingService, final String agentId) {
307 for (Organization organization : prolongingService.getOrgDirectoryService().getOrganizations()) {
308 User user = SecurityUtil.createSystemUser(prolongingService.getComponentContext(), organization);
309 SecurityUtil.runAs(prolongingService.getSecurityService(), organization, user, () -> {
310 try {
311 MediaPackage mp = prolongingService.getCurrentRecording(agentId);
312 Optional<DublinCoreCatalog> dublinCore = DublinCoreUtil.loadEpisodeDublinCore(
313 prolongingService.getWorkspace(),
314 mp);
315 if (dublinCore.isPresent()
316 && EncodingSchemeUtils.decodeMandatoryPeriod(dublinCore.get().getFirst(PROPERTY_TEMPORAL))
317 .getEnd().before(DateTime.now().plusSeconds(90).toDate())) {
318 prolong(prolongingService, mp, dublinCore.get(), agentId);
319 } else {
320 logger.debug("Wait another minute before extending the ad-hoc recording for agent '{}'", agentId);
321 }
322 } catch (NotFoundException e) {
323 logger.info("Unable to extend the ad-hoc recording for agent '{}': No ad-hoc recording found", agentId);
324 } catch (Exception e) {
325 logger.error("Error extending the ad-hoc recording for agent '{}'", agentId, e);
326 }
327 });
328 }
329 }
330
331 private void prolong(final CaptureNowProlongingService prolongingService, final MediaPackage event,
332 final DublinCoreCatalog dublinCore, final String agentId)
333 throws NotFoundException, ServiceRegistryException {
334 try {
335 logger.info("Extending ad-hoc recording for agent '{}'", agentId);
336 prolongingService.prolongEvent(event, dublinCore, agentId);
337 } catch (UnauthorizedException e) {
338 logger.error("Error extending the ad-hoc recording for agent '{}': Permission denied", agentId);
339 } catch (NotFoundException e) {
340 logger.warn("Error extending the ad-hoc recording for agent '{}': No ad-hoc recording found", agentId);
341 } catch (Exception e) {
342 logger.error("Error extending the ad-hoc recording for agent '{}'", agentId, e);
343 }
344 }
345
346 }
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361 public MediaPackage getCurrentRecording(String agentId)
362 throws NotFoundException, UnauthorizedException, SchedulerException {
363 Opt<MediaPackage> current = schedulerService.getCurrentRecording(agentId);
364 if (current.isNone()) {
365 logger.warn("Unable to load the current recording for agent '{}': no recording found", agentId);
366 throw new NotFoundException("No current recording found for agent '" + agentId + "'");
367 }
368 return current.get();
369 }
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391 public void prolongEvent(MediaPackage event, DublinCoreCatalog dublinCore, String agentId)
392 throws UnauthorizedException, NotFoundException, SchedulerException, IllegalArgumentException, IOException {
393 String eventId = event.getIdentifier().toString();
394
395 DCMIPeriod period = EncodingSchemeUtils.decodeMandatoryPeriod(dublinCore.getFirst(DublinCore.PROPERTY_TEMPORAL));
396
397 Date prolongedEndDate = new DateTime(period.getEnd()).plus(getProlongingTime()).toDate();
398
399 dublinCore.set(PROPERTY_TEMPORAL,
400 EncodingSchemeUtils.encodePeriod(new DCMIPeriod(period.getStart(), prolongedEndDate), Precision.Second));
401
402 List<MediaPackage> events = schedulerService.findConflictingEvents(agentId, period.getStart(), prolongedEndDate);
403 for (MediaPackage conflictMediaPackage : events) {
404 if (eventId.equals(conflictMediaPackage.getIdentifier().toString()))
405 continue;
406
407 Optional<DublinCoreCatalog> conflictingDc = DublinCoreUtil.loadEpisodeDublinCore(workspace, conflictMediaPackage);
408 if (conflictingDc.isEmpty()) {
409 continue;
410 }
411
412 Date conflictingStartDate = EncodingSchemeUtils
413 .decodeMandatoryPeriod(conflictingDc.get().getFirst(DublinCore.PROPERTY_TEMPORAL)).getStart();
414
415 prolongedEndDate = new DateTime(conflictingStartDate).minusMinutes(1).toDate();
416
417 dublinCore.set(PROPERTY_TEMPORAL,
418 EncodingSchemeUtils.encodePeriod(new DCMIPeriod(period.getStart(), prolongedEndDate), Precision.Second));
419
420 logger.info(
421 "A scheduled event is preventing the current recording on agent '{}' to be further extended. Extending to one minute before the conflicting event",
422 agentId);
423 stop(agentId);
424 break;
425 }
426
427
428 Catalog[] episodeCatalogs = event.getCatalogs(MediaPackageElements.EPISODE);
429 if (episodeCatalogs.length > 0) {
430 Catalog c = episodeCatalogs[0];
431 String filename = FilenameUtils.getName(c.getURI().toString());
432 URI uri = workspace.put(event.getIdentifier().toString(), c.getIdentifier(), filename,
433 IOUtils.toInputStream(dublinCore.toXmlString(), "UTF-8"));
434 c.setURI(uri);
435
436 c.setChecksum(null);
437 }
438
439 schedulerService.updateEvent(eventId, Opt.<Date> none(), Opt.some(prolongedEndDate), Opt.<String> none(),
440 Opt.<Set<String>> none(), Opt.some(event), Opt.<Map<String, String>> none(),
441 Opt.<Map<String, String>> none());
442 }
443
444 }