View Javadoc
1   /*
2    * Licensed to The Apereo Foundation under one or more contributor license
3    * agreements. See the NOTICE file distributed with this work for additional
4    * information regarding copyright ownership.
5    *
6    *
7    * The Apereo Foundation licenses this file to you under the Educational
8    * Community License, Version 2.0 (the "License"); you may not use this file
9    * except in compliance with the License. You may obtain a copy of the License
10   * at:
11   *
12   *   http://opensource.org/licenses/ecl2.txt
13   *
14   * Unless required by applicable law or agreed to in writing, software
15   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
16   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the
17   * License for the specific language governing permissions and limitations under
18   * the License.
19   *
20   */
21  
22  package org.opencastproject.util;
23  
24  import static com.entwinemedia.fn.Stream.$;
25  import static org.opencastproject.util.data.Collections.map;
26  import static org.opencastproject.util.data.Collections.toArray;
27  import static org.opencastproject.util.data.Option.none;
28  import static org.opencastproject.util.data.Option.some;
29  import static org.opencastproject.util.data.Tuple.tuple;
30  
31  import org.opencastproject.job.api.Job;
32  import org.opencastproject.job.api.Job.Status;
33  import org.opencastproject.job.api.JobBarrier;
34  import org.opencastproject.job.api.JobParser;
35  import org.opencastproject.mediapackage.MediaPackageElement;
36  import org.opencastproject.mediapackage.MediaPackageElementParser;
37  import org.opencastproject.mediapackage.MediaPackageException;
38  import org.opencastproject.serviceregistry.api.ServiceRegistry;
39  import org.opencastproject.serviceregistry.api.ServiceRegistryException;
40  import org.opencastproject.util.data.Function;
41  import org.opencastproject.util.data.Option;
42  
43  import com.entwinemedia.fn.Fn2;
44  import com.entwinemedia.fn.Pred;
45  import com.entwinemedia.fn.data.Opt;
46  
47  import org.apache.http.HttpResponse;
48  import org.slf4j.Logger;
49  import org.slf4j.LoggerFactory;
50  
51  import java.util.Collection;
52  import java.util.List;
53  
54  /** Job related utility functions. */
55  public final class JobUtil {
56    /** The logger */
57    private static final Logger logger = LoggerFactory.getLogger(JobUtil.class);
58  
59    private JobUtil() {
60    }
61  
62    /**
63     * Update the job from the service registry and get its payload.
64     *
65     * @return the payload or none, if either to job cannot be found or if the job has no or an empty payload
66     */
67    public static Opt<String> getPayload(ServiceRegistry reg, Job job)
68            throws NotFoundException, ServiceRegistryException {
69      for (Job updated : update(reg, job)) {
70        return Opt.nul(updated.getPayload());
71      }
72      return Opt.none();
73    }
74  
75    /**
76     * Get the latest state of a job. Does not modify the <code>job</code> parameter.
77     *
78     * @return the updated job or none, if it cannot be found
79     */
80    public static Opt<Job> update(ServiceRegistry reg, Job job) throws ServiceRegistryException {
81      try {
82        return Opt.some(reg.getJob(job.getId()));
83      } catch (NotFoundException e) {
84        return Opt.none();
85      }
86    }
87  
88    /**
89     * Waits for the result of a created barrier for <code>jobs</code>, using <code>registry</code> to poll for the
90     * outcome of the monitored jobs using the default polling interval. The
91     * <code>waiter</code> is the job which is waiting for the other jobs to finish.
92     *
93     * @param waiter
94     *          the job waiting for the other jobs to finish
95     * @param reg
96     *          the service registry
97     * @param pollingInterval
98     *          the time in miliseconds between two polling operations
99     * @param timeout
100    *          the maximum amount of time to wait
101    * @param jobs
102    *          the jobs to monitor
103    * @return the job barrier result
104    */
105   public static JobBarrier.Result waitForJobs(Job waiter, ServiceRegistry reg, long pollingInterval, long timeout,
106           Job... jobs) {
107     JobBarrier barrier = new JobBarrier(waiter, reg, pollingInterval, jobs);
108     return barrier.waitForJobs(timeout);
109   }
110 
111   /**
112    * Waits for the result of a created barrier for <code>jobs</code>, using <code>registry</code> to poll for the
113    * outcome of the monitored jobs using the default polling interval. The
114    * <code>waiter</code> is the job which is waiting for the other jobs to finish.
115    *
116    * @param waiter
117    *          the job waiting for the other jobs to finish
118    * @param reg
119    *          the service registry
120    * @param timeout
121    *          the maximum amount of time to wait
122    * @param jobs
123    *          the jobs to monitor
124    * @return the job barrier result
125    */
126   public static JobBarrier.Result waitForJobs(Job waiter, ServiceRegistry reg, long timeout, Job... jobs) {
127     return waitForJobs(waiter, reg, JobBarrier.DEFAULT_POLLING_INTERVAL, timeout, jobs);
128   }
129 
130   /**
131    * Waits for the result of a created barrier for <code>jobs</code>, using <code>registry</code> to poll for the
132    * outcome of the monitored jobs using the default polling interval. The
133    * <code>waiter</code> is the job which is waiting for the other jobs to finish.
134    *
135    * @param waiter
136    *          the job waiting for the other jobs to finish
137    * @param reg
138    *          the service registry
139    * @param jobs
140    *          the jobs to monitor
141    * @return the job barrier result
142    */
143   public static JobBarrier.Result waitForJobs(Job waiter, ServiceRegistry reg, Job... jobs) {
144     return waitForJobs(waiter, reg, 0L, jobs);
145   }
146 
147   /**
148    * Waits for the result of a created barrier for <code>jobs</code>, using <code>registry</code> to poll for the
149    * outcome of the monitored jobs using the default polling interval.
150    *
151    * @param reg
152    *          the service registry
153    * @param jobs
154    *          the jobs to monitor
155    * @return the job barrier result
156    */
157   public static JobBarrier.Result waitForJobs(ServiceRegistry reg, Job... jobs) {
158     return waitForJobs(null, reg, jobs);
159   }
160 
161   /**
162    * Waits for the result of a created barrier for <code>jobs</code>, using <code>registry</code> to poll for the
163    * outcome of the monitored jobs using the default polling interval. The
164    * <code>waiter</code> is the job which is waiting for the other jobs to finish.
165    *
166    * @param waiter
167    *          the job waiting for the other jobs to finish
168    * @param reg
169    *          the service registry
170    * @param jobs
171    *          the jobs to monitor
172    * @return the job barrier result
173    */
174   public static JobBarrier.Result waitForJobs(Job waiter, ServiceRegistry reg, Collection<Job> jobs) {
175     return waitForJobs(waiter, reg, toArray(Job.class, jobs));
176   }
177 
178   /**
179    * Waits for the result of a created barrier for <code>jobs</code>, using <code>registry</code> to poll for the
180    * outcome of the monitored jobs using the default polling interval.
181    *
182    * @param reg
183    *          the service registry
184    * @param jobs
185    *          the jobs to monitor
186    * @return the job barrier result
187    */
188   public static JobBarrier.Result waitForJobs(ServiceRegistry reg, Collection<Job> jobs) {
189     return waitForJobs(null, reg, jobs);
190   }
191 
192   /** Check if <code>job</code> is not done yet and wait in case. */
193   public static JobBarrier.Result waitForJob(Job waiter, ServiceRegistry reg, Option<Long> timeout, Job job) {
194     final Job.Status status = job.getStatus();
195     // only create a barrier if the job is not done yet
196     switch (status) {
197       case CANCELLED:
198       case DELETED:
199       case FAILED:
200       case FINISHED:
201         return new JobBarrier.Result(map(tuple(job, status)));
202       default:
203         for (Long t : timeout)
204           return waitForJobs(waiter, reg, t, job);
205         return waitForJobs(waiter, reg, job);
206     }
207   }
208 
209   /** Check if <code>job</code> is not done yet and wait in case. */
210   public static JobBarrier.Result waitForJob(ServiceRegistry reg, Option<Long> timeout, Job job) {
211     return waitForJob(null, reg, timeout, job);
212   }
213 
214   /**
215    * Check if <code>job</code> is not done yet and wait in case.
216    *
217    * @param waiter
218    *          the job waiting for the other jobs to finish
219    * @param reg
220    *          the service registry
221    * @param job
222    *          the job to monitor
223    * @return the job barrier result
224    */
225   public static JobBarrier.Result waitForJob(Job waiter, ServiceRegistry reg, Job job) {
226     return waitForJob(waiter, reg, none(0L), job);
227   }
228 
229   /** Check if <code>job</code> is not done yet and wait in case. */
230   public static JobBarrier.Result waitForJob(ServiceRegistry reg, Job job) {
231     return waitForJob(null, reg, none(0L), job);
232   }
233 
234   /**
235    * Returns <code>true</code> if the job is ready to be dispatched.
236    *
237    * @param job
238    *          the job
239    * @return <code>true</code> whether the job is ready to be dispatched
240    * @throws IllegalStateException
241    *           if the job status is unknown
242    */
243   public static boolean isReadyToDispatch(Job job) throws IllegalStateException {
244     switch (job.getStatus()) {
245       case CANCELLED:
246       case DELETED:
247       case FAILED:
248       case FINISHED:
249         return false;
250       case DISPATCHING:
251       case INSTANTIATED:
252       case PAUSED:
253       case QUEUED:
254       case RESTART:
255       case RUNNING:
256       case WAITING:
257         return true;
258       default:
259         throw new IllegalStateException("Found job in unknown state '" + job.getStatus() + "'");
260     }
261   }
262 
263   /** Wait for the job to complete and return the success value. */
264   public static Function<Job, Boolean> waitForJobSuccess(final Job waiter, final ServiceRegistry reg,
265           final Option<Long> timeout) {
266     return new Function<Job, Boolean>() {
267       @Override
268       public Boolean apply(Job job) {
269         return waitForJob(waiter, reg, timeout, job).isSuccess();
270       }
271     };
272   }
273 
274   /**
275    * Interpret the payload of a completed {@link Job} as a {@link MediaPackageElement}. Wait for the job to complete if
276    * necessary.
277    *
278    */
279   public static Function<Job, MediaPackageElement> payloadAsMediaPackageElement(final Job waiter,
280           final ServiceRegistry reg) {
281     return new Function.X<Job, MediaPackageElement>() {
282       @Override
283       public MediaPackageElement xapply(Job job) throws MediaPackageException {
284         waitForJob(waiter, reg, none(0L), job);
285         return MediaPackageElementParser.getFromXml(job.getPayload());
286       }
287     };
288   }
289 
290   /**
291    * Interpret the payload of a completed {@link Job} as a {@link MediaPackageElement}. Wait for the job to complete if
292    * necessary.
293    */
294   public static Function<Job, MediaPackageElement> payloadAsMediaPackageElement(final ServiceRegistry reg) {
295     return payloadAsMediaPackageElement(null, reg);
296   }
297 
298   public static final Function<HttpResponse, Option<Job>> jobFromHttpResponse = new Function<HttpResponse, Option<Job>>() {
299     @Override
300     public Option<Job> apply(HttpResponse response) {
301       try {
302         return some(JobParser.parseJob(response.getEntity().getContent()));
303       } catch (Exception e) {
304         logger.error("Error parsing Job from HTTP response", e);
305         return none();
306       }
307     }
308   };
309 
310   /** Sum up the queue time of a list of jobs. */
311   public static long sumQueueTime(List<Job> jobs) {
312     return $(jobs).foldl(0L, new Fn2<Long, Job, Long>() {
313       @Override
314       public Long apply(Long sum, Job job) {
315         return sum + job.getQueueTime();
316       }
317     });
318   }
319 
320   /** Get all jobs that are not in state {@link org.opencastproject.job.api.Job.Status#FINISHED}. */
321   public static List<Job> getNonFinished(List<Job> jobs) {
322     return $(jobs).filter(new Pred<Job>() {
323       @Override
324       public Boolean apply(Job job) {
325         return !job.getStatus().equals(Status.FINISHED);
326       }
327     }).toList();
328   }
329 
330 }