View Javadoc
1   /*
2    * Licensed to The Apereo Foundation under one or more contributor license
3    * agreements. See the NOTICE file distributed with this work for additional
4    * information regarding copyright ownership.
5    *
6    *
7    * The Apereo Foundation licenses this file to you under the Educational
8    * Community License, Version 2.0 (the "License"); you may not use this file
9    * except in compliance with the License. You may obtain a copy of the License
10   * at:
11   *
12   *   http://opensource.org/licenses/ecl2.txt
13   *
14   * Unless required by applicable law or agreed to in writing, software
15   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
16   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the
17   * License for the specific language governing permissions and limitations under
18   * the License.
19   *
20   */
21  package org.opencastproject.scheduler.impl;
22  
23  import static org.opencastproject.metadata.dublincore.DublinCore.PROPERTY_TEMPORAL;
24  
25  import org.opencastproject.mediapackage.Catalog;
26  import org.opencastproject.mediapackage.MediaPackage;
27  import org.opencastproject.mediapackage.MediaPackageElements;
28  import org.opencastproject.metadata.dublincore.DCMIPeriod;
29  import org.opencastproject.metadata.dublincore.DublinCore;
30  import org.opencastproject.metadata.dublincore.DublinCoreCatalog;
31  import org.opencastproject.metadata.dublincore.DublinCoreUtil;
32  import org.opencastproject.metadata.dublincore.EncodingSchemeUtils;
33  import org.opencastproject.metadata.dublincore.Precision;
34  import org.opencastproject.scheduler.api.SchedulerException;
35  import org.opencastproject.scheduler.api.SchedulerService;
36  import org.opencastproject.security.api.Organization;
37  import org.opencastproject.security.api.OrganizationDirectoryService;
38  import org.opencastproject.security.api.SecurityService;
39  import org.opencastproject.security.api.UnauthorizedException;
40  import org.opencastproject.security.api.User;
41  import org.opencastproject.security.util.SecurityUtil;
42  import org.opencastproject.serviceregistry.api.ServiceRegistry;
43  import org.opencastproject.serviceregistry.api.ServiceRegistryException;
44  import org.opencastproject.util.NotFoundException;
45  import org.opencastproject.workspace.api.Workspace;
46  
47  import com.entwinemedia.fn.data.Opt;
48  
49  import org.apache.commons.io.FilenameUtils;
50  import org.apache.commons.io.IOUtils;
51  import org.apache.commons.lang3.StringUtils;
52  import org.joda.time.DateTime;
53  import org.osgi.service.cm.ConfigurationException;
54  import org.osgi.service.cm.ManagedService;
55  import org.osgi.service.component.ComponentContext;
56  import org.osgi.service.component.annotations.Activate;
57  import org.osgi.service.component.annotations.Component;
58  import org.osgi.service.component.annotations.Deactivate;
59  import org.osgi.service.component.annotations.Reference;
60  import org.quartz.Job;
61  import org.quartz.JobDetail;
62  import org.quartz.JobExecutionContext;
63  import org.quartz.JobExecutionException;
64  import org.quartz.Trigger;
65  import org.quartz.TriggerUtils;
66  import org.quartz.impl.StdSchedulerFactory;
67  import org.slf4j.Logger;
68  import org.slf4j.LoggerFactory;
69  
70  import java.io.IOException;
71  import java.net.URI;
72  import java.util.Date;
73  import java.util.Dictionary;
74  import java.util.List;
75  import java.util.Map;
76  import java.util.Optional;
77  import java.util.Set;
78  
79  /** Prolong immediate recordings before reaching the end, as long as there are no conflicts */
80  @Component(
81      immediate = true,
82      service = { ManagedService.class,CaptureNowProlongingService.class },
83      property = {
84          "service.description=Capture Prolonging Service"
85      }
86  )
87  public class CaptureNowProlongingService implements ManagedService {
88  
89    /** Log facility */
90    private static final Logger logger = LoggerFactory.getLogger(CaptureNowProlongingService.class);
91  
92    private static final String CFG_KEY_INITIAL_TIME = "initial-time";
93    private static final String CFG_KEY_PROLONGING_TIME = "prolonging-time";
94  
95    private static final String JOB_NAME = "mh-capture-prolonging-job";
96    private static final String JOB_GROUP = "mh-capture-prolonging-job-group";
97    private static final String TRIGGER_GROUP = "mh-capture-prolonging-trigger-group";
98    private static final String JOB_PARAM_PARENT = "parent";
99  
100   /** The initial time in millis */
101   private int initialTime = -1;
102 
103   /** The prolonging time in millis */
104   private int prolongingTime = -1;
105 
106   /** The quartz scheduler */
107   private org.quartz.Scheduler quartz;
108 
109   /** The scheduler service */
110   private SchedulerService schedulerService;
111 
112   /** The security service */
113   private SecurityService securityService;
114 
115   /** The service registry */
116   private ServiceRegistry serviceRegistry;
117 
118   /** The organization directory service */
119   private OrganizationDirectoryService orgDirectoryService;
120 
121   /** The workspace */
122   private Workspace workspace;
123 
124   /** The bundle context for this osgi component */
125   private ComponentContext componentContext;
126 
127   /** Sets the scheduler service */
128   @Reference
129   public void setSchedulerService(SchedulerService schedulerService) {
130     this.schedulerService = schedulerService;
131   }
132 
133   /** Sets the security service */
134   @Reference
135   public void setSecurityService(SecurityService securityService) {
136     this.securityService = securityService;
137   }
138 
139   /** Sets the service registry */
140   @Reference
141   public void setServiceRegistry(ServiceRegistry serviceRegistry) {
142     this.serviceRegistry = serviceRegistry;
143   }
144 
145   /** Sets the organization directory service */
146   @Reference
147   public void setOrgDirectoryService(OrganizationDirectoryService orgDirectoryService) {
148     this.orgDirectoryService = orgDirectoryService;
149   }
150 
151   /** Sets the workspace */
152   @Reference
153   public void setWorkspace(Workspace workspace) {
154     this.workspace = workspace;
155   }
156 
157   /**
158    * Activates the component
159    *
160    * @param cc
161    *          the component's context
162    */
163   @Activate
164   public void activate(ComponentContext cc) {
165     componentContext = cc;
166     try {
167       quartz = new StdSchedulerFactory().getScheduler();
168       quartz.start();
169       // create and set the job. To actually run it call schedule(..)
170       final JobDetail job = new JobDetail(JOB_NAME, JOB_GROUP, Runner.class);
171       job.setDurability(true);
172       job.setVolatility(true);
173       job.getJobDataMap().put(JOB_PARAM_PARENT, this);
174       quartz.addJob(job, true);
175     } catch (org.quartz.SchedulerException e) {
176       throw new RuntimeException(e);
177     }
178   }
179 
180   /**
181    * Deactivates the component
182    */
183   @Deactivate
184   public void deactivate(ComponentContext cc) {
185     componentContext = null;
186     shutdown();
187   }
188 
189   @Override
190   public void updated(Dictionary<String, ?> properties) throws ConfigurationException {
191     // Read configuration for the default initial duration
192     try {
193       initialTime = Integer.parseInt(StringUtils.defaultIfBlank((String) properties.get(CFG_KEY_INITIAL_TIME), "300"));
194     } catch (NumberFormatException e) {
195       throw new ConfigurationException(CFG_KEY_INITIAL_TIME, "Not an integer", e);
196     }
197     initialTime = Math.max(initialTime, 90) * 1000;
198 
199     // Read configuration for the prolonging time
200     try {
201       prolongingTime = Integer.parseInt(
202               StringUtils.defaultIfBlank((String) properties.get(CFG_KEY_PROLONGING_TIME), "300"));
203     } catch (NumberFormatException e) {
204       throw new ConfigurationException(CFG_KEY_PROLONGING_TIME, "Not an integer", e);
205     }
206     prolongingTime = Math.max(prolongingTime, 90) * 1000;
207   }
208 
209   /**
210    * Set the schedule and start or restart the scheduler.
211    */
212   public void schedule(String agentId) throws org.quartz.SchedulerException {
213     logger.debug("Capture prolonging job for agent '{}' is run every minute.", agentId);
214     final Trigger trigger = TriggerUtils.makeMinutelyTrigger();
215     trigger.setStartTime(DateTime.now().plusMinutes(1).toDate());
216     trigger.setName(agentId);
217     trigger.setGroup(TRIGGER_GROUP);
218     trigger.setJobName(JOB_NAME);
219     trigger.setJobGroup(JOB_GROUP);
220     if (quartz.getTrigger(agentId, TRIGGER_GROUP) == null) {
221       quartz.scheduleJob(trigger);
222     } else {
223       quartz.rescheduleJob(agentId, TRIGGER_GROUP, trigger);
224     }
225   }
226 
227   public void stop(String agentId) {
228     try {
229       quartz.unscheduleJob(agentId, TRIGGER_GROUP);
230       logger.info("Stopped prolonging capture for agent '{}'", agentId);
231     } catch (Exception e) {
232       logger.error("Error stopping Quartz job for agent '{}'", agentId, e);
233     }
234   }
235 
236   /** Shutdown the scheduler. */
237   public void shutdown() {
238     try {
239       quartz.shutdown();
240     } catch (org.quartz.SchedulerException ignore) {
241     }
242   }
243 
244   // just to make sure Quartz is being shut down...
245   @Override
246   protected void finalize() throws Throwable {
247     super.finalize();
248     shutdown();
249   }
250 
251   /**
252    * Returns the initial time duration (in milliseconds) of a recording started by the CaptureNow service
253    *
254    * @return the initial time
255    */
256   public int getInitialTime() {
257     return initialTime;
258   }
259 
260   /**
261    * Returns the time duration (in milliseconds) a recording is prolonged by the prolonging job.
262    *
263    * @return the prolonging time
264    */
265   public int getProlongingTime() {
266     return prolongingTime;
267   }
268 
269   public SecurityService getSecurityService() {
270     return securityService;
271   }
272 
273   public ComponentContext getComponentContext() {
274     return componentContext;
275   }
276 
277   public ServiceRegistry getServiceRegistry() {
278     return serviceRegistry;
279   }
280 
281   public OrganizationDirectoryService getOrgDirectoryService() {
282     return orgDirectoryService;
283   }
284 
285   public Workspace getWorkspace() {
286     return workspace;
287   }
288 
289   // --
290 
291   /** Quartz work horse. */
292   public static class Runner implements Job {
293 
294     @Override
295     public void execute(JobExecutionContext jobExecutionContext) throws JobExecutionException {
296       logger.debug("Starting ad-hoc prolonging job for agent '{}'", jobExecutionContext.getTrigger().getName());
297       try {
298         execute((CaptureNowProlongingService) jobExecutionContext.getJobDetail().getJobDataMap().get(JOB_PARAM_PARENT),
299                 jobExecutionContext.getTrigger().getName());
300       } catch (Exception e) {
301         throw new JobExecutionException("An error occurred while prolonging ad-hoc recordings", e);
302       }
303       logger.debug("Finished ad-hoc prolonging job for agent '{}'", jobExecutionContext.getTrigger().getName());
304     }
305 
306     private void execute(final CaptureNowProlongingService prolongingService, final String agentId) {
307       for (Organization organization : prolongingService.getOrgDirectoryService().getOrganizations()) {
308         User user = SecurityUtil.createSystemUser(prolongingService.getComponentContext(), organization);
309         SecurityUtil.runAs(prolongingService.getSecurityService(), organization, user, () -> {
310           try {
311             MediaPackage mp = prolongingService.getCurrentRecording(agentId);
312             Optional<DublinCoreCatalog> dublinCore = DublinCoreUtil.loadEpisodeDublinCore(
313                 prolongingService.getWorkspace(),
314                 mp);
315             if (dublinCore.isPresent()
316                     && EncodingSchemeUtils.decodeMandatoryPeriod(dublinCore.get().getFirst(PROPERTY_TEMPORAL))
317                             .getEnd().before(DateTime.now().plusSeconds(90).toDate())) {
318               prolong(prolongingService, mp, dublinCore.get(), agentId);
319             } else {
320               logger.debug("Wait another minute before extending the ad-hoc recording for agent '{}'", agentId);
321             }
322           } catch (NotFoundException e) {
323             logger.info("Unable to extend the ad-hoc recording for agent '{}': No ad-hoc recording found", agentId);
324           } catch (Exception e) {
325             logger.error("Error extending the ad-hoc recording for agent '{}'", agentId, e);
326           }
327         });
328       }
329     }
330 
331     private void prolong(final CaptureNowProlongingService prolongingService, final MediaPackage event,
332             final DublinCoreCatalog dublinCore, final String agentId)
333             throws NotFoundException, ServiceRegistryException {
334       try {
335         logger.info("Extending ad-hoc recording for agent '{}'", agentId);
336         prolongingService.prolongEvent(event, dublinCore, agentId);
337       } catch (UnauthorizedException e) {
338         logger.error("Error extending the ad-hoc recording for agent '{}': Permission denied", agentId);
339       } catch (NotFoundException e) {
340         logger.warn("Error extending the ad-hoc recording for agent '{}': No ad-hoc recording found", agentId);
341       } catch (Exception e) {
342         logger.error("Error extending the ad-hoc recording for agent '{}'", agentId, e);
343       }
344     }
345 
346   }
347 
348   /**
349    * Returns the current event for the given capture agent.
350    *
351    * @param agentId
352    *          the capture agent
353    * @return the recording
354    * @throws NotFoundException
355    *           if the there is no current recording
356    * @throws UnauthorizedException
357    *           if the event cannot be read due to a lack of access rights
358    * @throws SchedulerException
359    *           if accessing the scheduling database fails
360    */
361   public MediaPackage getCurrentRecording(String agentId)
362           throws NotFoundException, UnauthorizedException, SchedulerException {
363     Opt<MediaPackage> current = schedulerService.getCurrentRecording(agentId);
364     if (current.isNone()) {
365       logger.warn("Unable to load the current recording for agent '{}': no recording found", agentId);
366       throw new NotFoundException("No current recording found for agent '" + agentId + "'");
367     }
368     return current.get();
369   }
370 
371   /**
372    * Extends the current recording.
373    *
374    * @param event
375    *          the recording's media package
376    * @param dublinCore
377    *          the recording's dublin core catalog
378    * @param agentId
379    *          the agent
380    * @throws UnauthorizedException
381    *           if the event cannot be updated due to a lack of access rights
382    * @throws NotFoundException
383    *           if the event cannot be found
384    * @throws SchedulerException
385    *           if updating the scheduling data fails
386    * @throws IOException
387    *           if updating the calendar to the worksapce fails
388    * @throws IllegalArgumentException
389    *           if a URI cannot be created using the arguments provided
390    */
391   public void prolongEvent(MediaPackage event, DublinCoreCatalog dublinCore, String agentId)
392           throws UnauthorizedException, NotFoundException, SchedulerException, IllegalArgumentException, IOException {
393     String eventId = event.getIdentifier().toString();
394 
395     DCMIPeriod period = EncodingSchemeUtils.decodeMandatoryPeriod(dublinCore.getFirst(DublinCore.PROPERTY_TEMPORAL));
396 
397     Date prolongedEndDate = new DateTime(period.getEnd()).plus(getProlongingTime()).toDate();
398 
399     dublinCore.set(PROPERTY_TEMPORAL,
400             EncodingSchemeUtils.encodePeriod(new DCMIPeriod(period.getStart(), prolongedEndDate), Precision.Second));
401 
402     List<MediaPackage> events = schedulerService.findConflictingEvents(agentId, period.getStart(), prolongedEndDate);
403     for (MediaPackage conflictMediaPackage : events) {
404       if (eventId.equals(conflictMediaPackage.getIdentifier().toString()))
405         continue;
406 
407       Optional<DublinCoreCatalog> conflictingDc = DublinCoreUtil.loadEpisodeDublinCore(workspace, conflictMediaPackage);
408       if (conflictingDc.isEmpty()) {
409         continue;
410       }
411 
412       Date conflictingStartDate = EncodingSchemeUtils
413               .decodeMandatoryPeriod(conflictingDc.get().getFirst(DublinCore.PROPERTY_TEMPORAL)).getStart();
414 
415       prolongedEndDate = new DateTime(conflictingStartDate).minusMinutes(1).toDate();
416 
417       dublinCore.set(PROPERTY_TEMPORAL,
418               EncodingSchemeUtils.encodePeriod(new DCMIPeriod(period.getStart(), prolongedEndDate), Precision.Second));
419 
420       logger.info(
421               "A scheduled event is preventing the current recording on agent '{}' to be further extended. Extending to one minute before the conflicting event",
422               agentId);
423       stop(agentId);
424       break;
425     }
426 
427     // Update the episode dublin core
428     Catalog[] episodeCatalogs = event.getCatalogs(MediaPackageElements.EPISODE);
429     if (episodeCatalogs.length > 0) {
430       Catalog c = episodeCatalogs[0];
431       String filename = FilenameUtils.getName(c.getURI().toString());
432       URI uri = workspace.put(event.getIdentifier().toString(), c.getIdentifier(), filename,
433               IOUtils.toInputStream(dublinCore.toXmlString(), "UTF-8"));
434       c.setURI(uri);
435       // setting the URI to a new source so the checksum will most like be invalid
436       c.setChecksum(null);
437     }
438 
439     schedulerService.updateEvent(eventId, Opt.<Date> none(), Opt.some(prolongedEndDate), Opt.<String> none(),
440             Opt.<Set<String>> none(), Opt.some(event), Opt.<Map<String, String>> none(),
441             Opt.<Map<String, String>> none());
442   }
443 
444 }