1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 package org.opencastproject.job.api;
23
24 import org.opencastproject.job.api.Job.Status;
25 import org.opencastproject.serviceregistry.api.ServiceRegistry;
26 import org.opencastproject.serviceregistry.api.ServiceRegistryException;
27 import org.opencastproject.util.JobCanceledException;
28 import org.opencastproject.util.NotFoundException;
29
30 import org.slf4j.Logger;
31 import org.slf4j.LoggerFactory;
32
33 import java.util.ArrayList;
34 import java.util.Arrays;
35 import java.util.HashMap;
36 import java.util.List;
37 import java.util.Map;
38 import java.util.Optional;
39
40
41
42
43
44
45
46
47
48 public final class JobBarrier {
49
50 private static final Logger logger = LoggerFactory.getLogger(JobBarrier.class);
51
52
53 public static final long DEFAULT_POLLING_INTERVAL = 5000L;
54
55
56 private final ServiceRegistry serviceRegistry;
57
58
59 private final long pollingInterval;
60
61
62 private final Optional<Long> waiterJobId;
63
64
65 private final List<Job> jobs;
66
67
68 private volatile Throwable pollingException = null;
69
70
71 private volatile Result status = null;
72
73
74
75
76
77
78
79
80
81
82
83 public JobBarrier(Job waiter, ServiceRegistry registry) {
84 this(waiter, registry, DEFAULT_POLLING_INTERVAL, new Job[] {});
85 }
86
87
88
89
90
91
92
93
94
95
96
97
98
99 public JobBarrier(Job waiter, ServiceRegistry registry, Job... jobs) {
100 this(waiter, registry, DEFAULT_POLLING_INTERVAL, jobs);
101 }
102
103
104
105
106
107
108
109
110
111
112
113
114 public JobBarrier(Job waiter, ServiceRegistry registry, long pollingInterval) {
115 this(waiter, registry, pollingInterval, new Job[] {});
116 }
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131 public JobBarrier(Job waiter, ServiceRegistry registry, long pollingInterval, Job... jobs) {
132 if (registry == null) {
133 throw new IllegalArgumentException("Service registry must not be null");
134 }
135 if (jobs == null) {
136 throw new IllegalArgumentException("Jobs must not be null");
137 }
138 if (pollingInterval < 0) {
139 throw new IllegalArgumentException("Polling interval must be a positive number");
140 }
141 this.serviceRegistry = registry;
142 this.pollingInterval = pollingInterval;
143 if (waiter != null) {
144 this.waiterJobId = Optional.of(waiter.getId());
145 } else {
146 this.waiterJobId = Optional.empty();
147 }
148 this.jobs = new ArrayList<Job>(Arrays.asList(jobs));
149 }
150
151 private void suspendWaiterJob() {
152 if (this.waiterJobId.isPresent()) {
153 try {
154 final Job waiter = serviceRegistry.getJob(waiterJobId.get());
155 waiter.setStatus(Job.Status.WAITING);
156 logger.debug("Job {} set to WAITING state.", waiter.getId());
157 this.serviceRegistry.updateJob(waiter);
158 } catch (ServiceRegistryException e) {
159 logger.warn("Unable to put {} into a waiting state, this may cause a deadlock: {}",
160 waiterJobId, e.getMessage());
161 } catch (NotFoundException e) {
162 logger.warn("Unable to put {} into a waiting state, job not found by the service registry. "
163 + "This may cause a deadlock: {}", waiterJobId, e.getMessage());
164 }
165 } else {
166 logger.debug("No waiting job set, unable to put waiting job into waiting state");
167 }
168 }
169
170 private void wakeWaiterJob() {
171 if (this.waiterJobId.isPresent()) {
172 try {
173 final Job waiter = serviceRegistry.getJob(waiterJobId.get());
174 waiter.setStatus(Job.Status.RUNNING);
175 logger.debug("Job {} wakened and set back to RUNNING state.", waiter.getId());
176 this.serviceRegistry.updateJob(waiter);
177 } catch (ServiceRegistryException e) {
178 logger.warn("Unable to put {} into a waiting state, this may cause a deadlock: {}",
179 waiterJobId, e.getMessage());
180 } catch (NotFoundException e) {
181 logger.warn("Unable to put {} into a waiting state, job not found by the service registry. "
182 + "This may cause a deadlock: {}", waiterJobId, e.getMessage());
183 }
184 } else {
185 logger.debug("No waiting job set, unable to put waiting job into waiting state");
186 }
187 }
188
189
190
191
192
193
194 public Result waitForJobs() {
195 return waitForJobs(0);
196 }
197
198
199
200
201
202
203
204
205
206
207
208
209
210 public Result waitForJobs(long timeout) throws JobCanceledException, IllegalStateException {
211 if (jobs.size() == 0) {
212 return new Result(new HashMap<Job, Status>());
213 }
214 this.suspendWaiterJob();
215 synchronized (this) {
216 JobStatusUpdater updater = new JobStatusUpdater(timeout);
217 try {
218 updater.start();
219 wait();
220 } catch (InterruptedException e) {
221 logger.debug("Interrupted while waiting for job");
222 }
223 }
224 if (pollingException != null) {
225 if (pollingException instanceof JobCanceledException) {
226 throw (JobCanceledException) pollingException;
227 }
228 throw new IllegalStateException(pollingException);
229 }
230 this.wakeWaiterJob();
231 return getStatus();
232 }
233
234
235
236
237
238
239
240
241
242
243 public void addJob(Job job) throws IllegalStateException {
244 if (job == null) {
245 throw new IllegalArgumentException("Job must not be null");
246 }
247 jobs.add(job);
248 }
249
250
251
252
253
254
255
256 void setStatus(Result status) {
257 this.status = status;
258 }
259
260
261
262
263
264
265 public Result getStatus() {
266 return status;
267 }
268
269
270 class JobStatusUpdater extends Thread {
271
272 private final long workTime;
273
274
275
276
277
278
279
280
281 JobStatusUpdater(long workTime) {
282 this.workTime = workTime;
283 }
284
285 @Override
286 public void run() {
287 final long endTime = workTime > 0 ? System.currentTimeMillis() + workTime : 0;
288 final Map<Job, Job.Status> finishedJobs = new HashMap<Job, Job.Status>();
289 while (true) {
290 final long time = System.currentTimeMillis();
291
292 try {
293 final long timeToSleep = Math.min(pollingInterval, Math.abs(endTime - time));
294 Thread.sleep(timeToSleep);
295 } catch (InterruptedException e) {
296 logger.debug("Job polling thread was interrupted");
297 return;
298 }
299
300 for (final Job job : jobs) {
301
302 if (!finishedJobs.containsKey(job)) {
303
304 try {
305 final Job processedJob = serviceRegistry.getJob(job.getId());
306 final Job.Status jobStatus = processedJob.getStatus();
307 switch (jobStatus) {
308 case CANCELLED:
309 throw new JobCanceledException(processedJob);
310 case DELETED:
311 case FAILED:
312 case FINISHED:
313 job.setStatus(jobStatus);
314 job.setPayload(processedJob.getPayload());
315 finishedJobs.put(job, jobStatus);
316 break;
317 case PAUSED:
318 case QUEUED:
319 case RESTART:
320 case DISPATCHING:
321 case INSTANTIATED:
322 case RUNNING:
323 logger.trace("{} is still in the works", job);
324 break;
325 case WAITING:
326 logger.trace("{} is waiting", job);
327 break;
328 default:
329 logger.error("Unhandled job status '{}' found", jobStatus);
330 break;
331 }
332 } catch (NotFoundException e) {
333 logger.warn("Error polling job {}: Not found!", job);
334 finishedJobs.put(job, Job.Status.DELETED);
335 pollingException = e;
336 break;
337 } catch (ServiceRegistryException e) {
338 logger.warn("Error polling service registry for the status of {}: {}", job, e.getMessage());
339 } catch (JobCanceledException e) {
340 logger.warn("Job {} got canceled", job);
341 pollingException = e;
342 updateAndNotify(finishedJobs);
343 return;
344 } catch (Throwable t) {
345 logger.error("An unexpected error occured while waiting for jobs", t);
346 pollingException = t;
347 updateAndNotify(finishedJobs);
348 return;
349 }
350 }
351
352
353 if (finishedJobs.size() == jobs.size()) {
354 updateAndNotify(finishedJobs);
355 return;
356 } else if (workTime > 0 && endTime >= time) {
357 pollingException = new InterruptedException("Timeout waiting for job processing");
358 updateAndNotify(finishedJobs);
359 return;
360 }
361 }
362 }
363 }
364
365
366
367
368
369
370
371 private void updateAndNotify(Map<Job, Job.Status> status) {
372 JobBarrier.this.setStatus(new Result(status));
373 synchronized (JobBarrier.this) {
374 JobBarrier.this.notifyAll();
375 }
376 }
377
378 }
379
380
381 public static class Result {
382
383 private final Map<Job, Job.Status> status;
384
385
386
387
388
389
390
391 public Result(Map<Job, Job.Status> status) {
392 this.status = status;
393 }
394
395
396
397
398
399
400 public Map<Job, Job.Status> getStatus() {
401 return status;
402 }
403
404
405
406
407
408
409 public boolean isSuccess() {
410 for (final Job.Status state : status.values()) {
411 if (!state.equals(Job.Status.FINISHED)) {
412 return false;
413 }
414 }
415 return true;
416 }
417 }
418 }