1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 package org.opencastproject.util;
23
24 import static com.entwinemedia.fn.Stream.$;
25 import static org.opencastproject.util.data.Collections.map;
26 import static org.opencastproject.util.data.Collections.toArray;
27 import static org.opencastproject.util.data.Option.none;
28 import static org.opencastproject.util.data.Option.some;
29 import static org.opencastproject.util.data.Tuple.tuple;
30
31 import org.opencastproject.job.api.Job;
32 import org.opencastproject.job.api.Job.Status;
33 import org.opencastproject.job.api.JobBarrier;
34 import org.opencastproject.job.api.JobParser;
35 import org.opencastproject.mediapackage.MediaPackageElement;
36 import org.opencastproject.mediapackage.MediaPackageElementParser;
37 import org.opencastproject.mediapackage.MediaPackageException;
38 import org.opencastproject.serviceregistry.api.ServiceRegistry;
39 import org.opencastproject.serviceregistry.api.ServiceRegistryException;
40 import org.opencastproject.util.data.Function;
41 import org.opencastproject.util.data.Option;
42
43 import com.entwinemedia.fn.Fn2;
44 import com.entwinemedia.fn.Pred;
45 import com.entwinemedia.fn.data.Opt;
46
47 import org.apache.http.HttpResponse;
48 import org.slf4j.Logger;
49 import org.slf4j.LoggerFactory;
50
51 import java.util.Collection;
52 import java.util.List;
53
54
55 public final class JobUtil {
56
57 private static final Logger logger = LoggerFactory.getLogger(JobUtil.class);
58
59 private JobUtil() {
60 }
61
62
63
64
65
66
67 public static Opt<String> getPayload(ServiceRegistry reg, Job job)
68 throws NotFoundException, ServiceRegistryException {
69 for (Job updated : update(reg, job)) {
70 return Opt.nul(updated.getPayload());
71 }
72 return Opt.none();
73 }
74
75
76
77
78
79
80 public static Opt<Job> update(ServiceRegistry reg, Job job) throws ServiceRegistryException {
81 try {
82 return Opt.some(reg.getJob(job.getId()));
83 } catch (NotFoundException e) {
84 return Opt.none();
85 }
86 }
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105 public static JobBarrier.Result waitForJobs(Job waiter, ServiceRegistry reg, long pollingInterval, long timeout,
106 Job... jobs) {
107 JobBarrier barrier = new JobBarrier(waiter, reg, pollingInterval, jobs);
108 return barrier.waitForJobs(timeout);
109 }
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126 public static JobBarrier.Result waitForJobs(Job waiter, ServiceRegistry reg, long timeout, Job... jobs) {
127 return waitForJobs(waiter, reg, JobBarrier.DEFAULT_POLLING_INTERVAL, timeout, jobs);
128 }
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143 public static JobBarrier.Result waitForJobs(Job waiter, ServiceRegistry reg, Job... jobs) {
144 return waitForJobs(waiter, reg, 0L, jobs);
145 }
146
147
148
149
150
151
152
153
154
155
156
157 public static JobBarrier.Result waitForJobs(ServiceRegistry reg, Job... jobs) {
158 return waitForJobs(null, reg, jobs);
159 }
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174 public static JobBarrier.Result waitForJobs(Job waiter, ServiceRegistry reg, Collection<Job> jobs) {
175 return waitForJobs(waiter, reg, toArray(Job.class, jobs));
176 }
177
178
179
180
181
182
183
184
185
186
187
188 public static JobBarrier.Result waitForJobs(ServiceRegistry reg, Collection<Job> jobs) {
189 return waitForJobs(null, reg, jobs);
190 }
191
192
193 public static JobBarrier.Result waitForJob(Job waiter, ServiceRegistry reg, Option<Long> timeout, Job job) {
194 final Job.Status status = job.getStatus();
195
196 switch (status) {
197 case CANCELLED:
198 case DELETED:
199 case FAILED:
200 case FINISHED:
201 return new JobBarrier.Result(map(tuple(job, status)));
202 default:
203 for (Long t : timeout)
204 return waitForJobs(waiter, reg, t, job);
205 return waitForJobs(waiter, reg, job);
206 }
207 }
208
209
210 public static JobBarrier.Result waitForJob(ServiceRegistry reg, Option<Long> timeout, Job job) {
211 return waitForJob(null, reg, timeout, job);
212 }
213
214
215
216
217
218
219
220
221
222
223
224
225 public static JobBarrier.Result waitForJob(Job waiter, ServiceRegistry reg, Job job) {
226 return waitForJob(waiter, reg, none(0L), job);
227 }
228
229
230 public static JobBarrier.Result waitForJob(ServiceRegistry reg, Job job) {
231 return waitForJob(null, reg, none(0L), job);
232 }
233
234
235
236
237
238
239
240
241
242
243 public static boolean isReadyToDispatch(Job job) throws IllegalStateException {
244 switch (job.getStatus()) {
245 case CANCELLED:
246 case DELETED:
247 case FAILED:
248 case FINISHED:
249 return false;
250 case DISPATCHING:
251 case INSTANTIATED:
252 case PAUSED:
253 case QUEUED:
254 case RESTART:
255 case RUNNING:
256 case WAITING:
257 return true;
258 default:
259 throw new IllegalStateException("Found job in unknown state '" + job.getStatus() + "'");
260 }
261 }
262
263
264 public static Function<Job, Boolean> waitForJobSuccess(final Job waiter, final ServiceRegistry reg,
265 final Option<Long> timeout) {
266 return new Function<Job, Boolean>() {
267 @Override
268 public Boolean apply(Job job) {
269 return waitForJob(waiter, reg, timeout, job).isSuccess();
270 }
271 };
272 }
273
274
275
276
277
278
279 public static Function<Job, MediaPackageElement> payloadAsMediaPackageElement(final Job waiter,
280 final ServiceRegistry reg) {
281 return new Function.X<Job, MediaPackageElement>() {
282 @Override
283 public MediaPackageElement xapply(Job job) throws MediaPackageException {
284 waitForJob(waiter, reg, none(0L), job);
285 return MediaPackageElementParser.getFromXml(job.getPayload());
286 }
287 };
288 }
289
290
291
292
293
294 public static Function<Job, MediaPackageElement> payloadAsMediaPackageElement(final ServiceRegistry reg) {
295 return payloadAsMediaPackageElement(null, reg);
296 }
297
298 public static final Function<HttpResponse, Option<Job>> jobFromHttpResponse = new Function<HttpResponse, Option<Job>>() {
299 @Override
300 public Option<Job> apply(HttpResponse response) {
301 try {
302 return some(JobParser.parseJob(response.getEntity().getContent()));
303 } catch (Exception e) {
304 logger.error("Error parsing Job from HTTP response", e);
305 return none();
306 }
307 }
308 };
309
310
311 public static long sumQueueTime(List<Job> jobs) {
312 return $(jobs).foldl(0L, new Fn2<Long, Job, Long>() {
313 @Override
314 public Long apply(Long sum, Job job) {
315 return sum + job.getQueueTime();
316 }
317 });
318 }
319
320
321 public static List<Job> getNonFinished(List<Job> jobs) {
322 return $(jobs).filter(new Pred<Job>() {
323 @Override
324 public Boolean apply(Job job) {
325 return !job.getStatus().equals(Status.FINISHED);
326 }
327 }).toList();
328 }
329
330 }