1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 package org.opencastproject.workflow.impl;
23
24 import org.opencastproject.kernel.scanner.AbstractScanner;
25 import org.opencastproject.security.api.Organization;
26 import org.opencastproject.security.api.OrganizationDirectoryService;
27 import org.opencastproject.security.api.SecurityService;
28 import org.opencastproject.security.api.UnauthorizedException;
29 import org.opencastproject.serviceregistry.api.ServiceRegistry;
30 import org.opencastproject.serviceregistry.api.ServiceRegistryException;
31 import org.opencastproject.util.NeedleEye;
32 import org.opencastproject.workflow.api.WorkflowDatabaseException;
33 import org.opencastproject.workflow.api.WorkflowInstance;
34 import org.opencastproject.workflow.api.WorkflowService;
35
36 import org.apache.commons.lang3.BooleanUtils;
37 import org.apache.commons.lang3.StringUtils;
38 import org.osgi.service.cm.ConfigurationException;
39 import org.osgi.service.cm.ManagedService;
40 import org.osgi.service.component.ComponentContext;
41 import org.osgi.service.component.annotations.Activate;
42 import org.osgi.service.component.annotations.Component;
43 import org.osgi.service.component.annotations.Deactivate;
44 import org.osgi.service.component.annotations.Reference;
45 import org.quartz.JobDetail;
46 import org.quartz.JobExecutionContext;
47 import org.quartz.impl.StdSchedulerFactory;
48 import org.slf4j.Logger;
49 import org.slf4j.LoggerFactory;
50
51 import java.util.Dictionary;
52 import java.util.Optional;
53 import java.util.concurrent.locks.ReentrantLock;
54
55 @Component(
56 immediate = true,
57 service = ManagedService.class,
58 property = {
59 "service.description=Workflow Cleanup Scanner Service"
60 }
61 )
62 public class WorkflowCleanupScanner extends AbstractWorkflowBufferScanner implements ManagedService {
63 private static final String SCANNER_NAME = "Workflow Cleanup Scanner";
64
65
66 private static final Logger logger = LoggerFactory.getLogger(WorkflowCleanupScanner.class);
67
68 private static final String JOB_NAME = "mh-workflow-cleanup-job";
69 private static final String JOB_GROUP = "mh-workflow-cleanup-job-group";
70 private static final String TRIGGER_NAME = "mh-workflow-cleanup-trigger";
71 private static final String TRIGGER_GROUP = "mh-workflow-cleanup-trigger-group";
72
73 private static final String PARAM_KEY_BUFFER_SUCCEEDED = "buffer.succeeded";
74 private static final String PARAM_KEY_BUFFER_FAILED = "buffer.failed";
75 private static final String PARAM_KEY_BUFFER_STOPPED = "buffer.stopped";
76 private static final String PARAM_KEY_BUFFER_PARENTLESS = "buffer.parentless";
77
78
79 protected static int bufferForSuccessfulJobs = -1;
80
81
82 protected static int bufferForFailedJobs = -1;
83
84
85 protected static int bufferForStoppedJobs = -1;
86
87
88 protected static int bufferForParentlessJobs = -1;
89
90
91 private static final ReentrantLock lock = new ReentrantLock();
92
93 public WorkflowCleanupScanner() {
94 try {
95 quartz = new StdSchedulerFactory().getScheduler();
96 quartz.start();
97
98 final JobDetail job = new JobDetail(getJobName(), getJobGroup(), Runner.class);
99 job.setDurability(false);
100 job.setVolatility(true);
101 job.getJobDataMap().put(JOB_PARAM_PARENT, this);
102 quartz.addJob(job, true);
103 } catch (org.quartz.SchedulerException e) {
104 throw new RuntimeException(e);
105 }
106 }
107
108 @Activate
109 @Override
110 public void activate(ComponentContext cc) {
111 super.activate(cc);
112 }
113
114 @Deactivate
115 @Override
116 public void deactivate() {
117 super.deactivate();
118 }
119
120 @Override
121 public String getJobGroup() {
122 return JOB_GROUP;
123 }
124
125 @Override
126 public String getJobName() {
127 return JOB_NAME;
128 }
129
130 @Override
131 public String getTriggerGroupName() {
132 return TRIGGER_GROUP;
133 }
134
135 @Override
136 public String getTriggerName() {
137 return TRIGGER_NAME;
138 }
139
140 @Override
141 public void updated(@SuppressWarnings("rawtypes") Dictionary properties) throws ConfigurationException {
142 boolean enabled = false;
143 String cronExpression;
144 unschedule();
145
146 if (properties != null) {
147 logger.debug("Updating configuration...");
148
149 enabled = BooleanUtils.toBoolean((String) properties.get(AbstractScanner.PARAM_KEY_ENABLED));
150 setEnabled(enabled);
151 logger.debug("enabled = {}", enabled);
152
153 cronExpression = (String) properties.get(AbstractScanner.PARAM_KEY_CRON_EXPR);
154 if (StringUtils.isBlank(cronExpression)) {
155 throw new ConfigurationException(AbstractScanner.PARAM_KEY_CRON_EXPR, "Cron expression must be valid");
156 }
157 setCronExpression(cronExpression);
158 logger.debug("cronExpression = {}", cronExpression);
159
160 try {
161 bufferForSuccessfulJobs = Integer.valueOf((String) properties.get(PARAM_KEY_BUFFER_SUCCEEDED));
162 } catch (NumberFormatException e) {
163 throw new ConfigurationException(PARAM_KEY_BUFFER_SUCCEEDED, "Buffer must be a valid integer", e);
164 }
165 logger.debug("bufferForSuccessfulJobs = {}", bufferForSuccessfulJobs);
166
167 try {
168 bufferForFailedJobs = Integer.valueOf((String) properties.get(PARAM_KEY_BUFFER_FAILED));
169 } catch (NumberFormatException e) {
170 throw new ConfigurationException(PARAM_KEY_BUFFER_FAILED, "Buffer must be a valid integer", e);
171 }
172 logger.debug("bufferForFailedJobs = {}", bufferForFailedJobs);
173
174 try {
175 bufferForStoppedJobs = Integer.valueOf((String) properties.get(PARAM_KEY_BUFFER_STOPPED));
176 } catch (NumberFormatException e) {
177 throw new ConfigurationException(PARAM_KEY_BUFFER_STOPPED, "Buffer must be a valid integer", e);
178 }
179 logger.debug("bufferForStoppedJobs = {}", bufferForStoppedJobs);
180
181 try {
182 bufferForParentlessJobs = Integer.valueOf((String) properties.get(PARAM_KEY_BUFFER_PARENTLESS));
183 } catch (NumberFormatException e) {
184 throw new ConfigurationException(PARAM_KEY_BUFFER_PARENTLESS, "Buffer must be a valid integer", e);
185 }
186 logger.debug("bufferForParentlessJobs = {}", bufferForParentlessJobs);
187 }
188
189 schedule();
190 }
191
192 @Override
193 public void scan() {
194 if (lock.isLocked()) {
195 logger.info("Skipping workflow scan since a previous scan is still active");
196 return;
197 }
198 try {
199 lock.lock();
200 if (bufferForFailedJobs > 0) {
201 try {
202 getWorkflowService().cleanupWorkflowInstances(bufferForFailedJobs, WorkflowInstance.WorkflowState.FAILED);
203 } catch (WorkflowDatabaseException e) {
204 logger.error("Unable to cleanup failed jobs:", e);
205 } catch (UnauthorizedException e) {
206 logger.error("Workflow cleanup job doesn't have right to delete jobs!");
207 throw new IllegalStateException(e);
208 }
209 }
210
211 if (bufferForSuccessfulJobs > 0) {
212 try {
213 getWorkflowService().cleanupWorkflowInstances(bufferForSuccessfulJobs, WorkflowInstance.WorkflowState.SUCCEEDED);
214 } catch (WorkflowDatabaseException e) {
215 logger.error("Unable to cleanup successful jobs:", e);
216 } catch (UnauthorizedException e) {
217 logger.error("Workflow cleanup job doesn't have right to delete jobs!");
218 throw new IllegalStateException(e);
219 }
220 }
221
222 if (bufferForStoppedJobs > 0) {
223 try {
224 getWorkflowService().cleanupWorkflowInstances(bufferForStoppedJobs, WorkflowInstance.WorkflowState.STOPPED);
225 } catch (WorkflowDatabaseException e) {
226 logger.error("Unable to cleanup stopped jobs:", e);
227 } catch (UnauthorizedException e) {
228 logger.error("Workflow cleanup job doesn't have right to delete jobs!");
229 throw new IllegalStateException(e);
230 }
231 }
232
233 if (bufferForParentlessJobs > 0) {
234 try {
235 getServiceRegistry().removeParentlessJobs(bufferForParentlessJobs);
236 } catch (ServiceRegistryException e) {
237 logger.error("There was an error while removing parentless jobs: {}", e.getMessage());
238 }
239 }
240 } finally {
241 lock.unlock();
242 }
243 }
244
245 @Override
246 public String getScannerName() {
247 return SCANNER_NAME;
248 }
249
250
251 public static class Runner extends TypedQuartzJob<AbstractScanner> {
252 private static final NeedleEye eye = new NeedleEye();
253
254 public Runner() {
255 super(Optional.of(eye));
256 }
257
258 @Override
259 protected void execute(final AbstractScanner parameters, JobExecutionContext ctx) {
260 logger.debug("Starting " + parameters.getScannerName() + " job.");
261
262
263 for (final Organization org : parameters.getOrganizationDirectoryService().getOrganizations()) {
264
265 parameters.getAdminContextFor(org.getId()).runInContext(parameters::scan);
266 }
267
268 logger.info("Finished " + parameters.getScannerName() + " job.");
269 }
270 }
271
272 @Reference
273 @Override
274 public void bindWorkflowService(WorkflowService workflowService) {
275 super.bindWorkflowService(workflowService);
276 }
277
278 @Reference
279 @Override
280 public void bindServiceRegistry(ServiceRegistry serviceRegistry) {
281 super.bindServiceRegistry(serviceRegistry);
282 }
283
284 @Reference
285 @Override
286 public void bindOrganizationDirectoryService(OrganizationDirectoryService organizationDirectoryService) {
287 super.bindOrganizationDirectoryService(organizationDirectoryService);
288 }
289
290 @Reference
291 @Override
292 public void bindSecurityService(SecurityService securityService) {
293 super.bindSecurityService(securityService);
294 }
295
296 }