View Javadoc
1   /*
2    * Licensed to The Apereo Foundation under one or more contributor license
3    * agreements. See the NOTICE file distributed with this work for additional
4    * information regarding copyright ownership.
5    *
6    *
7    * The Apereo Foundation licenses this file to you under the Educational
8    * Community License, Version 2.0 (the "License"); you may not use this file
9    * except in compliance with the License. You may obtain a copy of the License
10   * at:
11   *
12   *   http://opensource.org/licenses/ecl2.txt
13   *
14   * Unless required by applicable law or agreed to in writing, software
15   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
16   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the
17   * License for the specific language governing permissions and limitations under
18   * the License.
19   *
20   */
21  
22  package org.opencastproject.workflow.impl;
23  
24  import org.opencastproject.kernel.scanner.AbstractScanner;
25  import org.opencastproject.security.api.Organization;
26  import org.opencastproject.security.api.OrganizationDirectoryService;
27  import org.opencastproject.security.api.SecurityService;
28  import org.opencastproject.security.api.UnauthorizedException;
29  import org.opencastproject.serviceregistry.api.ServiceRegistry;
30  import org.opencastproject.serviceregistry.api.ServiceRegistryException;
31  import org.opencastproject.util.NeedleEye;
32  import org.opencastproject.workflow.api.WorkflowDatabaseException;
33  import org.opencastproject.workflow.api.WorkflowInstance;
34  import org.opencastproject.workflow.api.WorkflowService;
35  
36  import org.apache.commons.lang3.BooleanUtils;
37  import org.apache.commons.lang3.StringUtils;
38  import org.osgi.service.cm.ConfigurationException;
39  import org.osgi.service.cm.ManagedService;
40  import org.osgi.service.component.ComponentContext;
41  import org.osgi.service.component.annotations.Activate;
42  import org.osgi.service.component.annotations.Component;
43  import org.osgi.service.component.annotations.Deactivate;
44  import org.osgi.service.component.annotations.Reference;
45  import org.quartz.JobDetail;
46  import org.quartz.JobExecutionContext;
47  import org.quartz.impl.StdSchedulerFactory;
48  import org.slf4j.Logger;
49  import org.slf4j.LoggerFactory;
50  
51  import java.util.Dictionary;
52  import java.util.Optional;
53  import java.util.concurrent.locks.ReentrantLock;
54  
55  @Component(
56      immediate = true,
57      service = ManagedService.class,
58      property = {
59          "service.description=Workflow Cleanup Scanner Service"
60      }
61  )
62  public class WorkflowCleanupScanner extends AbstractWorkflowBufferScanner implements ManagedService {
63    private static final String SCANNER_NAME = "Workflow Cleanup Scanner";
64  
65    /** The logging facility */
66    private static final Logger logger = LoggerFactory.getLogger(WorkflowCleanupScanner.class);
67  
68    private static final String JOB_NAME = "mh-workflow-cleanup-job";
69    private static final String JOB_GROUP = "mh-workflow-cleanup-job-group";
70    private static final String TRIGGER_NAME = "mh-workflow-cleanup-trigger";
71    private static final String TRIGGER_GROUP = "mh-workflow-cleanup-trigger-group";
72  
73    private static final String PARAM_KEY_BUFFER_SUCCEEDED = "buffer.succeeded";
74    private static final String PARAM_KEY_BUFFER_FAILED = "buffer.failed";
75    private static final String PARAM_KEY_BUFFER_STOPPED = "buffer.stopped";
76    private static final String PARAM_KEY_BUFFER_PARENTLESS = "buffer.parentless";
77  
78    /** Buffer of successful jobs in days */
79    protected static int bufferForSuccessfulJobs = -1;
80  
81    /** Buffer of failed jobs in days */
82    protected static int bufferForFailedJobs = -1;
83  
84    /** Buffer of failed jobs in days */
85    protected static int bufferForStoppedJobs = -1;
86  
87    /** Buffer of parentless jobs in days */
88    protected static int bufferForParentlessJobs = -1;
89  
90    // Lock to prevent concurrent cleanups
91    private static final ReentrantLock lock = new ReentrantLock();
92  
93    public WorkflowCleanupScanner() {
94      try {
95        quartz = new StdSchedulerFactory().getScheduler();
96        quartz.start();
97        // create and set the job. To actually run it call schedule(..)
98        final JobDetail job = new JobDetail(getJobName(), getJobGroup(), Runner.class);
99        job.setDurability(false);
100       job.setVolatility(true);
101       job.getJobDataMap().put(JOB_PARAM_PARENT, this);
102       quartz.addJob(job, true);
103     } catch (org.quartz.SchedulerException e) {
104       throw new RuntimeException(e);
105     }
106   }
107 
108   @Activate
109   @Override
110   public void activate(ComponentContext cc) {
111     super.activate(cc);
112   }
113 
114   @Deactivate
115   @Override
116   public void deactivate() {
117     super.deactivate();
118   }
119 
120   @Override
121   public String getJobGroup() {
122     return JOB_GROUP;
123   }
124 
125   @Override
126   public String getJobName() {
127     return JOB_NAME;
128   }
129 
130   @Override
131   public String getTriggerGroupName() {
132     return TRIGGER_GROUP;
133   }
134 
135   @Override
136   public String getTriggerName() {
137     return TRIGGER_NAME;
138   }
139 
140   @Override
141   public void updated(@SuppressWarnings("rawtypes") Dictionary properties) throws ConfigurationException {
142     boolean enabled = false;
143     String cronExpression;
144     unschedule();
145 
146     if (properties != null) {
147       logger.debug("Updating configuration...");
148 
149       enabled = BooleanUtils.toBoolean((String) properties.get(AbstractScanner.PARAM_KEY_ENABLED));
150       setEnabled(enabled);
151       logger.debug("enabled = {}", enabled);
152 
153       cronExpression = (String) properties.get(AbstractScanner.PARAM_KEY_CRON_EXPR);
154       if (StringUtils.isBlank(cronExpression)) {
155         throw new ConfigurationException(AbstractScanner.PARAM_KEY_CRON_EXPR, "Cron expression must be valid");
156       }
157       setCronExpression(cronExpression);
158       logger.debug("cronExpression = {}", cronExpression);
159 
160       try {
161         bufferForSuccessfulJobs = Integer.valueOf((String) properties.get(PARAM_KEY_BUFFER_SUCCEEDED));
162       } catch (NumberFormatException e) {
163         throw new ConfigurationException(PARAM_KEY_BUFFER_SUCCEEDED, "Buffer must be a valid integer", e);
164       }
165       logger.debug("bufferForSuccessfulJobs = {}", bufferForSuccessfulJobs);
166 
167       try {
168         bufferForFailedJobs = Integer.valueOf((String) properties.get(PARAM_KEY_BUFFER_FAILED));
169       } catch (NumberFormatException e) {
170         throw new ConfigurationException(PARAM_KEY_BUFFER_FAILED, "Buffer must be a valid integer", e);
171       }
172       logger.debug("bufferForFailedJobs = {}", bufferForFailedJobs);
173 
174       try {
175         bufferForStoppedJobs = Integer.valueOf((String) properties.get(PARAM_KEY_BUFFER_STOPPED));
176       } catch (NumberFormatException e) {
177         throw new ConfigurationException(PARAM_KEY_BUFFER_STOPPED, "Buffer must be a valid integer", e);
178       }
179       logger.debug("bufferForStoppedJobs = {}", bufferForStoppedJobs);
180 
181       try {
182         bufferForParentlessJobs = Integer.valueOf((String) properties.get(PARAM_KEY_BUFFER_PARENTLESS));
183       } catch (NumberFormatException e) {
184         throw new ConfigurationException(PARAM_KEY_BUFFER_PARENTLESS, "Buffer must be a valid integer", e);
185       }
186       logger.debug("bufferForParentlessJobs = {}", bufferForParentlessJobs);
187     }
188 
189     schedule();
190   }
191 
192   @Override
193   public void scan() {
194     if (lock.isLocked()) {
195       logger.info("Skipping workflow scan since a previous scan is still active");
196       return;
197     }
198     try {
199       lock.lock();
200       if (bufferForFailedJobs > 0) {
201         try {
202           getWorkflowService().cleanupWorkflowInstances(bufferForFailedJobs,
203               WorkflowInstance.WorkflowState.FAILED);
204         } catch (WorkflowDatabaseException e) {
205           logger.error("Unable to cleanup failed jobs:", e);
206         } catch (UnauthorizedException e) {
207           logger.error("Workflow cleanup job doesn't have right to delete jobs!");
208           throw new IllegalStateException(e);
209         }
210       }
211 
212       if (bufferForSuccessfulJobs > 0) {
213         try {
214           getWorkflowService().cleanupWorkflowInstances(bufferForSuccessfulJobs,
215               WorkflowInstance.WorkflowState.SUCCEEDED);
216         } catch (WorkflowDatabaseException e) {
217           logger.error("Unable to cleanup successful jobs:", e);
218         } catch (UnauthorizedException e) {
219           logger.error("Workflow cleanup job doesn't have right to delete jobs!");
220           throw new IllegalStateException(e);
221         }
222       }
223 
224       if (bufferForStoppedJobs > 0) {
225         try {
226           getWorkflowService().cleanupWorkflowInstances(bufferForStoppedJobs,
227               WorkflowInstance.WorkflowState.STOPPED);
228         } catch (WorkflowDatabaseException e) {
229           logger.error("Unable to cleanup stopped jobs:", e);
230         } catch (UnauthorizedException e) {
231           logger.error("Workflow cleanup job doesn't have right to delete jobs!");
232           throw new IllegalStateException(e);
233         }
234       }
235 
236       if (bufferForParentlessJobs > 0) {
237         try {
238           getServiceRegistry().removeParentlessJobs(bufferForParentlessJobs);
239         } catch (ServiceRegistryException e) {
240           logger.error("There was an error while removing parentless jobs: {}", e.getMessage());
241         }
242       }
243     } finally {
244       lock.unlock();
245     }
246   }
247 
248   @Override
249   public String getScannerName() {
250     return SCANNER_NAME;
251   }
252 
253   /** Quartz job to which cleans up the workflow instances */
254   public static class Runner extends TypedQuartzJob<AbstractScanner> {
255     private static final NeedleEye eye = new NeedleEye();
256 
257     public Runner() {
258       super(Optional.of(eye));
259     }
260 
261     @Override
262     protected void execute(final AbstractScanner parameters, JobExecutionContext ctx) {
263       logger.debug("Starting " + parameters.getScannerName() + " job.");
264 
265       // iterate all organizations
266       for (final Organization org : parameters.getOrganizationDirectoryService().getOrganizations()) {
267         // set the organization on the current thread
268         parameters.getAdminContextFor(org.getId()).runInContext(parameters::scan);
269       }
270 
271       logger.info("Finished " + parameters.getScannerName() + " job.");
272     }
273   }
274 
275   @Reference
276   @Override
277   public void bindWorkflowService(WorkflowService workflowService) {
278     super.bindWorkflowService(workflowService);
279   }
280 
281   @Reference
282   @Override
283   public void bindServiceRegistry(ServiceRegistry serviceRegistry) {
284     super.bindServiceRegistry(serviceRegistry);
285   }
286 
287   @Reference
288   @Override
289   public void bindOrganizationDirectoryService(OrganizationDirectoryService organizationDirectoryService) {
290     super.bindOrganizationDirectoryService(organizationDirectoryService);
291   }
292 
293   @Reference
294   @Override
295   public void bindSecurityService(SecurityService securityService) {
296     super.bindSecurityService(securityService);
297   }
298 
299 }