1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 package org.opencastproject.workflow.impl;
23
24 import static org.opencastproject.util.data.Option.some;
25
26 import org.opencastproject.kernel.scanner.AbstractScanner;
27 import org.opencastproject.security.api.Organization;
28 import org.opencastproject.security.api.OrganizationDirectoryService;
29 import org.opencastproject.security.api.SecurityService;
30 import org.opencastproject.security.api.UnauthorizedException;
31 import org.opencastproject.serviceregistry.api.ServiceRegistry;
32 import org.opencastproject.serviceregistry.api.ServiceRegistryException;
33 import org.opencastproject.util.NeedleEye;
34 import org.opencastproject.workflow.api.WorkflowDatabaseException;
35 import org.opencastproject.workflow.api.WorkflowInstance;
36 import org.opencastproject.workflow.api.WorkflowService;
37
38 import org.apache.commons.lang3.BooleanUtils;
39 import org.apache.commons.lang3.StringUtils;
40 import org.osgi.service.cm.ConfigurationException;
41 import org.osgi.service.cm.ManagedService;
42 import org.osgi.service.component.ComponentContext;
43 import org.osgi.service.component.annotations.Activate;
44 import org.osgi.service.component.annotations.Component;
45 import org.osgi.service.component.annotations.Deactivate;
46 import org.osgi.service.component.annotations.Reference;
47 import org.quartz.JobDetail;
48 import org.quartz.JobExecutionContext;
49 import org.quartz.impl.StdSchedulerFactory;
50 import org.slf4j.Logger;
51 import org.slf4j.LoggerFactory;
52
53 import java.util.Dictionary;
54 import java.util.concurrent.locks.ReentrantLock;
55
56 @Component(
57 immediate = true,
58 service = ManagedService.class,
59 property = {
60 "service.description=Workflow Cleanup Scanner Service"
61 }
62 )
63 public class WorkflowCleanupScanner extends AbstractWorkflowBufferScanner implements ManagedService {
64 private static final String SCANNER_NAME = "Workflow Cleanup Scanner";
65
66
67 private static final Logger logger = LoggerFactory.getLogger(WorkflowCleanupScanner.class);
68
69 private static final String JOB_NAME = "mh-workflow-cleanup-job";
70 private static final String JOB_GROUP = "mh-workflow-cleanup-job-group";
71 private static final String TRIGGER_NAME = "mh-workflow-cleanup-trigger";
72 private static final String TRIGGER_GROUP = "mh-workflow-cleanup-trigger-group";
73
74 private static final String PARAM_KEY_BUFFER_SUCCEEDED = "buffer.succeeded";
75 private static final String PARAM_KEY_BUFFER_FAILED = "buffer.failed";
76 private static final String PARAM_KEY_BUFFER_STOPPED = "buffer.stopped";
77 private static final String PARAM_KEY_BUFFER_PARENTLESS = "buffer.parentless";
78
79
80 protected static int bufferForSuccessfulJobs = -1;
81
82
83 protected static int bufferForFailedJobs = -1;
84
85
86 protected static int bufferForStoppedJobs = -1;
87
88
89 protected static int bufferForParentlessJobs = -1;
90
91
92 private static final ReentrantLock lock = new ReentrantLock();
93
94 public WorkflowCleanupScanner() {
95 try {
96 quartz = new StdSchedulerFactory().getScheduler();
97 quartz.start();
98
99 final JobDetail job = new JobDetail(getJobName(), getJobGroup(), Runner.class);
100 job.setDurability(false);
101 job.setVolatility(true);
102 job.getJobDataMap().put(JOB_PARAM_PARENT, this);
103 quartz.addJob(job, true);
104 } catch (org.quartz.SchedulerException e) {
105 throw new RuntimeException(e);
106 }
107 }
108
109 @Activate
110 @Override
111 public void activate(ComponentContext cc) {
112 super.activate(cc);
113 }
114
115 @Deactivate
116 @Override
117 public void deactivate() {
118 super.deactivate();
119 }
120
121 @Override
122 public String getJobGroup() {
123 return JOB_GROUP;
124 }
125
126 @Override
127 public String getJobName() {
128 return JOB_NAME;
129 }
130
131 @Override
132 public String getTriggerGroupName() {
133 return TRIGGER_GROUP;
134 }
135
136 @Override
137 public String getTriggerName() {
138 return TRIGGER_NAME;
139 }
140
141 @Override
142 public void updated(@SuppressWarnings("rawtypes") Dictionary properties) throws ConfigurationException {
143 boolean enabled = false;
144 String cronExpression;
145 unschedule();
146
147 if (properties != null) {
148 logger.debug("Updating configuration...");
149
150 enabled = BooleanUtils.toBoolean((String) properties.get(AbstractScanner.PARAM_KEY_ENABLED));
151 setEnabled(enabled);
152 logger.debug("enabled = {}", enabled);
153
154 cronExpression = (String) properties.get(AbstractScanner.PARAM_KEY_CRON_EXPR);
155 if (StringUtils.isBlank(cronExpression)) {
156 throw new ConfigurationException(AbstractScanner.PARAM_KEY_CRON_EXPR, "Cron expression must be valid");
157 }
158 setCronExpression(cronExpression);
159 logger.debug("cronExpression = {}", cronExpression);
160
161 try {
162 bufferForSuccessfulJobs = Integer.valueOf((String) properties.get(PARAM_KEY_BUFFER_SUCCEEDED));
163 } catch (NumberFormatException e) {
164 throw new ConfigurationException(PARAM_KEY_BUFFER_SUCCEEDED, "Buffer must be a valid integer", e);
165 }
166 logger.debug("bufferForSuccessfulJobs = {}", bufferForSuccessfulJobs);
167
168 try {
169 bufferForFailedJobs = Integer.valueOf((String) properties.get(PARAM_KEY_BUFFER_FAILED));
170 } catch (NumberFormatException e) {
171 throw new ConfigurationException(PARAM_KEY_BUFFER_FAILED, "Buffer must be a valid integer", e);
172 }
173 logger.debug("bufferForFailedJobs = {}", bufferForFailedJobs);
174
175 try {
176 bufferForStoppedJobs = Integer.valueOf((String) properties.get(PARAM_KEY_BUFFER_STOPPED));
177 } catch (NumberFormatException e) {
178 throw new ConfigurationException(PARAM_KEY_BUFFER_STOPPED, "Buffer must be a valid integer", e);
179 }
180 logger.debug("bufferForStoppedJobs = {}", bufferForStoppedJobs);
181
182 try {
183 bufferForParentlessJobs = Integer.valueOf((String) properties.get(PARAM_KEY_BUFFER_PARENTLESS));
184 } catch (NumberFormatException e) {
185 throw new ConfigurationException(PARAM_KEY_BUFFER_PARENTLESS, "Buffer must be a valid integer", e);
186 }
187 logger.debug("bufferForParentlessJobs = {}", bufferForParentlessJobs);
188 }
189
190 schedule();
191 }
192
193 @Override
194 public void scan() {
195 if (lock.isLocked()) {
196 logger.info("Skipping workflow scan since a previous scan is still active");
197 return;
198 }
199 try {
200 lock.lock();
201 if (bufferForFailedJobs > 0) {
202 try {
203 getWorkflowService().cleanupWorkflowInstances(bufferForFailedJobs,
204 WorkflowInstance.WorkflowState.FAILED);
205 } catch (WorkflowDatabaseException e) {
206 logger.error("Unable to cleanup failed jobs:", e);
207 } catch (UnauthorizedException e) {
208 logger.error("Workflow cleanup job doesn't have right to delete jobs!");
209 throw new IllegalStateException(e);
210 }
211 }
212
213 if (bufferForSuccessfulJobs > 0) {
214 try {
215 getWorkflowService().cleanupWorkflowInstances(bufferForSuccessfulJobs,
216 WorkflowInstance.WorkflowState.SUCCEEDED);
217 } catch (WorkflowDatabaseException e) {
218 logger.error("Unable to cleanup successful jobs:", e);
219 } catch (UnauthorizedException e) {
220 logger.error("Workflow cleanup job doesn't have right to delete jobs!");
221 throw new IllegalStateException(e);
222 }
223 }
224
225 if (bufferForStoppedJobs > 0) {
226 try {
227 getWorkflowService().cleanupWorkflowInstances(bufferForStoppedJobs,
228 WorkflowInstance.WorkflowState.STOPPED);
229 } catch (WorkflowDatabaseException e) {
230 logger.error("Unable to cleanup stopped jobs:", e);
231 } catch (UnauthorizedException e) {
232 logger.error("Workflow cleanup job doesn't have right to delete jobs!");
233 throw new IllegalStateException(e);
234 }
235 }
236
237 if (bufferForParentlessJobs > 0) {
238 try {
239 getServiceRegistry().removeParentlessJobs(bufferForParentlessJobs);
240 } catch (ServiceRegistryException e) {
241 logger.error("There was an error while removing parentless jobs: {}", e.getMessage());
242 }
243 }
244 } finally {
245 lock.unlock();
246 }
247 }
248
249 @Override
250 public String getScannerName() {
251 return SCANNER_NAME;
252 }
253
254
255 public static class Runner extends TypedQuartzJob<AbstractScanner> {
256 private static final NeedleEye eye = new NeedleEye();
257
258 public Runner() {
259 super(some(eye));
260 }
261
262 @Override
263 protected void execute(final AbstractScanner parameters, JobExecutionContext ctx) {
264 logger.debug("Starting " + parameters.getScannerName() + " job.");
265
266
267 for (final Organization org : parameters.getOrganizationDirectoryService().getOrganizations()) {
268
269 parameters.getAdminContextFor(org.getId()).runInContext(parameters::scan);
270 }
271
272 logger.info("Finished " + parameters.getScannerName() + " job.");
273 }
274 }
275
276 @Reference
277 @Override
278 public void bindWorkflowService(WorkflowService workflowService) {
279 super.bindWorkflowService(workflowService);
280 }
281
282 @Reference
283 @Override
284 public void bindServiceRegistry(ServiceRegistry serviceRegistry) {
285 super.bindServiceRegistry(serviceRegistry);
286 }
287
288 @Reference
289 @Override
290 public void bindOrganizationDirectoryService(OrganizationDirectoryService organizationDirectoryService) {
291 super.bindOrganizationDirectoryService(organizationDirectoryService);
292 }
293
294 @Reference
295 @Override
296 public void bindSecurityService(SecurityService securityService) {
297 super.bindSecurityService(securityService);
298 }
299
300 }