1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 package org.opencastproject.job.api;
23
24 import org.opencastproject.job.api.Job.Status;
25 import org.opencastproject.serviceregistry.api.ServiceRegistry;
26 import org.opencastproject.serviceregistry.api.ServiceRegistryException;
27 import org.opencastproject.util.JobCanceledException;
28 import org.opencastproject.util.NotFoundException;
29
30 import org.slf4j.Logger;
31 import org.slf4j.LoggerFactory;
32
33 import java.util.ArrayList;
34 import java.util.Arrays;
35 import java.util.HashMap;
36 import java.util.List;
37 import java.util.Map;
38 import java.util.Optional;
39
40
41
42
43
44
45
46
47
48 public final class JobBarrier {
49
50 private static final Logger logger = LoggerFactory.getLogger(JobBarrier.class);
51
52
53 public static final long DEFAULT_POLLING_INTERVAL = 5000L;
54
55
56 private final ServiceRegistry serviceRegistry;
57
58
59 private final long pollingInterval;
60
61
62 private final Optional<Long> waiterJobId;
63
64
65 private final List<Job> jobs;
66
67
68 private volatile Throwable pollingException = null;
69
70
71 private volatile Result status = null;
72
73
74
75
76
77
78
79
80
81
82
83 public JobBarrier(Job waiter, ServiceRegistry registry) {
84 this(waiter, registry, DEFAULT_POLLING_INTERVAL, new Job[] {});
85 }
86
87
88
89
90
91
92
93
94
95
96
97
98
99 public JobBarrier(Job waiter, ServiceRegistry registry, Job... jobs) {
100 this(waiter, registry, DEFAULT_POLLING_INTERVAL, jobs);
101 }
102
103
104
105
106
107
108
109
110
111
112
113
114 public JobBarrier(Job waiter, ServiceRegistry registry, long pollingInterval) {
115 this(waiter, registry, pollingInterval, new Job[] {});
116 }
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131 public JobBarrier(Job waiter, ServiceRegistry registry, long pollingInterval, Job... jobs) {
132 if (registry == null)
133 throw new IllegalArgumentException("Service registry must not be null");
134 if (jobs == null)
135 throw new IllegalArgumentException("Jobs must not be null");
136 if (pollingInterval < 0)
137 throw new IllegalArgumentException("Polling interval must be a positive number");
138 this.serviceRegistry = registry;
139 this.pollingInterval = pollingInterval;
140 if (waiter != null)
141 this.waiterJobId = Optional.of(waiter.getId());
142 else
143 this.waiterJobId = Optional.empty();
144 this.jobs = new ArrayList<Job>(Arrays.asList(jobs));
145 }
146
147 private void suspendWaiterJob() {
148 if (this.waiterJobId.isPresent()) {
149 try {
150 final Job waiter = serviceRegistry.getJob(waiterJobId.get());
151 waiter.setStatus(Job.Status.WAITING);
152 logger.debug("Job {} set to WAITING state.", waiter.getId());
153 this.serviceRegistry.updateJob(waiter);
154 } catch (ServiceRegistryException e) {
155 logger.warn("Unable to put {} into a waiting state, this may cause a deadlock: {}", waiterJobId, e.getMessage());
156 } catch (NotFoundException e) {
157 logger.warn("Unable to put {} into a waiting state, job not found by the service registry. This may cause a deadlock: {}", waiterJobId, e.getMessage());
158 }
159 } else {
160 logger.debug("No waiting job set, unable to put waiting job into waiting state");
161 }
162 }
163
164 private void wakeWaiterJob() {
165 if (this.waiterJobId.isPresent()) {
166 try {
167 final Job waiter = serviceRegistry.getJob(waiterJobId.get());
168 waiter.setStatus(Job.Status.RUNNING);
169 logger.debug("Job {} wakened and set back to RUNNING state.", waiter.getId());
170 this.serviceRegistry.updateJob(waiter);
171 } catch (ServiceRegistryException e) {
172 logger.warn("Unable to put {} into a waiting state, this may cause a deadlock: {}", waiterJobId, e.getMessage());
173 } catch (NotFoundException e) {
174 logger.warn("Unable to put {} into a waiting state, job not found by the service registry. This may cause a deadlock: {}", waiterJobId, e.getMessage());
175 }
176 } else {
177 logger.debug("No waiting job set, unable to put waiting job into waiting state");
178 }
179 }
180
181
182
183
184
185
186 public Result waitForJobs() {
187 return waitForJobs(0);
188 }
189
190
191
192
193
194
195
196
197
198
199
200
201
202 public Result waitForJobs(long timeout) throws JobCanceledException, IllegalStateException {
203 if (jobs.size() == 0)
204 return new Result(new HashMap<Job, Status>());
205 this.suspendWaiterJob();
206 synchronized (this) {
207 JobStatusUpdater updater = new JobStatusUpdater(timeout);
208 try {
209 updater.start();
210 wait();
211 } catch (InterruptedException e) {
212 logger.debug("Interrupted while waiting for job");
213 }
214 }
215 if (pollingException != null) {
216 if (pollingException instanceof JobCanceledException)
217 throw (JobCanceledException) pollingException;
218 throw new IllegalStateException(pollingException);
219 }
220 this.wakeWaiterJob();
221 return getStatus();
222 }
223
224
225
226
227
228
229
230
231
232
233 public void addJob(Job job) throws IllegalStateException {
234 if (job == null)
235 throw new IllegalArgumentException("Job must not be null");
236 jobs.add(job);
237 }
238
239
240
241
242
243
244
245 void setStatus(Result status) {
246 this.status = status;
247 }
248
249
250
251
252
253
254 public Result getStatus() {
255 return status;
256 }
257
258
259 class JobStatusUpdater extends Thread {
260
261 private final long workTime;
262
263
264
265
266
267
268
269
270 JobStatusUpdater(long workTime) {
271 this.workTime = workTime;
272 }
273
274 @Override
275 public void run() {
276 final long endTime = workTime > 0 ? System.currentTimeMillis() + workTime : 0;
277 final Map<Job, Job.Status> finishedJobs = new HashMap<Job, Job.Status>();
278 while (true) {
279 final long time = System.currentTimeMillis();
280
281 try {
282 final long timeToSleep = Math.min(pollingInterval, Math.abs(endTime - time));
283 Thread.sleep(timeToSleep);
284 } catch (InterruptedException e) {
285 logger.debug("Job polling thread was interrupted");
286 return;
287 }
288
289 for (final Job job : jobs) {
290
291 if (!finishedJobs.containsKey(job)) {
292
293 try {
294 final Job processedJob = serviceRegistry.getJob(job.getId());
295 final Job.Status jobStatus = processedJob.getStatus();
296 switch (jobStatus) {
297 case CANCELLED:
298 throw new JobCanceledException(processedJob);
299 case DELETED:
300 case FAILED:
301 case FINISHED:
302 job.setStatus(jobStatus);
303 job.setPayload(processedJob.getPayload());
304 finishedJobs.put(job, jobStatus);
305 break;
306 case PAUSED:
307 case QUEUED:
308 case RESTART:
309 case DISPATCHING:
310 case INSTANTIATED:
311 case RUNNING:
312 logger.trace("{} is still in the works", job);
313 break;
314 case WAITING:
315 logger.trace("{} is waiting", job);
316 break;
317 default:
318 logger.error("Unhandled job status '{}' found", jobStatus);
319 break;
320 }
321 } catch (NotFoundException e) {
322 logger.warn("Error polling job {}: Not found!", job);
323 finishedJobs.put(job, Job.Status.DELETED);
324 pollingException = e;
325 break;
326 } catch (ServiceRegistryException e) {
327 logger.warn("Error polling service registry for the status of {}: {}", job, e.getMessage());
328 } catch (JobCanceledException e) {
329 logger.warn("Job {} got canceled", job);
330 pollingException = e;
331 updateAndNotify(finishedJobs);
332 return;
333 } catch (Throwable t) {
334 logger.error("An unexpected error occured while waiting for jobs", t);
335 pollingException = t;
336 updateAndNotify(finishedJobs);
337 return;
338 }
339 }
340
341
342 if (finishedJobs.size() == jobs.size()) {
343 updateAndNotify(finishedJobs);
344 return;
345 } else if (workTime > 0 && endTime >= time) {
346 pollingException = new InterruptedException("Timeout waiting for job processing");
347 updateAndNotify(finishedJobs);
348 return;
349 }
350 }
351 }
352 }
353
354
355
356
357
358
359
360 private void updateAndNotify(Map<Job, Job.Status> status) {
361 JobBarrier.this.setStatus(new Result(status));
362 synchronized (JobBarrier.this) {
363 JobBarrier.this.notifyAll();
364 }
365 }
366
367 }
368
369
370 public static class Result {
371
372 private final Map<Job, Job.Status> status;
373
374
375
376
377
378
379
380 public Result(Map<Job, Job.Status> status) {
381 this.status = status;
382 }
383
384
385
386
387
388
389 public Map<Job, Job.Status> getStatus() {
390 return status;
391 }
392
393
394
395
396
397
398 public boolean isSuccess() {
399 for (final Job.Status state : status.values()) {
400 if (!state.equals(Job.Status.FINISHED))
401 return false;
402 }
403 return true;
404 }
405 }
406 }