View Javadoc
1   /*
2    * Licensed to The Apereo Foundation under one or more contributor license
3    * agreements. See the NOTICE file distributed with this work for additional
4    * information regarding copyright ownership.
5    *
6    *
7    * The Apereo Foundation licenses this file to you under the Educational
8    * Community License, Version 2.0 (the "License"); you may not use this file
9    * except in compliance with the License. You may obtain a copy of the License
10   * at:
11   *
12   *   http://opensource.org/licenses/ecl2.txt
13   *
14   * Unless required by applicable law or agreed to in writing, software
15   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
16   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the
17   * License for the specific language governing permissions and limitations under
18   * the License.
19   *
20   */
21  
22  package org.opencastproject.kernel.scanner;
23  
24  import static org.opencastproject.security.util.SecurityUtil.createSystemUser;
25  
26  import org.opencastproject.security.api.Organization;
27  import org.opencastproject.security.api.OrganizationDirectoryService;
28  import org.opencastproject.security.api.SecurityService;
29  import org.opencastproject.security.util.SecurityContext;
30  import org.opencastproject.security.util.SecurityUtil;
31  import org.opencastproject.serviceregistry.api.ServiceRegistry;
32  import org.opencastproject.util.NeedleEye;
33  
34  import org.osgi.service.component.ComponentContext;
35  import org.quartz.CronTrigger;
36  import org.quartz.Job;
37  import org.quartz.JobExecutionContext;
38  import org.quartz.JobExecutionException;
39  import org.quartz.Scheduler;
40  import org.quartz.SchedulerException;
41  import org.slf4j.Logger;
42  import org.slf4j.LoggerFactory;
43  
44  import java.text.ParseException;
45  import java.util.Optional;
46  import java.util.concurrent.Callable;
47  
48  /**
49   * This class is designed to provide a template for sub classes that will scan
50   * their data and respond accordingly.
51   */
52  public abstract class AbstractScanner {
53    /** The logging facility */
54    private static final Logger logger = LoggerFactory.getLogger(AbstractScanner.class);
55  
56    public static final String JOB_PARAM_PARENT = "parent";
57    /** The key whose value will be used to determine if a scanner is enabled or disabled.*/
58    public static final String PARAM_KEY_ENABLED = "enabled";
59    /** The key whose value will be a cron expression to determine how often the Scanner scans.*/
60    public static final String PARAM_KEY_CRON_EXPR = "cron-expression";
61  
62    /** The quartz scheduler */
63    protected Scheduler quartz;
64  
65    /** Is the scanner enabled? */
66    private boolean enabled = false;
67  
68    /** Cron schedule expression */
69    private String cronExpression = "0 0 2 * * ?"; // every day at 2:00 am
70  
71    /** Reference to the service registry */
72    private ServiceRegistry serviceRegistry;
73  
74    /** Reference to the security service */
75    private SecurityService securityService;
76  
77    /** Reference to the organization directory service */
78    private OrganizationDirectoryService directoryService;
79  
80    private String systemUserName;
81  
82    /** The name of the job group to schedule this quartz job under. */
83    public abstract String getJobGroup();
84  
85    /** The name of the job */
86    public abstract String getJobName();
87  
88    /** The name of the group to store the quartz triggers under. */
89    public abstract String getTriggerGroupName();
90  
91    /** The name of the triggers to use for the quartz jobs. */
92    public abstract String getTriggerName();
93  
94    /** The name of the scanner to use in log files. */
95    public abstract String getScannerName();
96  
97    public abstract void scan();
98  
99    /** Returns a cron style statement that defines how often this scanner will run. */
100   public String getCronExpression() {
101     return cronExpression;
102   }
103 
104   public void setCronExpression(String cronExpression) {
105     this.cronExpression = cronExpression;
106   }
107 
108   /**
109    * @return Returns the current quartz scheduler used to schedule scanning jobs.
110    */
111   protected Scheduler getQuartz() {
112     return quartz;
113   }
114 
115   protected void setQuartz(Scheduler quartz) {
116     this.quartz = quartz;
117   }
118 
119   /** True if this scanner should be enabled. */
120   public boolean isEnabled() {
121     return enabled;
122   }
123 
124   public void setEnabled(boolean enabled) {
125     this.enabled = enabled;
126   }
127 
128   /**
129    * Schedule the scanning job to execute according to the cron schedule.
130    */
131   public void schedule() {
132     if (!isEnabled()) {
133       logger.info(getScannerName() + " is disabled");
134       return;
135     }
136 
137     if (quartz == null) {
138       logger.warn("No quartz scheduler available to schedule scanner.");
139       return;
140     }
141 
142     logger.info("Schedule " + getScannerName() + " as a cron job ({})", getCronExpression());
143     try {
144       final CronTrigger trigger = new CronTrigger();
145       trigger.setCronExpression(getCronExpression());
146       trigger.setName(getTriggerName());
147       trigger.setGroup(getTriggerGroupName());
148       trigger.setJobName(getJobName());
149       trigger.setJobGroup(getJobGroup());
150       if (getQuartz().getTriggersOfJob(getJobName(), getJobGroup()).length == 0) {
151         getQuartz().scheduleJob(trigger);
152       } else {
153         getQuartz().rescheduleJob(getTriggerName(), getTriggerGroupName(), trigger);
154       }
155     } catch (ParseException e) {
156       logger.error("Error scheduling " + getScannerName() + ", the cron expression '{}' could not be parsed: {}",
157               getCronExpression(), e.getMessage());
158     } catch (Exception e) {
159       logger.error("Error scheduling " + getScannerName(), e);
160     }
161   }
162 
163   /**
164    * Unschedule the scanning job.
165    */
166   public void unschedule() {
167     try {
168       if (quartz != null) {
169         quartz.unscheduleJob(getTriggerName(), getTriggerGroupName());
170       }
171     } catch (SchedulerException e) {
172       logger.error("Error unscheduling " + getScannerName(), e);
173     }
174   }
175 
176   /** OSGi callback to set organization directory service */
177   protected void bindOrganizationDirectoryService(OrganizationDirectoryService directoryService) {
178     this.directoryService = directoryService;
179   }
180 
181   /** OSGi callback to set security service */
182   protected void bindSecurityService(SecurityService securityService) {
183     this.securityService = securityService;
184   }
185 
186   protected void bindServiceRegistry(ServiceRegistry serviceRegistry) {
187     this.serviceRegistry = serviceRegistry;
188   }
189 
190   /** Get an organization directory service */
191   public OrganizationDirectoryService getOrganizationDirectoryService() {
192     return directoryService;
193   }
194 
195   /**
196    * Get a system administrator context for the given organization id.
197    * @param orgId The organization's id.
198    * @return A SecurityContext for the admin.
199    */
200   public SecurityContext getAdminContextFor(String orgId) {
201     try {
202       final Organization org = directoryService.getOrganization(orgId);
203       return new SecurityContext(securityService, org, createSystemUser(systemUserName, org));
204     } catch (Exception e) {
205       throw new Error(e);
206     }
207   }
208 
209   /** Get a service registry */
210   public ServiceRegistry getServiceRegistry() {
211     return this.serviceRegistry;
212   }
213 
214   /** The user name to run this scanner job under. */
215   public String getSystemUserName() {
216     return systemUserName;
217   }
218 
219   /** OSGi component activate callback */
220   protected void activate(ComponentContext cc) {
221     systemUserName = cc.getBundleContext().getProperty(SecurityUtil.PROPERTY_KEY_SYS_USER);
222   }
223 
224   /** OSGi deactivate component callback. */
225   public void deactivate() {
226     shutdown();
227   }
228 
229   /** Shutdown the scheduler. */
230   public void shutdown() {
231     try {
232       if (quartz != null) {
233         quartz.shutdown();
234       }
235     } catch (org.quartz.SchedulerException e) {
236       logger.debug("Exception while shutting down quartz scheduler this will be ignored:", e);
237     }
238   }
239 
240   /** Trigger the scheduler once independent of it's actual schedule. */
241   public void trigger() {
242     try {
243       quartz.triggerJobWithVolatileTrigger(getJobName(), getJobGroup());
244     } catch (Exception e) {
245       logger.error("Error triggering Quartz job", e);
246     }
247   }
248 
249   /** Just to make sure Quartz is being shut down... */
250   @Override
251   protected void finalize() throws Throwable {
252     super.finalize();
253     shutdown();
254   }
255 
256 
257   /**
258    * Please remember that derived classes need a no-arg constructor in order to work with Quartz. Sample usage:
259    *
260    * <pre>
261    * public class Runner extends TypedQuartzJob&lt;Scheduler&gt; {
262    *   protected abstract void execute(Scheduler parameters, JobExecutionContext ctx) {
263    *     // type safe parameter access
264    *     parameters.getConfig();
265    *   }
266    * }
267    *
268    * public class Scheduler {
269    *   ...
270    *   // create the job
271    *   final JobDetail job = new JobDetail(JOB_NAME, JOB_GROUP, Runner.class);
272    *   // set the scheduler as parameter source
273    *   job.getJobDataMap().put(JOB_PARAM_PARENT, this);
274    *   // add to Quartz scheduler
275    *   quartz.addJob(job, true);
276    *   ...
277    *   // provide a config string
278    *   public String getConfig() {...}
279    * }
280    * </pre>
281    */
282   public abstract static class TypedQuartzJob<A> implements Job {
283     private final Optional<NeedleEye> allowParallel;
284 
285     /**
286      * @param allowParallel
287      *          Pass a needle eye if only one job may be run at a time. Make the needle eye static to the inheriting
288      *          class.
289      */
290     protected TypedQuartzJob(Optional<NeedleEye> allowParallel) {
291       this.allowParallel = allowParallel;
292     }
293 
294     @Override
295     public final void execute(final JobExecutionContext ctx) throws JobExecutionException {
296       Callable<Integer> job = executeF(ctx);
297       if (allowParallel.isPresent()) {
298         allowParallel.get().apply(job);
299       } else {
300         try {
301           job.call();
302         } catch (Exception e) {
303           throw new JobExecutionException("An error occurred while executing job", e);
304         }
305       }
306     }
307 
308     /** Typesafe replacement for {@link #execute(org.quartz.JobExecutionContext)}. */
309     protected abstract void execute(A parameters, JobExecutionContext ctx);
310 
311     private Callable<Integer> executeF(final JobExecutionContext ctx) {
312       return () -> {
313         try {
314           execute((A) ctx.getJobDetail().getJobDataMap().get(JOB_PARAM_PARENT), ctx);
315           return 0;
316         } catch (Exception e) {
317           logger.error("An error occurred while harvesting schedule", e);
318           throw new JobExecutionException("An error occurred while harvesting schedule", e);
319         }
320       };
321     }
322   }
323 }