/*
 * Licensed to The Apereo Foundation under one or more contributor license
 * agreements. See the NOTICE file distributed with this work for additional
 * information regarding copyright ownership.
 *
 *
 * The Apereo Foundation licenses this file to you under the Educational
 * Community License, Version 2.0 (the "License"); you may not use this file
 * except in compliance with the License. You may obtain a copy of the License
 * at:
 *
 *   http://opensource.org/licenses/ecl2.txt
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the
 * License for the specific language governing permissions and limitations under
 * the License.
 *
 */
21  
22  package org.opencastproject.job.api;
23  
24  import org.opencastproject.job.api.Job.Status;
25  import org.opencastproject.serviceregistry.api.ServiceRegistry;
26  import org.opencastproject.serviceregistry.api.ServiceRegistryException;
27  import org.opencastproject.util.JobCanceledException;
28  import org.opencastproject.util.NotFoundException;
29  
30  import org.slf4j.Logger;
31  import org.slf4j.LoggerFactory;
32  
33  import java.util.ArrayList;
34  import java.util.Arrays;
35  import java.util.HashMap;
36  import java.util.List;
37  import java.util.Map;
38  import java.util.Optional;
39  
40  /**
41   * This class is a utility implementation that will wait for all given jobs to change their status to either one of:
42   * <ul>
43   * <li>{@link Job.Status#FINISHED}</li>
44   * <li>{@link Job.Status#FAILED}</li>
45   * <li>{@link Job.Status#DELETED}</li>
46   * </ul>
47   */
48  public final class JobBarrier {
49    /** The logging facility */
50    private static final Logger logger = LoggerFactory.getLogger(JobBarrier.class);
51  
52    /** Default polling interval is 5 seconds */
53    public static final long DEFAULT_POLLING_INTERVAL = 5000L;
54  
55    /** The service registry used to do the polling */
56    private final ServiceRegistry serviceRegistry;
57  
58    /** Time in milliseconds between two pools for the job status */
59    private final long pollingInterval;
60  
61    /** The job that's waiting */
62    private final Optional<Long> waiterJobId;
63  
64    /** The jobs to wait on */
65    private final List<Job> jobs;
66  
67    /** An exception that might have been thrown while polling */
68    private volatile Throwable pollingException = null;
69  
70    /** The status map */
71    private volatile Result status = null;
72  
73    /**
74     * Creates a barrier without any jobs, using <code>registry</code> to poll for the outcome of the monitored jobs using
75     * the default polling interval {@link #DEFAULT_POLLING_INTERVAL}. The <code>waiter</code> is the job which is waiting
76     * for the other jobs to finish. Use {@link #addJob(Job)} to add jobs to monitor.
77     *
78     * @param registry
79     *          the registry
80     * @param waiter
81     *          the job waiting for the other jobs to finish
82     */
83    public JobBarrier(Job waiter, ServiceRegistry registry) {
84      this(waiter, registry, DEFAULT_POLLING_INTERVAL, new Job[] {});
85    }
86  
87    /**
88     * Creates a barrier for <code>jobs</code>, using <code>registry</code> to poll for the outcome of the monitored jobs
89     * using the default polling interval {@link #DEFAULT_POLLING_INTERVAL}. The <code>waiter</code> is the job which is
90     * waiting for the other jobs to finish.
91     *
92     * @param registry
93     *          the registry
94     * @param jobs
95     *          the jobs to monitor
96     * @param waiter
97     *          the job waiting for the other jobs to finish
98     */
99    public JobBarrier(Job waiter, ServiceRegistry registry, Job... jobs) {
100     this(waiter, registry, DEFAULT_POLLING_INTERVAL, jobs);
101   }
102 
103   /**
104    * Creates a wrapper for <code>job</code>, using <code>registry</code> to poll for the job outcome. The
105    * <code>waiter</code> is the job which is waiting for the other jobs to finish.
106    *
107    * @param registry
108    *          the registry
109    * @param pollingInterval
110    *          the time in miliseconds between two polling operations
111    * @param waiter
112    *          the job waiting for the other jobs to finish
113    */
114   public JobBarrier(Job waiter, ServiceRegistry registry, long pollingInterval) {
115     this(waiter, registry, pollingInterval, new Job[] {});
116   }
117 
118   /**
119    * Creates a wrapper for <code>job</code>, using <code>registry</code> to poll for the job outcome. The
120    * <code>waiter</code> is the job which is waiting for the other jobs to finish.
121    *
122    * @param jobs
123    *          the job to poll
124    * @param registry
125    *          the registry
126    * @param pollingInterval
127    *          the time in miliseconds between two polling operations
128    * @param waiter
129    *          the job waiting for the other jobs to finish
130    */
131   public JobBarrier(Job waiter, ServiceRegistry registry, long pollingInterval, Job... jobs) {
132     if (registry == null)
133       throw new IllegalArgumentException("Service registry must not be null");
134     if (jobs == null)
135       throw new IllegalArgumentException("Jobs must not be null");
136     if (pollingInterval < 0)
137       throw new IllegalArgumentException("Polling interval must be a positive number");
138     this.serviceRegistry = registry;
139     this.pollingInterval = pollingInterval;
140     if (waiter != null)
141       this.waiterJobId = Optional.of(waiter.getId());
142     else
143       this.waiterJobId = Optional.empty();
144     this.jobs = new ArrayList<Job>(Arrays.asList(jobs));
145   }
146 
147   private void suspendWaiterJob() {
148     if (this.waiterJobId.isPresent()) {
149       try {
150         final Job waiter = serviceRegistry.getJob(waiterJobId.get());
151         waiter.setStatus(Job.Status.WAITING);
152         logger.debug("Job {} set to WAITING state.", waiter.getId());
153         this.serviceRegistry.updateJob(waiter);
154       } catch (ServiceRegistryException e) {
155         logger.warn("Unable to put {} into a waiting state, this may cause a deadlock: {}", waiterJobId, e.getMessage());
156       } catch (NotFoundException e) {
157         logger.warn("Unable to put {} into a waiting state, job not found by the service registry.  This may cause a deadlock: {}", waiterJobId, e.getMessage());
158       }
159     } else {
160       logger.debug("No waiting job set, unable to put waiting job into waiting state");
161     }
162   }
163 
164   private void wakeWaiterJob() {
165     if (this.waiterJobId.isPresent()) {
166       try {
167         final Job waiter = serviceRegistry.getJob(waiterJobId.get());
168         waiter.setStatus(Job.Status.RUNNING);
169         logger.debug("Job {} wakened and set back to RUNNING state.", waiter.getId());
170         this.serviceRegistry.updateJob(waiter);
171       } catch (ServiceRegistryException e) {
172         logger.warn("Unable to put {} into a waiting state, this may cause a deadlock: {}", waiterJobId, e.getMessage());
173       } catch (NotFoundException e) {
174         logger.warn("Unable to put {} into a waiting state, job not found by the service registry.  This may cause a deadlock: {}", waiterJobId, e.getMessage());
175       }
176     } else {
177       logger.debug("No waiting job set, unable to put waiting job into waiting state");
178     }
179   }
180 
181   /**
182    * Waits for a status change and returns the new status.
183    *
184    * @return the status
185    */
186   public Result waitForJobs() {
187     return waitForJobs(0);
188   }
189 
190   /**
191    * Waits for a status change on all jobs and returns. If waiting for the status exceeds a certain limit, the method
192    * returns even if some or all of the jobs are not yet finished. The same is true if at least one of the jobs fails or
193    * gets stopped or deleted.
194    *
195    * @param timeout
196    *          the maximum amount of time to wait
197    * @throws IllegalStateException
198    *           if there are no jobs to wait for
199    * @throws JobCanceledException
200    *           if one of the jobs was canceled
201    */
202   public Result waitForJobs(long timeout) throws JobCanceledException, IllegalStateException {
203     if (jobs.size() == 0)
204       return new Result(new HashMap<Job, Status>());
205     this.suspendWaiterJob();
206     synchronized (this) {
207       JobStatusUpdater updater = new JobStatusUpdater(timeout);
208       try {
209         updater.start();
210         wait();
211       } catch (InterruptedException e) {
212         logger.debug("Interrupted while waiting for job");
213       }
214     }
215     if (pollingException != null) {
216       if (pollingException instanceof JobCanceledException)
217         throw (JobCanceledException) pollingException;
218       throw new IllegalStateException(pollingException);
219     }
220     this.wakeWaiterJob();
221     return getStatus();
222   }
223 
224   /**
225    * Adds the job to the list of jobs to wait for. An {@link IllegalStateException} is thrown if the barrier has already
226    * been asked to wait for jobs by calling {@link #waitForJobs()}.
227    *
228    * @param job
229    *          the job
230    * @throws IllegalStateException
231    *           if the barrier already started waiting
232    */
233   public void addJob(Job job) throws IllegalStateException {
234     if (job == null)
235       throw new IllegalArgumentException("Job must not be null");
236     jobs.add(job);
237   }
238 
239   /**
240    * Sets the outcome of the various jobs that were monitored.
241    *
242    * @param status
243    *          the status
244    */
245   void setStatus(Result status) {
246     this.status = status;
247   }
248 
249   /**
250    * Returns the resulting status map.
251    *
252    * @return the status of the individual jobs
253    */
254   public Result getStatus() {
255     return status;
256   }
257 
258   /** Thread that keeps polling for status changes. */
259   class JobStatusUpdater extends Thread {
260     /** Maximum wait in milliseconds or 0 for unlimited waiting */
261     private final long workTime;
262 
263     /**
264      * Creates a new status updater that will wait for finished jobs. If <code>0</code> is passed in as the work time,
265      * the updater will wait as long as it takes. Otherwise, it will stop after the indicated amount of time has passed.
266      *
267      * @param workTime
268      *          the work time
269      */
270     JobStatusUpdater(long workTime) {
271       this.workTime = workTime;
272     }
273 
274     @Override
275     public void run() {
276       final long endTime = workTime > 0 ? System.currentTimeMillis() + workTime : 0;
277       final Map<Job, Job.Status> finishedJobs = new HashMap<Job, Job.Status>();
278       while (true) {
279         final long time = System.currentTimeMillis();
280         // Wait a little..
281         try {
282           final long timeToSleep = Math.min(pollingInterval, Math.abs(endTime - time));
283           Thread.sleep(timeToSleep);
284         } catch (InterruptedException e) {
285           logger.debug("Job polling thread was interrupted");
286           return;
287         }
288         // Look at all jobs and make sure all of them have reached the expected status
289         for (final Job job : jobs) {
290           // Don't ask if we already know
291           if (!finishedJobs.containsKey(job)) {
292             // Get the job status from the service registry
293             try {
294               final Job processedJob = serviceRegistry.getJob(job.getId());
295               final Job.Status jobStatus = processedJob.getStatus();
296               switch (jobStatus) {
297                 case CANCELLED:
298                   throw new JobCanceledException(processedJob);
299                 case DELETED:
300                 case FAILED:
301                 case FINISHED:
302                   job.setStatus(jobStatus);
303                   job.setPayload(processedJob.getPayload());
304                   finishedJobs.put(job, jobStatus);
305                   break;
306                 case PAUSED:
307                 case QUEUED:
308                 case RESTART:
309                 case DISPATCHING:
310                 case INSTANTIATED:
311                 case RUNNING:
312                   logger.trace("{} is still in the works", job);
313                   break;
314                 case WAITING:
315                   logger.trace("{} is waiting", job);
316                   break;
317                 default:
318                   logger.error("Unhandled job status '{}' found", jobStatus);
319                   break;
320               }
321             } catch (NotFoundException e) {
322               logger.warn("Error polling job {}: Not found!", job);
323               finishedJobs.put(job, Job.Status.DELETED);
324               pollingException = e;
325               break;
326             } catch (ServiceRegistryException e) {
327               logger.warn("Error polling service registry for the status of {}: {}", job, e.getMessage());
328             } catch (JobCanceledException e) {
329               logger.warn("Job {} got canceled", job);
330               pollingException = e;
331               updateAndNotify(finishedJobs);
332               return;
333             } catch (Throwable t) {
334               logger.error("An unexpected error occured while waiting for jobs", t);
335               pollingException = t;
336               updateAndNotify(finishedJobs);
337               return;
338             }
339           }
340 
341           // Are we done already?
342           if (finishedJobs.size() == jobs.size()) {
343             updateAndNotify(finishedJobs);
344             return;
345           } else if (workTime > 0 && endTime >= time) {
346             pollingException = new InterruptedException("Timeout waiting for job processing");
347             updateAndNotify(finishedJobs);
348             return;
349           }
350         }
351       }
352     }
353 
354     /**
355      * Notifies listeners about the status change.
356      *
357      * @param status
358      *          the status
359      */
360     private void updateAndNotify(Map<Job, Job.Status> status) {
361       JobBarrier.this.setStatus(new Result(status));
362       synchronized (JobBarrier.this) {
363         JobBarrier.this.notifyAll();
364       }
365     }
366 
367   }
368 
369   /** Result of a waiting operation on a certain number of jobs. */
370   public static class Result {
371     /** The outcome of this barrier */
372     private final Map<Job, Job.Status> status;
373 
374     /**
375      * Creates a new job barrier result.
376      *
377      * @param status
378      *          the barrier outcome
379      */
380     public Result(Map<Job, Job.Status> status) {
381       this.status = status;
382     }
383 
384     /**
385      * Returns the status details.
386      *
387      * @return the status details
388      */
389     public Map<Job, Job.Status> getStatus() {
390       return status;
391     }
392 
393     /**
394      * Returns <code>true</code> if all jobs are in the <code>{@link Job.Status#FINISHED}</code> state.
395      *
396      * @return <code>true</code> if all jobs are finished
397      */
398     public boolean isSuccess() {
399       for (final Job.Status state : status.values()) {
400         if (!state.equals(Job.Status.FINISHED))
401           return false;
402       }
403       return true;
404     }
405   }
406 }