1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 package org.opencastproject.workflow.impl;
23
24 import org.opencastproject.kernel.scanner.AbstractScanner;
25 import org.opencastproject.security.api.Organization;
26 import org.opencastproject.security.api.OrganizationDirectoryService;
27 import org.opencastproject.security.api.SecurityService;
28 import org.opencastproject.security.api.UnauthorizedException;
29 import org.opencastproject.serviceregistry.api.ServiceRegistry;
30 import org.opencastproject.serviceregistry.api.ServiceRegistryException;
31 import org.opencastproject.util.NeedleEye;
32 import org.opencastproject.workflow.api.WorkflowDatabaseException;
33 import org.opencastproject.workflow.api.WorkflowInstance;
34 import org.opencastproject.workflow.api.WorkflowService;
35
36 import org.apache.commons.lang3.BooleanUtils;
37 import org.apache.commons.lang3.StringUtils;
38 import org.osgi.service.cm.ConfigurationException;
39 import org.osgi.service.cm.ManagedService;
40 import org.osgi.service.component.ComponentContext;
41 import org.osgi.service.component.annotations.Activate;
42 import org.osgi.service.component.annotations.Component;
43 import org.osgi.service.component.annotations.Deactivate;
44 import org.osgi.service.component.annotations.Reference;
45 import org.quartz.JobDetail;
46 import org.quartz.JobExecutionContext;
47 import org.quartz.impl.StdSchedulerFactory;
48 import org.slf4j.Logger;
49 import org.slf4j.LoggerFactory;
50
51 import java.util.Dictionary;
52 import java.util.Optional;
53 import java.util.concurrent.locks.ReentrantLock;
54
55 @Component(
56 immediate = true,
57 service = ManagedService.class,
58 property = {
59 "service.description=Workflow Cleanup Scanner Service"
60 }
61 )
62 public class WorkflowCleanupScanner extends AbstractWorkflowBufferScanner implements ManagedService {
63 private static final String SCANNER_NAME = "Workflow Cleanup Scanner";
64
65
66 private static final Logger logger = LoggerFactory.getLogger(WorkflowCleanupScanner.class);
67
68 private static final String JOB_NAME = "mh-workflow-cleanup-job";
69 private static final String JOB_GROUP = "mh-workflow-cleanup-job-group";
70 private static final String TRIGGER_NAME = "mh-workflow-cleanup-trigger";
71 private static final String TRIGGER_GROUP = "mh-workflow-cleanup-trigger-group";
72
73 private static final String PARAM_KEY_BUFFER_SUCCEEDED = "buffer.succeeded";
74 private static final String PARAM_KEY_BUFFER_FAILED = "buffer.failed";
75 private static final String PARAM_KEY_BUFFER_STOPPED = "buffer.stopped";
76 private static final String PARAM_KEY_BUFFER_PARENTLESS = "buffer.parentless";
77
78
79 protected static int bufferForSuccessfulJobs = -1;
80
81
82 protected static int bufferForFailedJobs = -1;
83
84
85 protected static int bufferForStoppedJobs = -1;
86
87
88 protected static int bufferForParentlessJobs = -1;
89
90
91 private static final ReentrantLock lock = new ReentrantLock();
92
93 public WorkflowCleanupScanner() {
94 try {
95 quartz = new StdSchedulerFactory().getScheduler();
96 quartz.start();
97
98 final JobDetail job = new JobDetail(getJobName(), getJobGroup(), Runner.class);
99 job.setDurability(false);
100 job.setVolatility(true);
101 job.getJobDataMap().put(JOB_PARAM_PARENT, this);
102 quartz.addJob(job, true);
103 } catch (org.quartz.SchedulerException e) {
104 throw new RuntimeException(e);
105 }
106 }
107
108 @Activate
109 @Override
110 public void activate(ComponentContext cc) {
111 super.activate(cc);
112 }
113
114 @Deactivate
115 @Override
116 public void deactivate() {
117 super.deactivate();
118 }
119
120 @Override
121 public String getJobGroup() {
122 return JOB_GROUP;
123 }
124
125 @Override
126 public String getJobName() {
127 return JOB_NAME;
128 }
129
130 @Override
131 public String getTriggerGroupName() {
132 return TRIGGER_GROUP;
133 }
134
135 @Override
136 public String getTriggerName() {
137 return TRIGGER_NAME;
138 }
139
140 @Override
141 public void updated(@SuppressWarnings("rawtypes") Dictionary properties) throws ConfigurationException {
142 boolean enabled = false;
143 String cronExpression;
144 unschedule();
145
146 if (properties != null) {
147 logger.debug("Updating configuration...");
148
149 enabled = BooleanUtils.toBoolean((String) properties.get(AbstractScanner.PARAM_KEY_ENABLED));
150 setEnabled(enabled);
151 logger.debug("enabled = {}", enabled);
152
153 cronExpression = (String) properties.get(AbstractScanner.PARAM_KEY_CRON_EXPR);
154 if (StringUtils.isBlank(cronExpression)) {
155 throw new ConfigurationException(AbstractScanner.PARAM_KEY_CRON_EXPR, "Cron expression must be valid");
156 }
157 setCronExpression(cronExpression);
158 logger.debug("cronExpression = {}", cronExpression);
159
160 try {
161 bufferForSuccessfulJobs = Integer.valueOf((String) properties.get(PARAM_KEY_BUFFER_SUCCEEDED));
162 } catch (NumberFormatException e) {
163 throw new ConfigurationException(PARAM_KEY_BUFFER_SUCCEEDED, "Buffer must be a valid integer", e);
164 }
165 logger.debug("bufferForSuccessfulJobs = {}", bufferForSuccessfulJobs);
166
167 try {
168 bufferForFailedJobs = Integer.valueOf((String) properties.get(PARAM_KEY_BUFFER_FAILED));
169 } catch (NumberFormatException e) {
170 throw new ConfigurationException(PARAM_KEY_BUFFER_FAILED, "Buffer must be a valid integer", e);
171 }
172 logger.debug("bufferForFailedJobs = {}", bufferForFailedJobs);
173
174 try {
175 bufferForStoppedJobs = Integer.valueOf((String) properties.get(PARAM_KEY_BUFFER_STOPPED));
176 } catch (NumberFormatException e) {
177 throw new ConfigurationException(PARAM_KEY_BUFFER_STOPPED, "Buffer must be a valid integer", e);
178 }
179 logger.debug("bufferForStoppedJobs = {}", bufferForStoppedJobs);
180
181 try {
182 bufferForParentlessJobs = Integer.valueOf((String) properties.get(PARAM_KEY_BUFFER_PARENTLESS));
183 } catch (NumberFormatException e) {
184 throw new ConfigurationException(PARAM_KEY_BUFFER_PARENTLESS, "Buffer must be a valid integer", e);
185 }
186 logger.debug("bufferForParentlessJobs = {}", bufferForParentlessJobs);
187 }
188
189 schedule();
190 }
191
192 @Override
193 public void scan() {
194 if (lock.isLocked()) {
195 logger.info("Skipping workflow scan since a previous scan is still active");
196 return;
197 }
198 try {
199 lock.lock();
200 if (bufferForFailedJobs > 0) {
201 try {
202 getWorkflowService().cleanupWorkflowInstances(bufferForFailedJobs,
203 WorkflowInstance.WorkflowState.FAILED);
204 } catch (WorkflowDatabaseException e) {
205 logger.error("Unable to cleanup failed jobs:", e);
206 } catch (UnauthorizedException e) {
207 logger.error("Workflow cleanup job doesn't have right to delete jobs!");
208 throw new IllegalStateException(e);
209 }
210 }
211
212 if (bufferForSuccessfulJobs > 0) {
213 try {
214 getWorkflowService().cleanupWorkflowInstances(bufferForSuccessfulJobs,
215 WorkflowInstance.WorkflowState.SUCCEEDED);
216 } catch (WorkflowDatabaseException e) {
217 logger.error("Unable to cleanup successful jobs:", e);
218 } catch (UnauthorizedException e) {
219 logger.error("Workflow cleanup job doesn't have right to delete jobs!");
220 throw new IllegalStateException(e);
221 }
222 }
223
224 if (bufferForStoppedJobs > 0) {
225 try {
226 getWorkflowService().cleanupWorkflowInstances(bufferForStoppedJobs,
227 WorkflowInstance.WorkflowState.STOPPED);
228 } catch (WorkflowDatabaseException e) {
229 logger.error("Unable to cleanup stopped jobs:", e);
230 } catch (UnauthorizedException e) {
231 logger.error("Workflow cleanup job doesn't have right to delete jobs!");
232 throw new IllegalStateException(e);
233 }
234 }
235
236 if (bufferForParentlessJobs > 0) {
237 try {
238 getServiceRegistry().removeParentlessJobs(bufferForParentlessJobs);
239 } catch (ServiceRegistryException e) {
240 logger.error("There was an error while removing parentless jobs: {}", e.getMessage());
241 }
242 }
243 } finally {
244 lock.unlock();
245 }
246 }
247
248 @Override
249 public String getScannerName() {
250 return SCANNER_NAME;
251 }
252
253
254 public static class Runner extends TypedQuartzJob<AbstractScanner> {
255 private static final NeedleEye eye = new NeedleEye();
256
257 public Runner() {
258 super(Optional.of(eye));
259 }
260
261 @Override
262 protected void execute(final AbstractScanner parameters, JobExecutionContext ctx) {
263 logger.debug("Starting " + parameters.getScannerName() + " job.");
264
265
266 for (final Organization org : parameters.getOrganizationDirectoryService().getOrganizations()) {
267
268 parameters.getAdminContextFor(org.getId()).runInContext(parameters::scan);
269 }
270
271 logger.info("Finished " + parameters.getScannerName() + " job.");
272 }
273 }
274
275 @Reference
276 @Override
277 public void bindWorkflowService(WorkflowService workflowService) {
278 super.bindWorkflowService(workflowService);
279 }
280
281 @Reference
282 @Override
283 public void bindServiceRegistry(ServiceRegistry serviceRegistry) {
284 super.bindServiceRegistry(serviceRegistry);
285 }
286
287 @Reference
288 @Override
289 public void bindOrganizationDirectoryService(OrganizationDirectoryService organizationDirectoryService) {
290 super.bindOrganizationDirectoryService(organizationDirectoryService);
291 }
292
293 @Reference
294 @Override
295 public void bindSecurityService(SecurityService securityService) {
296 super.bindSecurityService(securityService);
297 }
298
299 }