View Javadoc
1   /*
2    * Licensed to The Apereo Foundation under one or more contributor license
3    * agreements. See the NOTICE file distributed with this work for additional
4    * information regarding copyright ownership.
5    *
6    *
7    * The Apereo Foundation licenses this file to you under the Educational
8    * Community License, Version 2.0 (the "License"); you may not use this file
9    * except in compliance with the License. You may obtain a copy of the License
10   * at:
11   *
12   *   http://opensource.org/licenses/ecl2.txt
13   *
14   * Unless required by applicable law or agreed to in writing, software
15   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
16   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the
17   * License for the specific language governing permissions and limitations under
18   * the License.
19   *
20   */
21  
22  package org.opencastproject.job.api;
23  
24  import org.opencastproject.job.api.Job.Status;
25  import org.opencastproject.serviceregistry.api.ServiceRegistry;
26  import org.opencastproject.serviceregistry.api.ServiceRegistryException;
27  import org.opencastproject.util.JobCanceledException;
28  import org.opencastproject.util.NotFoundException;
29  
30  import org.slf4j.Logger;
31  import org.slf4j.LoggerFactory;
32  
33  import java.util.ArrayList;
34  import java.util.Arrays;
35  import java.util.HashMap;
36  import java.util.List;
37  import java.util.Map;
38  import java.util.Optional;
39  
40  /**
41   * This class is a utility implementation that will wait for all given jobs to change their status to either one of:
42   * <ul>
43   * <li>{@link Job.Status#FINISHED}</li>
44   * <li>{@link Job.Status#FAILED}</li>
45   * <li>{@link Job.Status#DELETED}</li>
46   * </ul>
47   */
48  public final class JobBarrier {
49    /** The logging facility */
50    private static final Logger logger = LoggerFactory.getLogger(JobBarrier.class);
51  
52    /** Default polling interval is 5 seconds */
53    public static final long DEFAULT_POLLING_INTERVAL = 5000L;
54  
55    /** The service registry used to do the polling */
56    private final ServiceRegistry serviceRegistry;
57  
58    /** Time in milliseconds between two pools for the job status */
59    private final long pollingInterval;
60  
61    /** The job that's waiting */
62    private final Optional<Long> waiterJobId;
63  
64    /** The jobs to wait on */
65    private final List<Job> jobs;
66  
67    /** An exception that might have been thrown while polling */
68    private volatile Throwable pollingException = null;
69  
70    /** The status map */
71    private volatile Result status = null;
72  
73    /**
74     * Creates a barrier without any jobs, using <code>registry</code> to poll for the outcome of the monitored jobs using
75     * the default polling interval {@link #DEFAULT_POLLING_INTERVAL}. The <code>waiter</code> is the job which is waiting
76     * for the other jobs to finish. Use {@link #addJob(Job)} to add jobs to monitor.
77     *
78     * @param registry
79     *          the registry
80     * @param waiter
81     *          the job waiting for the other jobs to finish
82     */
83    public JobBarrier(Job waiter, ServiceRegistry registry) {
84      this(waiter, registry, DEFAULT_POLLING_INTERVAL, new Job[] {});
85    }
86  
87    /**
88     * Creates a barrier for <code>jobs</code>, using <code>registry</code> to poll for the outcome of the monitored jobs
89     * using the default polling interval {@link #DEFAULT_POLLING_INTERVAL}. The <code>waiter</code> is the job which is
90     * waiting for the other jobs to finish.
91     *
92     * @param registry
93     *          the registry
94     * @param jobs
95     *          the jobs to monitor
96     * @param waiter
97     *          the job waiting for the other jobs to finish
98     */
99    public JobBarrier(Job waiter, ServiceRegistry registry, Job... jobs) {
100     this(waiter, registry, DEFAULT_POLLING_INTERVAL, jobs);
101   }
102 
103   /**
104    * Creates a wrapper for <code>job</code>, using <code>registry</code> to poll for the job outcome. The
105    * <code>waiter</code> is the job which is waiting for the other jobs to finish.
106    *
107    * @param registry
108    *          the registry
109    * @param pollingInterval
110    *          the time in miliseconds between two polling operations
111    * @param waiter
112    *          the job waiting for the other jobs to finish
113    */
114   public JobBarrier(Job waiter, ServiceRegistry registry, long pollingInterval) {
115     this(waiter, registry, pollingInterval, new Job[] {});
116   }
117 
118   /**
119    * Creates a wrapper for <code>job</code>, using <code>registry</code> to poll for the job outcome. The
120    * <code>waiter</code> is the job which is waiting for the other jobs to finish.
121    *
122    * @param jobs
123    *          the job to poll
124    * @param registry
125    *          the registry
126    * @param pollingInterval
127    *          the time in miliseconds between two polling operations
128    * @param waiter
129    *          the job waiting for the other jobs to finish
130    */
131   public JobBarrier(Job waiter, ServiceRegistry registry, long pollingInterval, Job... jobs) {
132     if (registry == null) {
133       throw new IllegalArgumentException("Service registry must not be null");
134     }
135     if (jobs == null) {
136       throw new IllegalArgumentException("Jobs must not be null");
137     }
138     if (pollingInterval < 0) {
139       throw new IllegalArgumentException("Polling interval must be a positive number");
140     }
141     this.serviceRegistry = registry;
142     this.pollingInterval = pollingInterval;
143     if (waiter != null) {
144       this.waiterJobId = Optional.of(waiter.getId());
145     } else {
146       this.waiterJobId = Optional.empty();
147     }
148     this.jobs = new ArrayList<Job>(Arrays.asList(jobs));
149   }
150 
151   private void suspendWaiterJob() {
152     if (this.waiterJobId.isPresent()) {
153       try {
154         final Job waiter = serviceRegistry.getJob(waiterJobId.get());
155         waiter.setStatus(Job.Status.WAITING);
156         logger.debug("Job {} set to WAITING state.", waiter.getId());
157         this.serviceRegistry.updateJob(waiter);
158       } catch (ServiceRegistryException e) {
159         logger.warn("Unable to put {} into a waiting state, this may cause a deadlock: {}",
160             waiterJobId, e.getMessage());
161       } catch (NotFoundException e) {
162         logger.warn("Unable to put {} into a waiting state, job not found by the service registry. "
163             + "This may cause a deadlock: {}", waiterJobId, e.getMessage());
164       }
165     } else {
166       logger.debug("No waiting job set, unable to put waiting job into waiting state");
167     }
168   }
169 
170   private void wakeWaiterJob() {
171     if (this.waiterJobId.isPresent()) {
172       try {
173         final Job waiter = serviceRegistry.getJob(waiterJobId.get());
174         waiter.setStatus(Job.Status.RUNNING);
175         logger.debug("Job {} wakened and set back to RUNNING state.", waiter.getId());
176         this.serviceRegistry.updateJob(waiter);
177       } catch (ServiceRegistryException e) {
178         logger.warn("Unable to put {} into a waiting state, this may cause a deadlock: {}",
179             waiterJobId, e.getMessage());
180       } catch (NotFoundException e) {
181         logger.warn("Unable to put {} into a waiting state, job not found by the service registry. "
182             + "This may cause a deadlock: {}", waiterJobId, e.getMessage());
183       }
184     } else {
185       logger.debug("No waiting job set, unable to put waiting job into waiting state");
186     }
187   }
188 
189   /**
190    * Waits for a status change and returns the new status.
191    *
192    * @return the status
193    */
194   public Result waitForJobs() {
195     return waitForJobs(0);
196   }
197 
198   /**
199    * Waits for a status change on all jobs and returns. If waiting for the status exceeds a certain limit, the method
200    * returns even if some or all of the jobs are not yet finished. The same is true if at least one of the jobs fails or
201    * gets stopped or deleted.
202    *
203    * @param timeout
204    *          the maximum amount of time to wait
205    * @throws IllegalStateException
206    *           if there are no jobs to wait for
207    * @throws JobCanceledException
208    *           if one of the jobs was canceled
209    */
210   public Result waitForJobs(long timeout) throws JobCanceledException, IllegalStateException {
211     if (jobs.size() == 0) {
212       return new Result(new HashMap<Job, Status>());
213     }
214     this.suspendWaiterJob();
215     synchronized (this) {
216       JobStatusUpdater updater = new JobStatusUpdater(timeout);
217       try {
218         updater.start();
219         wait();
220       } catch (InterruptedException e) {
221         logger.debug("Interrupted while waiting for job");
222       }
223     }
224     if (pollingException != null) {
225       if (pollingException instanceof JobCanceledException) {
226         throw (JobCanceledException) pollingException;
227       }
228       throw new IllegalStateException(pollingException);
229     }
230     this.wakeWaiterJob();
231     return getStatus();
232   }
233 
234   /**
235    * Adds the job to the list of jobs to wait for. An {@link IllegalStateException} is thrown if the barrier has already
236    * been asked to wait for jobs by calling {@link #waitForJobs()}.
237    *
238    * @param job
239    *          the job
240    * @throws IllegalStateException
241    *           if the barrier already started waiting
242    */
243   public void addJob(Job job) throws IllegalStateException {
244     if (job == null) {
245       throw new IllegalArgumentException("Job must not be null");
246     }
247     jobs.add(job);
248   }
249 
250   /**
251    * Sets the outcome of the various jobs that were monitored.
252    *
253    * @param status
254    *          the status
255    */
256   void setStatus(Result status) {
257     this.status = status;
258   }
259 
260   /**
261    * Returns the resulting status map.
262    *
263    * @return the status of the individual jobs
264    */
265   public Result getStatus() {
266     return status;
267   }
268 
269   /** Thread that keeps polling for status changes. */
270   class JobStatusUpdater extends Thread {
271     /** Maximum wait in milliseconds or 0 for unlimited waiting */
272     private final long workTime;
273 
274     /**
275      * Creates a new status updater that will wait for finished jobs. If <code>0</code> is passed in as the work time,
276      * the updater will wait as long as it takes. Otherwise, it will stop after the indicated amount of time has passed.
277      *
278      * @param workTime
279      *          the work time
280      */
281     JobStatusUpdater(long workTime) {
282       this.workTime = workTime;
283     }
284 
285     @Override
286     public void run() {
287       final long endTime = workTime > 0 ? System.currentTimeMillis() + workTime : 0;
288       final Map<Job, Job.Status> finishedJobs = new HashMap<Job, Job.Status>();
289       while (true) {
290         final long time = System.currentTimeMillis();
291         // Wait a little..
292         try {
293           final long timeToSleep = Math.min(pollingInterval, Math.abs(endTime - time));
294           Thread.sleep(timeToSleep);
295         } catch (InterruptedException e) {
296           logger.debug("Job polling thread was interrupted");
297           return;
298         }
299         // Look at all jobs and make sure all of them have reached the expected status
300         for (final Job job : jobs) {
301           // Don't ask if we already know
302           if (!finishedJobs.containsKey(job)) {
303             // Get the job status from the service registry
304             try {
305               final Job processedJob = serviceRegistry.getJob(job.getId());
306               final Job.Status jobStatus = processedJob.getStatus();
307               switch (jobStatus) {
308                 case CANCELLED:
309                   throw new JobCanceledException(processedJob);
310                 case DELETED:
311                 case FAILED:
312                 case FINISHED:
313                   job.setStatus(jobStatus);
314                   job.setPayload(processedJob.getPayload());
315                   finishedJobs.put(job, jobStatus);
316                   break;
317                 case PAUSED:
318                 case QUEUED:
319                 case RESTART:
320                 case DISPATCHING:
321                 case INSTANTIATED:
322                 case RUNNING:
323                   logger.trace("{} is still in the works", job);
324                   break;
325                 case WAITING:
326                   logger.trace("{} is waiting", job);
327                   break;
328                 default:
329                   logger.error("Unhandled job status '{}' found", jobStatus);
330                   break;
331               }
332             } catch (NotFoundException e) {
333               logger.warn("Error polling job {}: Not found!", job);
334               finishedJobs.put(job, Job.Status.DELETED);
335               pollingException = e;
336               break;
337             } catch (ServiceRegistryException e) {
338               logger.warn("Error polling service registry for the status of {}: {}", job, e.getMessage());
339             } catch (JobCanceledException e) {
340               logger.warn("Job {} got canceled", job);
341               pollingException = e;
342               updateAndNotify(finishedJobs);
343               return;
344             } catch (Throwable t) {
345               logger.error("An unexpected error occured while waiting for jobs", t);
346               pollingException = t;
347               updateAndNotify(finishedJobs);
348               return;
349             }
350           }
351 
352           // Are we done already?
353           if (finishedJobs.size() == jobs.size()) {
354             updateAndNotify(finishedJobs);
355             return;
356           } else if (workTime > 0 && endTime >= time) {
357             pollingException = new InterruptedException("Timeout waiting for job processing");
358             updateAndNotify(finishedJobs);
359             return;
360           }
361         }
362       }
363     }
364 
365     /**
366      * Notifies listeners about the status change.
367      *
368      * @param status
369      *          the status
370      */
371     private void updateAndNotify(Map<Job, Job.Status> status) {
372       JobBarrier.this.setStatus(new Result(status));
373       synchronized (JobBarrier.this) {
374         JobBarrier.this.notifyAll();
375       }
376     }
377 
378   }
379 
380   /** Result of a waiting operation on a certain number of jobs. */
381   public static class Result {
382     /** The outcome of this barrier */
383     private final Map<Job, Job.Status> status;
384 
385     /**
386      * Creates a new job barrier result.
387      *
388      * @param status
389      *          the barrier outcome
390      */
391     public Result(Map<Job, Job.Status> status) {
392       this.status = status;
393     }
394 
395     /**
396      * Returns the status details.
397      *
398      * @return the status details
399      */
400     public Map<Job, Job.Status> getStatus() {
401       return status;
402     }
403 
404     /**
405      * Returns <code>true</code> if all jobs are in the <code>{@link Job.Status#FINISHED}</code> state.
406      *
407      * @return <code>true</code> if all jobs are finished
408      */
409     public boolean isSuccess() {
410       for (final Job.Status state : status.values()) {
411         if (!state.equals(Job.Status.FINISHED)) {
412           return false;
413         }
414       }
415       return true;
416     }
417   }
418 }