View Javadoc
1   /*
2    * Licensed to The Apereo Foundation under one or more contributor license
3    * agreements. See the NOTICE file distributed with this work for additional
4    * information regarding copyright ownership.
5    *
6    *
7    * The Apereo Foundation licenses this file to you under the Educational
8    * Community License, Version 2.0 (the "License"); you may not use this file
9    * except in compliance with the License. You may obtain a copy of the License
10   * at:
11   *
12   *   http://opensource.org/licenses/ecl2.txt
13   *
14   * Unless required by applicable law or agreed to in writing, software
15   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
16   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the
17   * License for the specific language governing permissions and limitations under
18   * the License.
19   *
20   */
21  
22  package org.opencastproject.job.api;
23  
24  import org.opencastproject.job.api.Job.Status;
25  import org.opencastproject.serviceregistry.api.ServiceRegistry;
26  import org.opencastproject.serviceregistry.api.ServiceRegistryException;
27  import org.opencastproject.util.JobCanceledException;
28  import org.opencastproject.util.NotFoundException;
29  
30  import com.entwinemedia.fn.data.Opt;
31  
32  import org.slf4j.Logger;
33  import org.slf4j.LoggerFactory;
34  
35  import java.util.ArrayList;
36  import java.util.Arrays;
37  import java.util.HashMap;
38  import java.util.List;
39  import java.util.Map;
40  
41  /**
42   * This class is a utility implementation that will wait for all given jobs to change their status to either one of:
43   * <ul>
44   * <li>{@link Job.Status#FINISHED}</li>
45   * <li>{@link Job.Status#FAILED}</li>
46   * <li>{@link Job.Status#DELETED}</li>
47   * </ul>
48   */
49  public final class JobBarrier {
50    /** The logging facility */
51    private static final Logger logger = LoggerFactory.getLogger(JobBarrier.class);
52  
53    /** Default polling interval is 5 seconds */
54    public static final long DEFAULT_POLLING_INTERVAL = 5000L;
55  
56    /** The service registry used to do the polling */
57    private final ServiceRegistry serviceRegistry;
58  
59    /** Time in milliseconds between two pools for the job status */
60    private final long pollingInterval;
61  
62    /** The job that's waiting */
63    private final Opt<Long> waiterJobId;
64  
65    /** The jobs to wait on */
66    private final List<Job> jobs;
67  
68    /** An exception that might have been thrown while polling */
69    private volatile Throwable pollingException = null;
70  
71    /** The status map */
72    private volatile Result status = null;
73  
74    /**
75     * Creates a barrier without any jobs, using <code>registry</code> to poll for the outcome of the monitored jobs using
76     * the default polling interval {@link #DEFAULT_POLLING_INTERVAL}. The <code>waiter</code> is the job which is waiting
77     * for the other jobs to finish. Use {@link #addJob(Job)} to add jobs to monitor.
78     *
79     * @param registry
80     *          the registry
81     * @param waiter
82     *          the job waiting for the other jobs to finish
83     */
84    public JobBarrier(Job waiter, ServiceRegistry registry) {
85      this(waiter, registry, DEFAULT_POLLING_INTERVAL, new Job[] {});
86    }
87  
88    /**
89     * Creates a barrier for <code>jobs</code>, using <code>registry</code> to poll for the outcome of the monitored jobs
90     * using the default polling interval {@link #DEFAULT_POLLING_INTERVAL}. The <code>waiter</code> is the job which is
91     * waiting for the other jobs to finish.
92     *
93     * @param registry
94     *          the registry
95     * @param jobs
96     *          the jobs to monitor
97     * @param waiter
98     *          the job waiting for the other jobs to finish
99     */
100   public JobBarrier(Job waiter, ServiceRegistry registry, Job... jobs) {
101     this(waiter, registry, DEFAULT_POLLING_INTERVAL, jobs);
102   }
103 
104   /**
105    * Creates a wrapper for <code>job</code>, using <code>registry</code> to poll for the job outcome. The
106    * <code>waiter</code> is the job which is waiting for the other jobs to finish.
107    *
108    * @param registry
109    *          the registry
110    * @param pollingInterval
111    *          the time in miliseconds between two polling operations
112    * @param waiter
113    *          the job waiting for the other jobs to finish
114    */
115   public JobBarrier(Job waiter, ServiceRegistry registry, long pollingInterval) {
116     this(waiter, registry, pollingInterval, new Job[] {});
117   }
118 
119   /**
120    * Creates a wrapper for <code>job</code>, using <code>registry</code> to poll for the job outcome. The
121    * <code>waiter</code> is the job which is waiting for the other jobs to finish.
122    *
123    * @param jobs
124    *          the job to poll
125    * @param registry
126    *          the registry
127    * @param pollingInterval
128    *          the time in miliseconds between two polling operations
129    * @param waiter
130    *          the job waiting for the other jobs to finish
131    */
132   public JobBarrier(Job waiter, ServiceRegistry registry, long pollingInterval, Job... jobs) {
133     if (registry == null)
134       throw new IllegalArgumentException("Service registry must not be null");
135     if (jobs == null)
136       throw new IllegalArgumentException("Jobs must not be null");
137     if (pollingInterval < 0)
138       throw new IllegalArgumentException("Polling interval must be a positive number");
139     this.serviceRegistry = registry;
140     this.pollingInterval = pollingInterval;
141     if (waiter != null)
142       this.waiterJobId = Opt.some(waiter.getId());
143     else
144       this.waiterJobId = Opt.none();
145     this.jobs = new ArrayList<Job>(Arrays.asList(jobs));
146   }
147 
148   private void suspendWaiterJob() {
149     if (this.waiterJobId.isSome()) {
150       try {
151         final Job waiter = serviceRegistry.getJob(waiterJobId.get());
152         waiter.setStatus(Job.Status.WAITING);
153         logger.debug("Job {} set to WAITING state.", waiter.getId());
154         this.serviceRegistry.updateJob(waiter);
155       } catch (ServiceRegistryException e) {
156         logger.warn("Unable to put {} into a waiting state, this may cause a deadlock: {}", waiterJobId, e.getMessage());
157       } catch (NotFoundException e) {
158         logger.warn("Unable to put {} into a waiting state, job not found by the service registry.  This may cause a deadlock: {}", waiterJobId, e.getMessage());
159       }
160     } else {
161       logger.debug("No waiting job set, unable to put waiting job into waiting state");
162     }
163   }
164 
165   private void wakeWaiterJob() {
166     if (this.waiterJobId.isSome()) {
167       try {
168         final Job waiter = serviceRegistry.getJob(waiterJobId.get());
169         waiter.setStatus(Job.Status.RUNNING);
170         logger.debug("Job {} wakened and set back to RUNNING state.", waiter.getId());
171         this.serviceRegistry.updateJob(waiter);
172       } catch (ServiceRegistryException e) {
173         logger.warn("Unable to put {} into a waiting state, this may cause a deadlock: {}", waiterJobId, e.getMessage());
174       } catch (NotFoundException e) {
175         logger.warn("Unable to put {} into a waiting state, job not found by the service registry.  This may cause a deadlock: {}", waiterJobId, e.getMessage());
176       }
177     } else {
178       logger.debug("No waiting job set, unable to put waiting job into waiting state");
179     }
180   }
181 
182   /**
183    * Waits for a status change and returns the new status.
184    *
185    * @return the status
186    */
187   public Result waitForJobs() {
188     return waitForJobs(0);
189   }
190 
191   /**
192    * Waits for a status change on all jobs and returns. If waiting for the status exceeds a certain limit, the method
193    * returns even if some or all of the jobs are not yet finished. The same is true if at least one of the jobs fails or
194    * gets stopped or deleted.
195    *
196    * @param timeout
197    *          the maximum amount of time to wait
198    * @throws IllegalStateException
199    *           if there are no jobs to wait for
200    * @throws JobCanceledException
201    *           if one of the jobs was canceled
202    */
203   public Result waitForJobs(long timeout) throws JobCanceledException, IllegalStateException {
204     if (jobs.size() == 0)
205       return new Result(new HashMap<Job, Status>());
206     this.suspendWaiterJob();
207     synchronized (this) {
208       JobStatusUpdater updater = new JobStatusUpdater(timeout);
209       try {
210         updater.start();
211         wait();
212       } catch (InterruptedException e) {
213         logger.debug("Interrupted while waiting for job");
214       }
215     }
216     if (pollingException != null) {
217       if (pollingException instanceof JobCanceledException)
218         throw (JobCanceledException) pollingException;
219       throw new IllegalStateException(pollingException);
220     }
221     this.wakeWaiterJob();
222     return getStatus();
223   }
224 
225   /**
226    * Adds the job to the list of jobs to wait for. An {@link IllegalStateException} is thrown if the barrier has already
227    * been asked to wait for jobs by calling {@link #waitForJobs()}.
228    *
229    * @param job
230    *          the job
231    * @throws IllegalStateException
232    *           if the barrier already started waiting
233    */
234   public void addJob(Job job) throws IllegalStateException {
235     if (job == null)
236       throw new IllegalArgumentException("Job must not be null");
237     jobs.add(job);
238   }
239 
240   /**
241    * Sets the outcome of the various jobs that were monitored.
242    *
243    * @param status
244    *          the status
245    */
246   void setStatus(Result status) {
247     this.status = status;
248   }
249 
250   /**
251    * Returns the resulting status map.
252    *
253    * @return the status of the individual jobs
254    */
255   public Result getStatus() {
256     return status;
257   }
258 
259   /** Thread that keeps polling for status changes. */
260   class JobStatusUpdater extends Thread {
261     /** Maximum wait in milliseconds or 0 for unlimited waiting */
262     private final long workTime;
263 
264     /**
265      * Creates a new status updater that will wait for finished jobs. If <code>0</code> is passed in as the work time,
266      * the updater will wait as long as it takes. Otherwise, it will stop after the indicated amount of time has passed.
267      *
268      * @param workTime
269      *          the work time
270      */
271     JobStatusUpdater(long workTime) {
272       this.workTime = workTime;
273     }
274 
275     @Override
276     public void run() {
277       final long endTime = workTime > 0 ? System.currentTimeMillis() + workTime : 0;
278       final Map<Job, Job.Status> finishedJobs = new HashMap<Job, Job.Status>();
279       while (true) {
280         final long time = System.currentTimeMillis();
281         // Wait a little..
282         try {
283           final long timeToSleep = Math.min(pollingInterval, Math.abs(endTime - time));
284           Thread.sleep(timeToSleep);
285         } catch (InterruptedException e) {
286           logger.debug("Job polling thread was interrupted");
287           return;
288         }
289         // Look at all jobs and make sure all of them have reached the expected status
290         for (final Job job : jobs) {
291           // Don't ask if we already know
292           if (!finishedJobs.containsKey(job)) {
293             // Get the job status from the service registry
294             try {
295               final Job processedJob = serviceRegistry.getJob(job.getId());
296               final Job.Status jobStatus = processedJob.getStatus();
297               switch (jobStatus) {
298                 case CANCELLED:
299                   throw new JobCanceledException(processedJob);
300                 case DELETED:
301                 case FAILED:
302                 case FINISHED:
303                   job.setStatus(jobStatus);
304                   job.setPayload(processedJob.getPayload());
305                   finishedJobs.put(job, jobStatus);
306                   break;
307                 case PAUSED:
308                 case QUEUED:
309                 case RESTART:
310                 case DISPATCHING:
311                 case INSTANTIATED:
312                 case RUNNING:
313                   logger.trace("{} is still in the works", job);
314                   break;
315                 case WAITING:
316                   logger.trace("{} is waiting", job);
317                   break;
318                 default:
319                   logger.error("Unhandled job status '{}' found", jobStatus);
320                   break;
321               }
322             } catch (NotFoundException e) {
323               logger.warn("Error polling job {}: Not found!", job);
324               finishedJobs.put(job, Job.Status.DELETED);
325               pollingException = e;
326               break;
327             } catch (ServiceRegistryException e) {
328               logger.warn("Error polling service registry for the status of {}: {}", job, e.getMessage());
329             } catch (JobCanceledException e) {
330               logger.warn("Job {} got canceled", job);
331               pollingException = e;
332               updateAndNotify(finishedJobs);
333               return;
334             } catch (Throwable t) {
335               logger.error("An unexpected error occured while waiting for jobs", t);
336               pollingException = t;
337               updateAndNotify(finishedJobs);
338               return;
339             }
340           }
341 
342           // Are we done already?
343           if (finishedJobs.size() == jobs.size()) {
344             updateAndNotify(finishedJobs);
345             return;
346           } else if (workTime > 0 && endTime >= time) {
347             pollingException = new InterruptedException("Timeout waiting for job processing");
348             updateAndNotify(finishedJobs);
349             return;
350           }
351         }
352       }
353     }
354 
355     /**
356      * Notifies listeners about the status change.
357      *
358      * @param status
359      *          the status
360      */
361     private void updateAndNotify(Map<Job, Job.Status> status) {
362       JobBarrier.this.setStatus(new Result(status));
363       synchronized (JobBarrier.this) {
364         JobBarrier.this.notifyAll();
365       }
366     }
367 
368   }
369 
370   /** Result of a waiting operation on a certain number of jobs. */
371   public static class Result {
372     /** The outcome of this barrier */
373     private final Map<Job, Job.Status> status;
374 
375     /**
376      * Creates a new job barrier result.
377      *
378      * @param status
379      *          the barrier outcome
380      */
381     public Result(Map<Job, Job.Status> status) {
382       this.status = status;
383     }
384 
385     /**
386      * Returns the status details.
387      *
388      * @return the status details
389      */
390     public Map<Job, Job.Status> getStatus() {
391       return status;
392     }
393 
394     /**
395      * Returns <code>true</code> if all jobs are in the <code>{@link Job.Status#FINISHED}</code> state.
396      *
397      * @return <code>true</code> if all jobs are finished
398      */
399     public boolean isSuccess() {
400       for (final Job.Status state : status.values()) {
401         if (!state.equals(Job.Status.FINISHED))
402           return false;
403       }
404       return true;
405     }
406   }
407 }