1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 package org.opencastproject.workflow.impl;
23
24 import static org.opencastproject.util.data.Option.some;
25
26 import org.opencastproject.kernel.scanner.AbstractScanner;
27 import org.opencastproject.security.api.Organization;
28 import org.opencastproject.security.api.OrganizationDirectoryService;
29 import org.opencastproject.security.api.SecurityService;
30 import org.opencastproject.security.api.UnauthorizedException;
31 import org.opencastproject.serviceregistry.api.ServiceRegistry;
32 import org.opencastproject.serviceregistry.api.ServiceRegistryException;
33 import org.opencastproject.util.NeedleEye;
34 import org.opencastproject.workflow.api.WorkflowDatabaseException;
35 import org.opencastproject.workflow.api.WorkflowInstance;
36 import org.opencastproject.workflow.api.WorkflowService;
37
38 import org.apache.commons.lang3.BooleanUtils;
39 import org.apache.commons.lang3.StringUtils;
40 import org.osgi.service.cm.ConfigurationException;
41 import org.osgi.service.cm.ManagedService;
42 import org.osgi.service.component.ComponentContext;
43 import org.osgi.service.component.annotations.Activate;
44 import org.osgi.service.component.annotations.Component;
45 import org.osgi.service.component.annotations.Deactivate;
46 import org.osgi.service.component.annotations.Reference;
47 import org.quartz.JobDetail;
48 import org.quartz.JobExecutionContext;
49 import org.quartz.impl.StdSchedulerFactory;
50 import org.slf4j.Logger;
51 import org.slf4j.LoggerFactory;
52
53 import java.util.Dictionary;
54 import java.util.concurrent.locks.ReentrantLock;
55
56 @Component(
57 immediate = true,
58 service = ManagedService.class,
59 property = {
60 "service.description=Workflow Cleanup Scanner Service"
61 }
62 )
63 public class WorkflowCleanupScanner extends AbstractWorkflowBufferScanner implements ManagedService {
64 private static final String SCANNER_NAME = "Workflow Cleanup Scanner";
65
66
67 private static final Logger logger = LoggerFactory.getLogger(WorkflowCleanupScanner.class);
68
69 private static final String JOB_NAME = "mh-workflow-cleanup-job";
70 private static final String JOB_GROUP = "mh-workflow-cleanup-job-group";
71 private static final String TRIGGER_NAME = "mh-workflow-cleanup-trigger";
72 private static final String TRIGGER_GROUP = "mh-workflow-cleanup-trigger-group";
73
74 private static final String PARAM_KEY_BUFFER_SUCCEEDED = "buffer.succeeded";
75 private static final String PARAM_KEY_BUFFER_FAILED = "buffer.failed";
76 private static final String PARAM_KEY_BUFFER_STOPPED = "buffer.stopped";
77 private static final String PARAM_KEY_BUFFER_PARENTLESS = "buffer.parentless";
78
79
80 protected static int bufferForSuccessfulJobs = -1;
81
82
83 protected static int bufferForFailedJobs = -1;
84
85
86 protected static int bufferForStoppedJobs = -1;
87
88
89 protected static int bufferForParentlessJobs = -1;
90
91
92 private static final ReentrantLock lock = new ReentrantLock();
93
94 public WorkflowCleanupScanner() {
95 try {
96 quartz = new StdSchedulerFactory().getScheduler();
97 quartz.start();
98
99 final JobDetail job = new JobDetail(getJobName(), getJobGroup(), Runner.class);
100 job.setDurability(false);
101 job.setVolatility(true);
102 job.getJobDataMap().put(JOB_PARAM_PARENT, this);
103 quartz.addJob(job, true);
104 } catch (org.quartz.SchedulerException e) {
105 throw new RuntimeException(e);
106 }
107 }
108
109 @Activate
110 @Override
111 public void activate(ComponentContext cc) {
112 super.activate(cc);
113 }
114
115 @Deactivate
116 @Override
117 public void deactivate() {
118 super.deactivate();
119 }
120
121 @Override
122 public String getJobGroup() {
123 return JOB_GROUP;
124 }
125
126 @Override
127 public String getJobName() {
128 return JOB_NAME;
129 }
130
131 @Override
132 public String getTriggerGroupName() {
133 return TRIGGER_GROUP;
134 }
135
136 @Override
137 public String getTriggerName() {
138 return TRIGGER_NAME;
139 }
140
141 @Override
142 public void updated(@SuppressWarnings("rawtypes") Dictionary properties) throws ConfigurationException {
143 boolean enabled = false;
144 String cronExpression;
145 unschedule();
146
147 if (properties != null) {
148 logger.debug("Updating configuration...");
149
150 enabled = BooleanUtils.toBoolean((String) properties.get(AbstractScanner.PARAM_KEY_ENABLED));
151 setEnabled(enabled);
152 logger.debug("enabled = {}", enabled);
153
154 cronExpression = (String) properties.get(AbstractScanner.PARAM_KEY_CRON_EXPR);
155 if (StringUtils.isBlank(cronExpression)) {
156 throw new ConfigurationException(AbstractScanner.PARAM_KEY_CRON_EXPR, "Cron expression must be valid");
157 }
158 setCronExpression(cronExpression);
159 logger.debug("cronExpression = {}", cronExpression);
160
161 try {
162 bufferForSuccessfulJobs = Integer.valueOf((String) properties.get(PARAM_KEY_BUFFER_SUCCEEDED));
163 } catch (NumberFormatException e) {
164 throw new ConfigurationException(PARAM_KEY_BUFFER_SUCCEEDED, "Buffer must be a valid integer", e);
165 }
166 logger.debug("bufferForSuccessfulJobs = {}", bufferForSuccessfulJobs);
167
168 try {
169 bufferForFailedJobs = Integer.valueOf((String) properties.get(PARAM_KEY_BUFFER_FAILED));
170 } catch (NumberFormatException e) {
171 throw new ConfigurationException(PARAM_KEY_BUFFER_FAILED, "Buffer must be a valid integer", e);
172 }
173 logger.debug("bufferForFailedJobs = {}", bufferForFailedJobs);
174
175 try {
176 bufferForStoppedJobs = Integer.valueOf((String) properties.get(PARAM_KEY_BUFFER_STOPPED));
177 } catch (NumberFormatException e) {
178 throw new ConfigurationException(PARAM_KEY_BUFFER_STOPPED, "Buffer must be a valid integer", e);
179 }
180 logger.debug("bufferForStoppedJobs = {}", bufferForStoppedJobs);
181
182 try {
183 bufferForParentlessJobs = Integer.valueOf((String) properties.get(PARAM_KEY_BUFFER_PARENTLESS));
184 } catch (NumberFormatException e) {
185 throw new ConfigurationException(PARAM_KEY_BUFFER_PARENTLESS, "Buffer must be a valid integer", e);
186 }
187 logger.debug("bufferForParentlessJobs = {}", bufferForParentlessJobs);
188 }
189
190 schedule();
191 }
192
193 @Override
194 public void scan() {
195 if (lock.isLocked()) {
196 logger.info("Skipping workflow scan since a previous scan is still active");
197 return;
198 }
199 try {
200 lock.lock();
201 if (bufferForFailedJobs > 0) {
202 try {
203 getWorkflowService().cleanupWorkflowInstances(bufferForFailedJobs, WorkflowInstance.WorkflowState.FAILED);
204 } catch (WorkflowDatabaseException e) {
205 logger.error("Unable to cleanup failed jobs:", e);
206 } catch (UnauthorizedException e) {
207 logger.error("Workflow cleanup job doesn't have right to delete jobs!");
208 throw new IllegalStateException(e);
209 }
210 }
211
212 if (bufferForSuccessfulJobs > 0) {
213 try {
214 getWorkflowService().cleanupWorkflowInstances(bufferForSuccessfulJobs, WorkflowInstance.WorkflowState.SUCCEEDED);
215 } catch (WorkflowDatabaseException e) {
216 logger.error("Unable to cleanup successful jobs:", e);
217 } catch (UnauthorizedException e) {
218 logger.error("Workflow cleanup job doesn't have right to delete jobs!");
219 throw new IllegalStateException(e);
220 }
221 }
222
223 if (bufferForStoppedJobs > 0) {
224 try {
225 getWorkflowService().cleanupWorkflowInstances(bufferForStoppedJobs, WorkflowInstance.WorkflowState.STOPPED);
226 } catch (WorkflowDatabaseException e) {
227 logger.error("Unable to cleanup stopped jobs:", e);
228 } catch (UnauthorizedException e) {
229 logger.error("Workflow cleanup job doesn't have right to delete jobs!");
230 throw new IllegalStateException(e);
231 }
232 }
233
234 if (bufferForParentlessJobs > 0) {
235 try {
236 getServiceRegistry().removeParentlessJobs(bufferForParentlessJobs);
237 } catch (ServiceRegistryException e) {
238 logger.error("There was an error while removing parentless jobs: {}", e.getMessage());
239 }
240 }
241 } finally {
242 lock.unlock();
243 }
244 }
245
246 @Override
247 public String getScannerName() {
248 return SCANNER_NAME;
249 }
250
251
252 public static class Runner extends TypedQuartzJob<AbstractScanner> {
253 private static final NeedleEye eye = new NeedleEye();
254
255 public Runner() {
256 super(some(eye));
257 }
258
259 @Override
260 protected void execute(final AbstractScanner parameters, JobExecutionContext ctx) {
261 logger.debug("Starting " + parameters.getScannerName() + " job.");
262
263
264 for (final Organization org : parameters.getOrganizationDirectoryService().getOrganizations()) {
265
266 parameters.getAdminContextFor(org.getId()).runInContext(parameters::scan);
267 }
268
269 logger.info("Finished " + parameters.getScannerName() + " job.");
270 }
271 }
272
273 @Reference
274 @Override
275 public void bindWorkflowService(WorkflowService workflowService) {
276 super.bindWorkflowService(workflowService);
277 }
278
279 @Reference
280 @Override
281 public void bindServiceRegistry(ServiceRegistry serviceRegistry) {
282 super.bindServiceRegistry(serviceRegistry);
283 }
284
285 @Reference
286 @Override
287 public void bindOrganizationDirectoryService(OrganizationDirectoryService organizationDirectoryService) {
288 super.bindOrganizationDirectoryService(organizationDirectoryService);
289 }
290
291 @Reference
292 @Override
293 public void bindSecurityService(SecurityService securityService) {
294 super.bindSecurityService(securityService);
295 }
296
297 }