1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 package org.opencastproject.kernel.scanner;
23
24 import static org.opencastproject.security.util.SecurityUtil.createSystemUser;
25
26 import org.opencastproject.security.api.Organization;
27 import org.opencastproject.security.api.OrganizationDirectoryService;
28 import org.opencastproject.security.api.SecurityService;
29 import org.opencastproject.security.util.SecurityContext;
30 import org.opencastproject.security.util.SecurityUtil;
31 import org.opencastproject.serviceregistry.api.ServiceRegistry;
32 import org.opencastproject.util.NeedleEye;
33
34 import org.osgi.service.component.ComponentContext;
35 import org.quartz.CronTrigger;
36 import org.quartz.Job;
37 import org.quartz.JobExecutionContext;
38 import org.quartz.JobExecutionException;
39 import org.quartz.Scheduler;
40 import org.quartz.SchedulerException;
41 import org.slf4j.Logger;
42 import org.slf4j.LoggerFactory;
43
44 import java.text.ParseException;
45 import java.util.Optional;
46 import java.util.concurrent.Callable;
47
48
49
50
51
52 public abstract class AbstractScanner {
53
54 private static final Logger logger = LoggerFactory.getLogger(AbstractScanner.class);
55
56 public static final String JOB_PARAM_PARENT = "parent";
57
58 public static final String PARAM_KEY_ENABLED = "enabled";
59
60 public static final String PARAM_KEY_CRON_EXPR = "cron-expression";
61
62
63 protected Scheduler quartz;
64
65
66 private boolean enabled = false;
67
68
69 private String cronExpression = "0 0 2 * * ?";
70
71
72 private ServiceRegistry serviceRegistry;
73
74
75 private SecurityService securityService;
76
77
78 private OrganizationDirectoryService directoryService;
79
80 private String systemUserName;
81
82
83 public abstract String getJobGroup();
84
85
86 public abstract String getJobName();
87
88
89 public abstract String getTriggerGroupName();
90
91
92 public abstract String getTriggerName();
93
94
95 public abstract String getScannerName();
96
97 public abstract void scan();
98
99
100 public String getCronExpression() {
101 return cronExpression;
102 }
103
104 public void setCronExpression(String cronExpression) {
105 this.cronExpression = cronExpression;
106 }
107
108
109
110
111 protected Scheduler getQuartz() {
112 return quartz;
113 }
114
115 protected void setQuartz(Scheduler quartz) {
116 this.quartz = quartz;
117 }
118
119
120 public boolean isEnabled() {
121 return enabled;
122 }
123
124 public void setEnabled(boolean enabled) {
125 this.enabled = enabled;
126 }
127
128
129
130
131 public void schedule() {
132 if (!isEnabled()) {
133 logger.info(getScannerName() + " is disabled");
134 return;
135 }
136
137 if (quartz == null) {
138 logger.warn("No quartz scheduler available to schedule scanner.");
139 return;
140 }
141
142 logger.info("Schedule " + getScannerName() + " as a cron job ({})", getCronExpression());
143 try {
144 final CronTrigger trigger = new CronTrigger();
145 trigger.setCronExpression(getCronExpression());
146 trigger.setName(getTriggerName());
147 trigger.setGroup(getTriggerGroupName());
148 trigger.setJobName(getJobName());
149 trigger.setJobGroup(getJobGroup());
150 if (getQuartz().getTriggersOfJob(getJobName(), getJobGroup()).length == 0) {
151 getQuartz().scheduleJob(trigger);
152 } else {
153 getQuartz().rescheduleJob(getTriggerName(), getTriggerGroupName(), trigger);
154 }
155 } catch (ParseException e) {
156 logger.error("Error scheduling " + getScannerName() + ", the cron expression '{}' could not be parsed: {}",
157 getCronExpression(), e.getMessage());
158 } catch (Exception e) {
159 logger.error("Error scheduling " + getScannerName(), e);
160 }
161 }
162
163
164
165
166 public void unschedule() {
167 try {
168 if (quartz != null) {
169 quartz.unscheduleJob(getTriggerName(), getTriggerGroupName());
170 }
171 } catch (SchedulerException e) {
172 logger.error("Error unscheduling " + getScannerName(), e);
173 }
174 }
175
176
177 protected void bindOrganizationDirectoryService(OrganizationDirectoryService directoryService) {
178 this.directoryService = directoryService;
179 }
180
181
182 protected void bindSecurityService(SecurityService securityService) {
183 this.securityService = securityService;
184 }
185
186 protected void bindServiceRegistry(ServiceRegistry serviceRegistry) {
187 this.serviceRegistry = serviceRegistry;
188 }
189
190
191 public OrganizationDirectoryService getOrganizationDirectoryService() {
192 return directoryService;
193 }
194
195
196
197
198
199
200 public SecurityContext getAdminContextFor(String orgId) {
201 try {
202 final Organization org = directoryService.getOrganization(orgId);
203 return new SecurityContext(securityService, org, createSystemUser(systemUserName, org));
204 } catch (Exception e) {
205 throw new Error(e);
206 }
207 }
208
209
210 public ServiceRegistry getServiceRegistry() {
211 return this.serviceRegistry;
212 }
213
214
215 public String getSystemUserName() {
216 return systemUserName;
217 }
218
219
220 protected void activate(ComponentContext cc) {
221 systemUserName = cc.getBundleContext().getProperty(SecurityUtil.PROPERTY_KEY_SYS_USER);
222 }
223
224
225 public void deactivate() {
226 shutdown();
227 }
228
229
230 public void shutdown() {
231 try {
232 if (quartz != null) {
233 quartz.shutdown();
234 }
235 } catch (org.quartz.SchedulerException e) {
236 logger.debug("Exception while shutting down quartz scheduler this will be ignored:", e);
237 }
238 }
239
240
241 public void trigger() {
242 try {
243 quartz.triggerJobWithVolatileTrigger(getJobName(), getJobGroup());
244 } catch (Exception e) {
245 logger.error("Error triggering Quartz job", e);
246 }
247 }
248
249
250 @Override
251 protected void finalize() throws Throwable {
252 super.finalize();
253 shutdown();
254 }
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282 public abstract static class TypedQuartzJob<A> implements Job {
283 private final Optional<NeedleEye> allowParallel;
284
285
286
287
288
289
290 protected TypedQuartzJob(Optional<NeedleEye> allowParallel) {
291 this.allowParallel = allowParallel;
292 }
293
294 @Override
295 public final void execute(final JobExecutionContext ctx) throws JobExecutionException {
296 Callable<Integer> job = executeF(ctx);
297 if (allowParallel.isPresent()) {
298 allowParallel.get().apply(job);
299 } else {
300 try {
301 job.call();
302 } catch (Exception e) {
303 throw new JobExecutionException("An error occurred while executing job", e);
304 }
305 }
306 }
307
308
309 protected abstract void execute(A parameters, JobExecutionContext ctx);
310
311 private Callable<Integer> executeF(final JobExecutionContext ctx) {
312 return () -> {
313 try {
314 execute((A) ctx.getJobDetail().getJobDataMap().get(JOB_PARAM_PARENT), ctx);
315 return 0;
316 } catch (Exception e) {
317 logger.error("An error occurred while harvesting schedule", e);
318 throw new JobExecutionException("An error occurred while harvesting schedule", e);
319 }
320 };
321 }
322 }
323 }