View Javadoc
1   /*
2    * Licensed to The Apereo Foundation under one or more contributor license
3    * agreements. See the NOTICE file distributed with this work for additional
4    * information regarding copyright ownership.
5    *
6    *
7    * The Apereo Foundation licenses this file to you under the Educational
8    * Community License, Version 2.0 (the "License"); you may not use this file
9    * except in compliance with the License. You may obtain a copy of the License
10   * at:
11   *
12   *   http://opensource.org/licenses/ecl2.txt
13   *
14   * Unless required by applicable law or agreed to in writing, software
15   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
16   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the
17   * License for the specific language governing permissions and limitations under
18   * the License.
19   *
20   */
21  
22  package org.opencastproject.workflow.impl;
23  
24  import static org.opencastproject.util.data.Option.some;
25  
26  import org.opencastproject.kernel.scanner.AbstractScanner;
27  import org.opencastproject.security.api.Organization;
28  import org.opencastproject.security.api.OrganizationDirectoryService;
29  import org.opencastproject.security.api.SecurityService;
30  import org.opencastproject.security.api.UnauthorizedException;
31  import org.opencastproject.serviceregistry.api.ServiceRegistry;
32  import org.opencastproject.serviceregistry.api.ServiceRegistryException;
33  import org.opencastproject.util.NeedleEye;
34  import org.opencastproject.workflow.api.WorkflowDatabaseException;
35  import org.opencastproject.workflow.api.WorkflowInstance;
36  import org.opencastproject.workflow.api.WorkflowService;
37  
38  import org.apache.commons.lang3.BooleanUtils;
39  import org.apache.commons.lang3.StringUtils;
40  import org.osgi.service.cm.ConfigurationException;
41  import org.osgi.service.cm.ManagedService;
42  import org.osgi.service.component.ComponentContext;
43  import org.osgi.service.component.annotations.Activate;
44  import org.osgi.service.component.annotations.Component;
45  import org.osgi.service.component.annotations.Deactivate;
46  import org.osgi.service.component.annotations.Reference;
47  import org.quartz.JobDetail;
48  import org.quartz.JobExecutionContext;
49  import org.quartz.impl.StdSchedulerFactory;
50  import org.slf4j.Logger;
51  import org.slf4j.LoggerFactory;
52  
53  import java.util.Dictionary;
54  import java.util.concurrent.locks.ReentrantLock;
55  
56  @Component(
57      immediate = true,
58      service = ManagedService.class,
59      property = {
60          "service.description=Workflow Cleanup Scanner Service"
61      }
62  )
63  public class WorkflowCleanupScanner extends AbstractWorkflowBufferScanner implements ManagedService {
64    private static final String SCANNER_NAME = "Workflow Cleanup Scanner";
65  
66    /** The logging facility */
67    private static final Logger logger = LoggerFactory.getLogger(WorkflowCleanupScanner.class);
68  
69    private static final String JOB_NAME = "mh-workflow-cleanup-job";
70    private static final String JOB_GROUP = "mh-workflow-cleanup-job-group";
71    private static final String TRIGGER_NAME = "mh-workflow-cleanup-trigger";
72    private static final String TRIGGER_GROUP = "mh-workflow-cleanup-trigger-group";
73  
74    private static final String PARAM_KEY_BUFFER_SUCCEEDED = "buffer.succeeded";
75    private static final String PARAM_KEY_BUFFER_FAILED = "buffer.failed";
76    private static final String PARAM_KEY_BUFFER_STOPPED = "buffer.stopped";
77    private static final String PARAM_KEY_BUFFER_PARENTLESS = "buffer.parentless";
78  
79    /** Buffer of successful jobs in days */
80    protected static int bufferForSuccessfulJobs = -1;
81  
82    /** Buffer of failed jobs in days */
83    protected static int bufferForFailedJobs = -1;
84  
85    /** Buffer of failed jobs in days */
86    protected static int bufferForStoppedJobs = -1;
87  
88    /** Buffer of parentless jobs in days */
89    protected static int bufferForParentlessJobs = -1;
90  
91    // Lock to prevent concurrent cleanups
92    private static final ReentrantLock lock = new ReentrantLock();
93  
94    public WorkflowCleanupScanner() {
95      try {
96        quartz = new StdSchedulerFactory().getScheduler();
97        quartz.start();
98        // create and set the job. To actually run it call schedule(..)
99        final JobDetail job = new JobDetail(getJobName(), getJobGroup(), Runner.class);
100       job.setDurability(false);
101       job.setVolatility(true);
102       job.getJobDataMap().put(JOB_PARAM_PARENT, this);
103       quartz.addJob(job, true);
104     } catch (org.quartz.SchedulerException e) {
105       throw new RuntimeException(e);
106     }
107   }
108 
109   @Activate
110   @Override
111   public void activate(ComponentContext cc) {
112     super.activate(cc);
113   }
114 
115   @Deactivate
116   @Override
117   public void deactivate() {
118     super.deactivate();
119   }
120 
121   @Override
122   public String getJobGroup() {
123     return JOB_GROUP;
124   }
125 
126   @Override
127   public String getJobName() {
128     return JOB_NAME;
129   }
130 
131   @Override
132   public String getTriggerGroupName() {
133     return TRIGGER_GROUP;
134   }
135 
136   @Override
137   public String getTriggerName() {
138     return TRIGGER_NAME;
139   }
140 
141   @Override
142   public void updated(@SuppressWarnings("rawtypes") Dictionary properties) throws ConfigurationException {
143     boolean enabled = false;
144     String cronExpression;
145     unschedule();
146 
147     if (properties != null) {
148       logger.debug("Updating configuration...");
149 
150       enabled = BooleanUtils.toBoolean((String) properties.get(AbstractScanner.PARAM_KEY_ENABLED));
151       setEnabled(enabled);
152       logger.debug("enabled = {}", enabled);
153 
154       cronExpression = (String) properties.get(AbstractScanner.PARAM_KEY_CRON_EXPR);
155       if (StringUtils.isBlank(cronExpression)) {
156         throw new ConfigurationException(AbstractScanner.PARAM_KEY_CRON_EXPR, "Cron expression must be valid");
157       }
158       setCronExpression(cronExpression);
159       logger.debug("cronExpression = {}", cronExpression);
160 
161       try {
162         bufferForSuccessfulJobs = Integer.valueOf((String) properties.get(PARAM_KEY_BUFFER_SUCCEEDED));
163       } catch (NumberFormatException e) {
164         throw new ConfigurationException(PARAM_KEY_BUFFER_SUCCEEDED, "Buffer must be a valid integer", e);
165       }
166       logger.debug("bufferForSuccessfulJobs = {}", bufferForSuccessfulJobs);
167 
168       try {
169         bufferForFailedJobs = Integer.valueOf((String) properties.get(PARAM_KEY_BUFFER_FAILED));
170       } catch (NumberFormatException e) {
171         throw new ConfigurationException(PARAM_KEY_BUFFER_FAILED, "Buffer must be a valid integer", e);
172       }
173       logger.debug("bufferForFailedJobs = {}", bufferForFailedJobs);
174 
175       try {
176         bufferForStoppedJobs = Integer.valueOf((String) properties.get(PARAM_KEY_BUFFER_STOPPED));
177       } catch (NumberFormatException e) {
178         throw new ConfigurationException(PARAM_KEY_BUFFER_STOPPED, "Buffer must be a valid integer", e);
179       }
180       logger.debug("bufferForStoppedJobs = {}", bufferForStoppedJobs);
181 
182       try {
183         bufferForParentlessJobs = Integer.valueOf((String) properties.get(PARAM_KEY_BUFFER_PARENTLESS));
184       } catch (NumberFormatException e) {
185         throw new ConfigurationException(PARAM_KEY_BUFFER_PARENTLESS, "Buffer must be a valid integer", e);
186       }
187       logger.debug("bufferForParentlessJobs = {}", bufferForParentlessJobs);
188     }
189 
190     schedule();
191   }
192 
193   @Override
194   public void scan() {
195     if (lock.isLocked()) {
196       logger.info("Skipping workflow scan since a previous scan is still active");
197       return;
198     }
199     try {
200       lock.lock();
201       if (bufferForFailedJobs > 0) {
202         try {
203           getWorkflowService().cleanupWorkflowInstances(bufferForFailedJobs, WorkflowInstance.WorkflowState.FAILED);
204         } catch (WorkflowDatabaseException e) {
205           logger.error("Unable to cleanup failed jobs:", e);
206         } catch (UnauthorizedException e) {
207           logger.error("Workflow cleanup job doesn't have right to delete jobs!");
208           throw new IllegalStateException(e);
209         }
210       }
211 
212       if (bufferForSuccessfulJobs > 0) {
213         try {
214           getWorkflowService().cleanupWorkflowInstances(bufferForSuccessfulJobs, WorkflowInstance.WorkflowState.SUCCEEDED);
215         } catch (WorkflowDatabaseException e) {
216           logger.error("Unable to cleanup successful jobs:", e);
217         } catch (UnauthorizedException e) {
218           logger.error("Workflow cleanup job doesn't have right to delete jobs!");
219           throw new IllegalStateException(e);
220         }
221       }
222 
223       if (bufferForStoppedJobs > 0) {
224         try {
225           getWorkflowService().cleanupWorkflowInstances(bufferForStoppedJobs, WorkflowInstance.WorkflowState.STOPPED);
226         } catch (WorkflowDatabaseException e) {
227           logger.error("Unable to cleanup stopped jobs:", e);
228         } catch (UnauthorizedException e) {
229           logger.error("Workflow cleanup job doesn't have right to delete jobs!");
230           throw new IllegalStateException(e);
231         }
232       }
233 
234       if (bufferForParentlessJobs > 0) {
235         try {
236           getServiceRegistry().removeParentlessJobs(bufferForParentlessJobs);
237         } catch (ServiceRegistryException e) {
238           logger.error("There was an error while removing parentless jobs: {}", e.getMessage());
239         }
240       }
241     } finally {
242       lock.unlock();
243     }
244   }
245 
246   @Override
247   public String getScannerName() {
248     return SCANNER_NAME;
249   }
250 
251   /** Quartz job to which cleans up the workflow instances */
252   public static class Runner extends TypedQuartzJob<AbstractScanner> {
253     private static final NeedleEye eye = new NeedleEye();
254 
255     public Runner() {
256       super(some(eye));
257     }
258 
259     @Override
260     protected void execute(final AbstractScanner parameters, JobExecutionContext ctx) {
261       logger.debug("Starting " + parameters.getScannerName() + " job.");
262 
263       // iterate all organizations
264       for (final Organization org : parameters.getOrganizationDirectoryService().getOrganizations()) {
265         // set the organization on the current thread
266         parameters.getAdminContextFor(org.getId()).runInContext(parameters::scan);
267       }
268 
269       logger.info("Finished " + parameters.getScannerName() + " job.");
270     }
271   }
272 
273   @Reference
274   @Override
275   public void bindWorkflowService(WorkflowService workflowService) {
276     super.bindWorkflowService(workflowService);
277   }
278 
279   @Reference
280   @Override
281   public void bindServiceRegistry(ServiceRegistry serviceRegistry) {
282     super.bindServiceRegistry(serviceRegistry);
283   }
284 
285   @Reference
286   @Override
287   public void bindOrganizationDirectoryService(OrganizationDirectoryService organizationDirectoryService) {
288     super.bindOrganizationDirectoryService(organizationDirectoryService);
289   }
290 
291   @Reference
292   @Override
293   public void bindSecurityService(SecurityService securityService) {
294     super.bindSecurityService(securityService);
295   }
296 
297 }