View Javadoc
1   /*
2    * Licensed to The Apereo Foundation under one or more contributor license
3    * agreements. See the NOTICE file distributed with this work for additional
4    * information regarding copyright ownership.
5    *
6    *
7    * The Apereo Foundation licenses this file to you under the Educational
8    * Community License, Version 2.0 (the "License"); you may not use this file
9    * except in compliance with the License. You may obtain a copy of the License
10   * at:
11   *
12   *   http://opensource.org/licenses/ecl2.txt
13   *
14   * Unless required by applicable law or agreed to in writing, software
15   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
16   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the
17   * License for the specific language governing permissions and limitations under
18   * the License.
19   *
20   */
21  package org.opencastproject.scheduler.impl;
22  
23  import static org.opencastproject.metadata.dublincore.DublinCore.PROPERTY_TEMPORAL;
24  
25  import org.opencastproject.mediapackage.Catalog;
26  import org.opencastproject.mediapackage.MediaPackage;
27  import org.opencastproject.mediapackage.MediaPackageElements;
28  import org.opencastproject.metadata.dublincore.DCMIPeriod;
29  import org.opencastproject.metadata.dublincore.DublinCore;
30  import org.opencastproject.metadata.dublincore.DublinCoreCatalog;
31  import org.opencastproject.metadata.dublincore.DublinCoreUtil;
32  import org.opencastproject.metadata.dublincore.EncodingSchemeUtils;
33  import org.opencastproject.metadata.dublincore.Precision;
34  import org.opencastproject.scheduler.api.SchedulerException;
35  import org.opencastproject.scheduler.api.SchedulerService;
36  import org.opencastproject.security.api.Organization;
37  import org.opencastproject.security.api.OrganizationDirectoryService;
38  import org.opencastproject.security.api.SecurityService;
39  import org.opencastproject.security.api.UnauthorizedException;
40  import org.opencastproject.security.api.User;
41  import org.opencastproject.security.util.SecurityUtil;
42  import org.opencastproject.serviceregistry.api.ServiceRegistry;
43  import org.opencastproject.serviceregistry.api.ServiceRegistryException;
44  import org.opencastproject.util.NotFoundException;
45  import org.opencastproject.workspace.api.Workspace;
46  
47  import org.apache.commons.io.FilenameUtils;
48  import org.apache.commons.io.IOUtils;
49  import org.apache.commons.lang3.StringUtils;
50  import org.joda.time.DateTime;
51  import org.osgi.service.cm.ConfigurationException;
52  import org.osgi.service.cm.ManagedService;
53  import org.osgi.service.component.ComponentContext;
54  import org.osgi.service.component.annotations.Activate;
55  import org.osgi.service.component.annotations.Component;
56  import org.osgi.service.component.annotations.Deactivate;
57  import org.osgi.service.component.annotations.Reference;
58  import org.quartz.Job;
59  import org.quartz.JobDetail;
60  import org.quartz.JobExecutionContext;
61  import org.quartz.JobExecutionException;
62  import org.quartz.Trigger;
63  import org.quartz.TriggerUtils;
64  import org.quartz.impl.StdSchedulerFactory;
65  import org.slf4j.Logger;
66  import org.slf4j.LoggerFactory;
67  
68  import java.io.IOException;
69  import java.net.URI;
70  import java.util.Date;
71  import java.util.Dictionary;
72  import java.util.List;
73  import java.util.Optional;
74  
75  /** Prolong immediate recordings before reaching the end, as long as there are no conflicts */
76  @Component(
77      immediate = true,
78      service = { ManagedService.class,CaptureNowProlongingService.class },
79      property = {
80          "service.description=Capture Prolonging Service"
81      }
82  )
83  public class CaptureNowProlongingService implements ManagedService {
84  
85    /** Log facility */
86    private static final Logger logger = LoggerFactory.getLogger(CaptureNowProlongingService.class);
87  
88    private static final String CFG_KEY_INITIAL_TIME = "initial-time";
89    private static final String CFG_KEY_PROLONGING_TIME = "prolonging-time";
90  
91    private static final String JOB_NAME = "mh-capture-prolonging-job";
92    private static final String JOB_GROUP = "mh-capture-prolonging-job-group";
93    private static final String TRIGGER_GROUP = "mh-capture-prolonging-trigger-group";
94    private static final String JOB_PARAM_PARENT = "parent";
95  
96    /** The initial time in millis */
97    private int initialTime = -1;
98  
99    /** The prolonging time in millis */
100   private int prolongingTime = -1;
101 
102   /** The quartz scheduler */
103   private org.quartz.Scheduler quartz;
104 
105   /** The scheduler service */
106   private SchedulerService schedulerService;
107 
108   /** The security service */
109   private SecurityService securityService;
110 
111   /** The service registry */
112   private ServiceRegistry serviceRegistry;
113 
114   /** The organization directory service */
115   private OrganizationDirectoryService orgDirectoryService;
116 
117   /** The workspace */
118   private Workspace workspace;
119 
120   /** The bundle context for this osgi component */
121   private ComponentContext componentContext;
122 
123   /** Sets the scheduler service */
124   @Reference
125   public void setSchedulerService(SchedulerService schedulerService) {
126     this.schedulerService = schedulerService;
127   }
128 
129   /** Sets the security service */
130   @Reference
131   public void setSecurityService(SecurityService securityService) {
132     this.securityService = securityService;
133   }
134 
135   /** Sets the service registry */
136   @Reference
137   public void setServiceRegistry(ServiceRegistry serviceRegistry) {
138     this.serviceRegistry = serviceRegistry;
139   }
140 
141   /** Sets the organization directory service */
142   @Reference
143   public void setOrgDirectoryService(OrganizationDirectoryService orgDirectoryService) {
144     this.orgDirectoryService = orgDirectoryService;
145   }
146 
147   /** Sets the workspace */
148   @Reference
149   public void setWorkspace(Workspace workspace) {
150     this.workspace = workspace;
151   }
152 
153   /**
154    * Activates the component
155    *
156    * @param cc
157    *          the component's context
158    */
159   @Activate
160   public void activate(ComponentContext cc) {
161     componentContext = cc;
162     try {
163       quartz = new StdSchedulerFactory().getScheduler();
164       quartz.start();
165       // create and set the job. To actually run it call schedule(..)
166       final JobDetail job = new JobDetail(JOB_NAME, JOB_GROUP, Runner.class);
167       job.setDurability(true);
168       job.setVolatility(true);
169       job.getJobDataMap().put(JOB_PARAM_PARENT, this);
170       quartz.addJob(job, true);
171     } catch (org.quartz.SchedulerException e) {
172       throw new RuntimeException(e);
173     }
174   }
175 
176   /**
177    * Deactivates the component
178    */
179   @Deactivate
180   public void deactivate(ComponentContext cc) {
181     componentContext = null;
182     shutdown();
183   }
184 
185   @Override
186   public void updated(Dictionary<String, ?> properties) throws ConfigurationException {
187     // Read configuration for the default initial duration
188     try {
189       initialTime = Integer.parseInt(StringUtils.defaultIfBlank((String) properties.get(CFG_KEY_INITIAL_TIME), "300"));
190     } catch (NumberFormatException e) {
191       throw new ConfigurationException(CFG_KEY_INITIAL_TIME, "Not an integer", e);
192     }
193     initialTime = Math.max(initialTime, 90) * 1000;
194 
195     // Read configuration for the prolonging time
196     try {
197       prolongingTime = Integer.parseInt(
198               StringUtils.defaultIfBlank((String) properties.get(CFG_KEY_PROLONGING_TIME), "300"));
199     } catch (NumberFormatException e) {
200       throw new ConfigurationException(CFG_KEY_PROLONGING_TIME, "Not an integer", e);
201     }
202     prolongingTime = Math.max(prolongingTime, 90) * 1000;
203   }
204 
205   /**
206    * Set the schedule and start or restart the scheduler.
207    */
208   public void schedule(String agentId) throws org.quartz.SchedulerException {
209     logger.debug("Capture prolonging job for agent '{}' is run every minute.", agentId);
210     final Trigger trigger = TriggerUtils.makeMinutelyTrigger();
211     trigger.setStartTime(DateTime.now().plusMinutes(1).toDate());
212     trigger.setName(agentId);
213     trigger.setGroup(TRIGGER_GROUP);
214     trigger.setJobName(JOB_NAME);
215     trigger.setJobGroup(JOB_GROUP);
216     if (quartz.getTrigger(agentId, TRIGGER_GROUP) == null) {
217       quartz.scheduleJob(trigger);
218     } else {
219       quartz.rescheduleJob(agentId, TRIGGER_GROUP, trigger);
220     }
221   }
222 
223   public void stop(String agentId) {
224     try {
225       quartz.unscheduleJob(agentId, TRIGGER_GROUP);
226       logger.info("Stopped prolonging capture for agent '{}'", agentId);
227     } catch (Exception e) {
228       logger.error("Error stopping Quartz job for agent '{}'", agentId, e);
229     }
230   }
231 
232   /** Shutdown the scheduler. */
233   public void shutdown() {
234     try {
235       quartz.shutdown();
236     } catch (org.quartz.SchedulerException ignore) {
237     }
238   }
239 
240   // just to make sure Quartz is being shut down...
241   @Override
242   protected void finalize() throws Throwable {
243     super.finalize();
244     shutdown();
245   }
246 
247   /**
248    * Returns the initial time duration (in milliseconds) of a recording started by the CaptureNow service
249    *
250    * @return the initial time
251    */
252   public int getInitialTime() {
253     return initialTime;
254   }
255 
256   /**
257    * Returns the time duration (in milliseconds) a recording is prolonged by the prolonging job.
258    *
259    * @return the prolonging time
260    */
261   public int getProlongingTime() {
262     return prolongingTime;
263   }
264 
265   public SecurityService getSecurityService() {
266     return securityService;
267   }
268 
269   public ComponentContext getComponentContext() {
270     return componentContext;
271   }
272 
273   public ServiceRegistry getServiceRegistry() {
274     return serviceRegistry;
275   }
276 
277   public OrganizationDirectoryService getOrgDirectoryService() {
278     return orgDirectoryService;
279   }
280 
281   public Workspace getWorkspace() {
282     return workspace;
283   }
284 
285   // --
286 
287   /** Quartz work horse. */
288   public static class Runner implements Job {
289 
290     @Override
291     public void execute(JobExecutionContext jobExecutionContext) throws JobExecutionException {
292       logger.debug("Starting ad-hoc prolonging job for agent '{}'", jobExecutionContext.getTrigger().getName());
293       try {
294         execute((CaptureNowProlongingService) jobExecutionContext.getJobDetail().getJobDataMap().get(JOB_PARAM_PARENT),
295                 jobExecutionContext.getTrigger().getName());
296       } catch (Exception e) {
297         throw new JobExecutionException("An error occurred while prolonging ad-hoc recordings", e);
298       }
299       logger.debug("Finished ad-hoc prolonging job for agent '{}'", jobExecutionContext.getTrigger().getName());
300     }
301 
302     private void execute(final CaptureNowProlongingService prolongingService, final String agentId) {
303       for (Organization organization : prolongingService.getOrgDirectoryService().getOrganizations()) {
304         User user = SecurityUtil.createSystemUser(prolongingService.getComponentContext(), organization);
305         SecurityUtil.runAs(prolongingService.getSecurityService(), organization, user, () -> {
306           try {
307             MediaPackage mp = prolongingService.getCurrentRecording(agentId);
308             Optional<DublinCoreCatalog> dublinCore = DublinCoreUtil.loadEpisodeDublinCore(
309                 prolongingService.getWorkspace(),
310                 mp);
311             if (dublinCore.isPresent()
312                     && EncodingSchemeUtils.decodeMandatoryPeriod(dublinCore.get().getFirst(PROPERTY_TEMPORAL))
313                             .getEnd().before(DateTime.now().plusSeconds(90).toDate())) {
314               prolong(prolongingService, mp, dublinCore.get(), agentId);
315             } else {
316               logger.debug("Wait another minute before extending the ad-hoc recording for agent '{}'", agentId);
317             }
318           } catch (NotFoundException e) {
319             logger.info("Unable to extend the ad-hoc recording for agent '{}': No ad-hoc recording found", agentId);
320           } catch (Exception e) {
321             logger.error("Error extending the ad-hoc recording for agent '{}'", agentId, e);
322           }
323         });
324       }
325     }
326 
327     private void prolong(final CaptureNowProlongingService prolongingService, final MediaPackage event,
328             final DublinCoreCatalog dublinCore, final String agentId)
329             throws NotFoundException, ServiceRegistryException {
330       try {
331         logger.info("Extending ad-hoc recording for agent '{}'", agentId);
332         prolongingService.prolongEvent(event, dublinCore, agentId);
333       } catch (UnauthorizedException e) {
334         logger.error("Error extending the ad-hoc recording for agent '{}': Permission denied", agentId);
335       } catch (NotFoundException e) {
336         logger.warn("Error extending the ad-hoc recording for agent '{}': No ad-hoc recording found", agentId);
337       } catch (Exception e) {
338         logger.error("Error extending the ad-hoc recording for agent '{}'", agentId, e);
339       }
340     }
341 
342   }
343 
344   /**
345    * Returns the current event for the given capture agent.
346    *
347    * @param agentId
348    *          the capture agent
349    * @return the recording
350    * @throws NotFoundException
351    *           if the there is no current recording
352    * @throws UnauthorizedException
353    *           if the event cannot be read due to a lack of access rights
354    * @throws SchedulerException
355    *           if accessing the scheduling database fails
356    */
357   public MediaPackage getCurrentRecording(String agentId)
358           throws NotFoundException, UnauthorizedException, SchedulerException {
359     Optional<MediaPackage> current = schedulerService.getCurrentRecording(agentId);
360     if (current.isEmpty()) {
361       logger.warn("Unable to load the current recording for agent '{}': no recording found", agentId);
362       throw new NotFoundException("No current recording found for agent '" + agentId + "'");
363     }
364     return current.get();
365   }
366 
367   /**
368    * Extends the current recording.
369    *
370    * @param event
371    *          the recording's media package
372    * @param dublinCore
373    *          the recording's dublin core catalog
374    * @param agentId
375    *          the agent
376    * @throws UnauthorizedException
377    *           if the event cannot be updated due to a lack of access rights
378    * @throws NotFoundException
379    *           if the event cannot be found
380    * @throws SchedulerException
381    *           if updating the scheduling data fails
382    * @throws IOException
383    *           if updating the calendar to the worksapce fails
384    * @throws IllegalArgumentException
385    *           if a URI cannot be created using the arguments provided
386    */
387   public void prolongEvent(MediaPackage event, DublinCoreCatalog dublinCore, String agentId)
388           throws UnauthorizedException, NotFoundException, SchedulerException, IllegalArgumentException, IOException {
389     String eventId = event.getIdentifier().toString();
390 
391     DCMIPeriod period = EncodingSchemeUtils.decodeMandatoryPeriod(dublinCore.getFirst(DublinCore.PROPERTY_TEMPORAL));
392 
393     Date prolongedEndDate = new DateTime(period.getEnd()).plus(getProlongingTime()).toDate();
394 
395     dublinCore.set(PROPERTY_TEMPORAL,
396             EncodingSchemeUtils.encodePeriod(new DCMIPeriod(period.getStart(), prolongedEndDate), Precision.Second));
397 
398     List<MediaPackage> events = schedulerService.findConflictingEvents(agentId, period.getStart(), prolongedEndDate);
399     for (MediaPackage conflictMediaPackage : events) {
400       if (eventId.equals(conflictMediaPackage.getIdentifier().toString()))
401         continue;
402 
403       Optional<DublinCoreCatalog> conflictingDc = DublinCoreUtil.loadEpisodeDublinCore(workspace, conflictMediaPackage);
404       if (conflictingDc.isEmpty()) {
405         continue;
406       }
407 
408       Date conflictingStartDate = EncodingSchemeUtils
409               .decodeMandatoryPeriod(conflictingDc.get().getFirst(DublinCore.PROPERTY_TEMPORAL)).getStart();
410 
411       prolongedEndDate = new DateTime(conflictingStartDate).minusMinutes(1).toDate();
412 
413       dublinCore.set(PROPERTY_TEMPORAL,
414               EncodingSchemeUtils.encodePeriod(new DCMIPeriod(period.getStart(), prolongedEndDate), Precision.Second));
415 
416       logger.info(
417               "A scheduled event is preventing the current recording on agent '{}' to be further extended. Extending to one minute before the conflicting event",
418               agentId);
419       stop(agentId);
420       break;
421     }
422 
423     // Update the episode dublin core
424     Catalog[] episodeCatalogs = event.getCatalogs(MediaPackageElements.EPISODE);
425     if (episodeCatalogs.length > 0) {
426       Catalog c = episodeCatalogs[0];
427       String filename = FilenameUtils.getName(c.getURI().toString());
428       URI uri = workspace.put(event.getIdentifier().toString(), c.getIdentifier(), filename,
429               IOUtils.toInputStream(dublinCore.toXmlString(), "UTF-8"));
430       c.setURI(uri);
431       // setting the URI to a new source so the checksum will most like be invalid
432       c.setChecksum(null);
433     }
434 
435     schedulerService.updateEvent(eventId, Optional.empty(), Optional.of(prolongedEndDate), Optional.empty(),
436         Optional.empty(), Optional.of(event), Optional.empty(),
437         Optional.empty());
438   }
439 
440 }