1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 package org.opencastproject.job.api;
23
24 import org.opencastproject.job.api.Job.Status;
25 import org.opencastproject.serviceregistry.api.ServiceRegistry;
26 import org.opencastproject.serviceregistry.api.ServiceRegistryException;
27 import org.opencastproject.util.JobCanceledException;
28 import org.opencastproject.util.NotFoundException;
29
30 import com.entwinemedia.fn.data.Opt;
31
32 import org.slf4j.Logger;
33 import org.slf4j.LoggerFactory;
34
35 import java.util.ArrayList;
36 import java.util.Arrays;
37 import java.util.HashMap;
38 import java.util.List;
39 import java.util.Map;
40
41
42
43
44
45
46
47
48
49 public final class JobBarrier {
50
51 private static final Logger logger = LoggerFactory.getLogger(JobBarrier.class);
52
53
54 public static final long DEFAULT_POLLING_INTERVAL = 5000L;
55
56
57 private final ServiceRegistry serviceRegistry;
58
59
60 private final long pollingInterval;
61
62
63 private final Opt<Long> waiterJobId;
64
65
66 private final List<Job> jobs;
67
68
69 private volatile Throwable pollingException = null;
70
71
72 private volatile Result status = null;
73
74
75
76
77
78
79
80
81
82
83
84 public JobBarrier(Job waiter, ServiceRegistry registry) {
85 this(waiter, registry, DEFAULT_POLLING_INTERVAL, new Job[] {});
86 }
87
88
89
90
91
92
93
94
95
96
97
98
99
100 public JobBarrier(Job waiter, ServiceRegistry registry, Job... jobs) {
101 this(waiter, registry, DEFAULT_POLLING_INTERVAL, jobs);
102 }
103
104
105
106
107
108
109
110
111
112
113
114
115 public JobBarrier(Job waiter, ServiceRegistry registry, long pollingInterval) {
116 this(waiter, registry, pollingInterval, new Job[] {});
117 }
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132 public JobBarrier(Job waiter, ServiceRegistry registry, long pollingInterval, Job... jobs) {
133 if (registry == null)
134 throw new IllegalArgumentException("Service registry must not be null");
135 if (jobs == null)
136 throw new IllegalArgumentException("Jobs must not be null");
137 if (pollingInterval < 0)
138 throw new IllegalArgumentException("Polling interval must be a positive number");
139 this.serviceRegistry = registry;
140 this.pollingInterval = pollingInterval;
141 if (waiter != null)
142 this.waiterJobId = Opt.some(waiter.getId());
143 else
144 this.waiterJobId = Opt.none();
145 this.jobs = new ArrayList<Job>(Arrays.asList(jobs));
146 }
147
148 private void suspendWaiterJob() {
149 if (this.waiterJobId.isSome()) {
150 try {
151 final Job waiter = serviceRegistry.getJob(waiterJobId.get());
152 waiter.setStatus(Job.Status.WAITING);
153 logger.debug("Job {} set to WAITING state.", waiter.getId());
154 this.serviceRegistry.updateJob(waiter);
155 } catch (ServiceRegistryException e) {
156 logger.warn("Unable to put {} into a waiting state, this may cause a deadlock: {}", waiterJobId, e.getMessage());
157 } catch (NotFoundException e) {
158 logger.warn("Unable to put {} into a waiting state, job not found by the service registry. This may cause a deadlock: {}", waiterJobId, e.getMessage());
159 }
160 } else {
161 logger.debug("No waiting job set, unable to put waiting job into waiting state");
162 }
163 }
164
165 private void wakeWaiterJob() {
166 if (this.waiterJobId.isSome()) {
167 try {
168 final Job waiter = serviceRegistry.getJob(waiterJobId.get());
169 waiter.setStatus(Job.Status.RUNNING);
170 logger.debug("Job {} wakened and set back to RUNNING state.", waiter.getId());
171 this.serviceRegistry.updateJob(waiter);
172 } catch (ServiceRegistryException e) {
173 logger.warn("Unable to put {} into a waiting state, this may cause a deadlock: {}", waiterJobId, e.getMessage());
174 } catch (NotFoundException e) {
175 logger.warn("Unable to put {} into a waiting state, job not found by the service registry. This may cause a deadlock: {}", waiterJobId, e.getMessage());
176 }
177 } else {
178 logger.debug("No waiting job set, unable to put waiting job into waiting state");
179 }
180 }
181
182
183
184
185
186
187 public Result waitForJobs() {
188 return waitForJobs(0);
189 }
190
191
192
193
194
195
196
197
198
199
200
201
202
203 public Result waitForJobs(long timeout) throws JobCanceledException, IllegalStateException {
204 if (jobs.size() == 0)
205 return new Result(new HashMap<Job, Status>());
206 this.suspendWaiterJob();
207 synchronized (this) {
208 JobStatusUpdater updater = new JobStatusUpdater(timeout);
209 try {
210 updater.start();
211 wait();
212 } catch (InterruptedException e) {
213 logger.debug("Interrupted while waiting for job");
214 }
215 }
216 if (pollingException != null) {
217 if (pollingException instanceof JobCanceledException)
218 throw (JobCanceledException) pollingException;
219 throw new IllegalStateException(pollingException);
220 }
221 this.wakeWaiterJob();
222 return getStatus();
223 }
224
225
226
227
228
229
230
231
232
233
234 public void addJob(Job job) throws IllegalStateException {
235 if (job == null)
236 throw new IllegalArgumentException("Job must not be null");
237 jobs.add(job);
238 }
239
240
241
242
243
244
245
246 void setStatus(Result status) {
247 this.status = status;
248 }
249
250
251
252
253
254
255 public Result getStatus() {
256 return status;
257 }
258
259
260 class JobStatusUpdater extends Thread {
261
262 private final long workTime;
263
264
265
266
267
268
269
270
271 JobStatusUpdater(long workTime) {
272 this.workTime = workTime;
273 }
274
275 @Override
276 public void run() {
277 final long endTime = workTime > 0 ? System.currentTimeMillis() + workTime : 0;
278 final Map<Job, Job.Status> finishedJobs = new HashMap<Job, Job.Status>();
279 while (true) {
280 final long time = System.currentTimeMillis();
281
282 try {
283 final long timeToSleep = Math.min(pollingInterval, Math.abs(endTime - time));
284 Thread.sleep(timeToSleep);
285 } catch (InterruptedException e) {
286 logger.debug("Job polling thread was interrupted");
287 return;
288 }
289
290 for (final Job job : jobs) {
291
292 if (!finishedJobs.containsKey(job)) {
293
294 try {
295 final Job processedJob = serviceRegistry.getJob(job.getId());
296 final Job.Status jobStatus = processedJob.getStatus();
297 switch (jobStatus) {
298 case CANCELLED:
299 throw new JobCanceledException(processedJob);
300 case DELETED:
301 case FAILED:
302 case FINISHED:
303 job.setStatus(jobStatus);
304 job.setPayload(processedJob.getPayload());
305 finishedJobs.put(job, jobStatus);
306 break;
307 case PAUSED:
308 case QUEUED:
309 case RESTART:
310 case DISPATCHING:
311 case INSTANTIATED:
312 case RUNNING:
313 logger.trace("{} is still in the works", job);
314 break;
315 case WAITING:
316 logger.trace("{} is waiting", job);
317 break;
318 default:
319 logger.error("Unhandled job status '{}' found", jobStatus);
320 break;
321 }
322 } catch (NotFoundException e) {
323 logger.warn("Error polling job {}: Not found!", job);
324 finishedJobs.put(job, Job.Status.DELETED);
325 pollingException = e;
326 break;
327 } catch (ServiceRegistryException e) {
328 logger.warn("Error polling service registry for the status of {}: {}", job, e.getMessage());
329 } catch (JobCanceledException e) {
330 logger.warn("Job {} got canceled", job);
331 pollingException = e;
332 updateAndNotify(finishedJobs);
333 return;
334 } catch (Throwable t) {
335 logger.error("An unexpected error occured while waiting for jobs", t);
336 pollingException = t;
337 updateAndNotify(finishedJobs);
338 return;
339 }
340 }
341
342
343 if (finishedJobs.size() == jobs.size()) {
344 updateAndNotify(finishedJobs);
345 return;
346 } else if (workTime > 0 && endTime >= time) {
347 pollingException = new InterruptedException("Timeout waiting for job processing");
348 updateAndNotify(finishedJobs);
349 return;
350 }
351 }
352 }
353 }
354
355
356
357
358
359
360
361 private void updateAndNotify(Map<Job, Job.Status> status) {
362 JobBarrier.this.setStatus(new Result(status));
363 synchronized (JobBarrier.this) {
364 JobBarrier.this.notifyAll();
365 }
366 }
367
368 }
369
370
371 public static class Result {
372
373 private final Map<Job, Job.Status> status;
374
375
376
377
378
379
380
381 public Result(Map<Job, Job.Status> status) {
382 this.status = status;
383 }
384
385
386
387
388
389
390 public Map<Job, Job.Status> getStatus() {
391 return status;
392 }
393
394
395
396
397
398
399 public boolean isSuccess() {
400 for (final Job.Status state : status.values()) {
401 if (!state.equals(Job.Status.FINISHED))
402 return false;
403 }
404 return true;
405 }
406 }
407 }