1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 package org.opencastproject.serviceregistry.impl;
23
24 import static java.nio.charset.StandardCharsets.UTF_8;
25 import static org.opencastproject.db.Queries.namedQuery;
26 import static org.opencastproject.security.api.SecurityConstants.ORGANIZATION_HEADER;
27 import static org.opencastproject.security.api.SecurityConstants.USER_HEADER;
28
29 import org.opencastproject.db.DBSession;
30 import org.opencastproject.db.DBSessionFactory;
31 import org.opencastproject.job.api.Job;
32 import org.opencastproject.job.jpa.JpaJob;
33 import org.opencastproject.security.api.Organization;
34 import org.opencastproject.security.api.OrganizationDirectoryService;
35 import org.opencastproject.security.api.SecurityService;
36 import org.opencastproject.security.api.TrustedHttpClient;
37 import org.opencastproject.security.api.TrustedHttpClientException;
38 import org.opencastproject.security.api.User;
39 import org.opencastproject.security.api.UserDirectoryService;
40 import org.opencastproject.serviceregistry.api.HostRegistration;
41 import org.opencastproject.serviceregistry.api.ServiceRegistration;
42 import org.opencastproject.serviceregistry.api.ServiceRegistryException;
43 import org.opencastproject.serviceregistry.api.SystemLoad;
44 import org.opencastproject.serviceregistry.impl.jpa.ServiceRegistrationJpaImpl;
45 import org.opencastproject.util.NotFoundException;
46 import org.opencastproject.util.UrlSupport;
47
48 import org.apache.commons.io.IOUtils;
49 import org.apache.commons.lang3.StringUtils;
50 import org.apache.commons.lang3.tuple.Pair;
51 import org.apache.http.HttpResponse;
52 import org.apache.http.HttpStatus;
53 import org.apache.http.client.entity.UrlEncodedFormEntity;
54 import org.apache.http.client.methods.HttpPost;
55 import org.apache.http.message.BasicNameValuePair;
56 import org.osgi.service.cm.ConfigurationException;
57 import org.osgi.service.component.ComponentContext;
58 import org.osgi.service.component.annotations.Activate;
59 import org.osgi.service.component.annotations.Component;
60 import org.osgi.service.component.annotations.Deactivate;
61 import org.osgi.service.component.annotations.Modified;
62 import org.osgi.service.component.annotations.Reference;
63 import org.slf4j.Logger;
64 import org.slf4j.LoggerFactory;
65
66 import java.io.IOException;
67 import java.util.ArrayList;
68 import java.util.Collections;
69 import java.util.Comparator;
70 import java.util.Dictionary;
71 import java.util.HashMap;
72 import java.util.HashSet;
73 import java.util.List;
74 import java.util.Map;
75 import java.util.Set;
76 import java.util.concurrent.Executors;
77 import java.util.concurrent.ScheduledFuture;
78 import java.util.concurrent.ScheduledThreadPoolExecutor;
79 import java.util.concurrent.TimeUnit;
80 import java.util.function.Function;
81 import java.util.stream.Collectors;
82
83 import javax.persistence.EntityManager;
84 import javax.persistence.EntityManagerFactory;
85
86
87
88
89
90 @Component(
91 property = {
92 "service.description=Job Dispatcher"
93 },
94 immediate = true,
95 service = { JobDispatcher.class }
96 )
97 public class JobDispatcher {
98
99
100 public static final String PERSISTENCE_UNIT = "org.opencastproject.common";
101
102
103 protected static final String OPT_DISPATCHINTERVAL = "dispatch.interval";
104
105
106 static final float MIN_DISPATCH_INTERVAL = 1.0F;
107
108
109 static final float DEFAULT_DISPATCH_INTERVAL = 0.0F;
110
111
112 static final long DISPATCH_INTERVAL_MS_FACTOR = 1000;
113
114
115 static final long DEFAULT_HEART_BEAT = 60;
116
117 private static final Logger logger = LoggerFactory.getLogger(JobDispatcher.class);
118
119 private ServiceRegistryJpaImpl serviceRegistry;
120
121 private OrganizationDirectoryService organizationDirectoryService;
122 private UserDirectoryService userDirectoryService;
123 private SecurityService securityService;
124 private TrustedHttpClient client;
125
126
127 protected ScheduledThreadPoolExecutor scheduledExecutor = null;
128
129
130 private EntityManagerFactory emf = null;
131
132 protected DBSessionFactory dbSessionFactory;
133
134 protected DBSession db;
135
136 private ScheduledFuture jdfuture = null;
137
138
139
140
141 private List<String> undispatchableJobTypes = null;
142
143
144 protected final Map<Long, String> dispatchPriorityList = new HashMap<>();
145
146
147 @Reference(target = "(osgi.unit.name=org.opencastproject.common)")
148 void setEntityManagerFactory(EntityManagerFactory emf) {
149 this.emf = emf;
150 }
151
152
153 @Reference
154 public void setDBSessionFactory(DBSessionFactory dbSessionFactory) {
155 this.dbSessionFactory = dbSessionFactory;
156 }
157
158
159 @Reference()
160 void setServiceRegistry(ServiceRegistryJpaImpl sr) {
161 this.serviceRegistry = sr;
162 }
163
164 @Reference()
165 void setOrganizationDirectoryService(OrganizationDirectoryService organizationDirectoryService) {
166 this.organizationDirectoryService = organizationDirectoryService;
167 }
168
169 @Reference
170 void setUserDirectoryService(UserDirectoryService svc) {
171 this.userDirectoryService = svc;
172 }
173
174 @Reference
175 void setSecurityService(SecurityService sec) {
176 this.securityService = sec;
177 }
178
179 @Reference
180 void setTrustedHttpClient(TrustedHttpClient client) {
181 this.client = client;
182 }
183
184 @Activate
185 public void activate(ComponentContext cc) throws ConfigurationException {
186 logger.info("Activate job dispatcher");
187 db = dbSessionFactory.createSession(emf);
188 scheduledExecutor = (ScheduledThreadPoolExecutor) Executors.newScheduledThreadPool(1);
189 scheduledExecutor.setRemoveOnCancelPolicy(true);
190 logger.info("Activated");
191 updated(cc.getProperties());
192 }
193
194
195 @Modified
196 public void modified(ComponentContext cc) throws ConfigurationException {
197 logger.debug("Modified in job dispatcher");
198 updated(cc.getProperties());
199 }
200
201
202
203
204
205
206 @SuppressWarnings("rawtypes")
207 public void updated(Dictionary properties) {
208
209 logger.info("Updating job dispatcher properties");
210
211 float dispatchInterval = DEFAULT_DISPATCH_INTERVAL;
212 String dispatchIntervalString = StringUtils.trimToNull((String) properties.get(OPT_DISPATCHINTERVAL));
213 if (StringUtils.isNotBlank(dispatchIntervalString)) {
214 try {
215 dispatchInterval = Float.parseFloat(dispatchIntervalString);
216 } catch (Exception e) {
217 logger.warn("Dispatch interval '{}' is malformed, setting to {}", dispatchIntervalString,
218 MIN_DISPATCH_INTERVAL);
219 dispatchInterval = MIN_DISPATCH_INTERVAL;
220 }
221 if (dispatchInterval == 0) {
222 logger.info("Dispatching disabled");
223 } else if (dispatchInterval < MIN_DISPATCH_INTERVAL) {
224 logger.warn("Dispatch interval {} seconds is too low, adjusting to {}", dispatchInterval,
225 MIN_DISPATCH_INTERVAL);
226 dispatchInterval = MIN_DISPATCH_INTERVAL;
227 } else {
228 logger.info("Dispatch interval set to {} seconds", dispatchInterval);
229 }
230 }
231
232
233 if (jdfuture != null) {
234 jdfuture.cancel(true);
235 }
236
237
238 if (dispatchInterval > 0) {
239 long dispatchIntervalMs = Math.round(dispatchInterval * DISPATCH_INTERVAL_MS_FACTOR);
240 logger.info("Job dispatching is enabled");
241 logger.debug("Starting job dispatching at a custom interval of {}s", dispatchInterval);
242 jdfuture = scheduledExecutor.scheduleWithFixedDelay(getJobDispatcherRunnable(), dispatchIntervalMs,
243 dispatchIntervalMs, TimeUnit.MILLISECONDS);
244
245 serviceRegistry.startHeartbeat(DEFAULT_HEART_BEAT);
246 } else {
247 logger.info("Job dispatching is disabled");
248 }
249 }
250
251 @Deactivate
252 public void deactivate() {
253 logger.info("Deactivate Job Dispatcher");
254
255
256 if (scheduledExecutor != null) {
257 try {
258 scheduledExecutor.shutdownNow();
259 if (!scheduledExecutor.isShutdown()) {
260 logger.info("Waiting for Job Dispatcher to terminate");
261 scheduledExecutor.awaitTermination(10, TimeUnit.SECONDS);
262 }
263 } catch (InterruptedException e) {
264 logger.error("Error shutting down the Job Dispatcher", e);
265 }
266 }
267 }
268
269 Runnable getJobDispatcherRunnable() {
270 return new JobDispatcherRunner();
271 }
272
273 public class JobDispatcherRunner implements Runnable {
274
275
276
277
278
279
280 @Override
281 public void run() {
282 logger.debug("Starting job dispatch");
283
284 undispatchableJobTypes = new ArrayList<>();
285 try {
286 if (!dispatchPriorityList.isEmpty()) {
287 logger.trace("Checking for outdated jobs in dispatchPriorityList's '{}' jobs", dispatchPriorityList.size());
288
289 List<Long> jobIds = db.exec(getDispatchableJobsWithIdFilterQuery(dispatchPriorityList.keySet()));
290 for (Long jobId : new HashSet<>(dispatchPriorityList.keySet())) {
291 if (!jobIds.contains(jobId)) {
292 logger.debug("Removing outdated dispatchPriorityList job '{}'", jobId);
293 dispatchPriorityList.remove(jobId);
294 }
295 }
296 }
297
298 int jobsOffset = 0;
299 List<JpaJob> dispatchableJobs;
300 List<JpaJob> workflowJobs = new ArrayList<>();
301 boolean jobsFound;
302 do {
303
304 dispatchableJobs = db.exec(serviceRegistry.getDispatchableJobsWithStatusQuery(
305 jobsOffset, ServiceRegistryJpaImpl.DEFAULT_DISPATCH_JOBS_LIMIT, Job.Status.RESTART
306 ));
307 jobsOffset += ServiceRegistryJpaImpl.DEFAULT_DISPATCH_JOBS_LIMIT;
308 jobsFound = !dispatchableJobs.isEmpty();
309
310
311 for (JpaJob job : dispatchableJobs) {
312 if (ServiceRegistryJpaImpl.TYPE_WORKFLOW.equals(job.getJobType())) {
313 workflowJobs.add(job);
314 }
315 }
316 if (dispatchableJobs.removeAll(workflowJobs) && dispatchableJobs.isEmpty()) {
317 continue;
318 }
319
320 dispatchDispatchableJobs(dispatchableJobs);
321 } while (jobsFound);
322
323 jobsOffset = 0;
324 do {
325
326 dispatchableJobs = db.exec(serviceRegistry.getDispatchableJobsWithStatusQuery(
327 jobsOffset, ServiceRegistryJpaImpl.DEFAULT_DISPATCH_JOBS_LIMIT, Job.Status.QUEUED
328 ));
329 jobsOffset += ServiceRegistryJpaImpl.DEFAULT_DISPATCH_JOBS_LIMIT;
330 jobsFound = !dispatchableJobs.isEmpty();
331
332
333 for (JpaJob job : dispatchableJobs) {
334 if (ServiceRegistryJpaImpl.TYPE_WORKFLOW.equals(job.getJobType())) {
335 workflowJobs.add(job);
336 }
337 }
338 if (dispatchableJobs.removeAll(workflowJobs) && dispatchableJobs.isEmpty()) {
339 continue;
340 }
341
342 dispatchDispatchableJobs(dispatchableJobs);
343 } while (jobsFound);
344
345 if (!workflowJobs.isEmpty()) {
346 dispatchDispatchableJobs(workflowJobs);
347 }
348 } catch (Throwable t) {
349 logger.warn("Error dispatching jobs", t);
350 } finally {
351 undispatchableJobTypes = null;
352 }
353
354 logger.debug("Finished job dispatch");
355 }
356
357
358
359
360
361
362 private void dispatchDispatchableJobs(List<JpaJob> jobsToDispatch) {
363
364 SystemLoad systemLoad = db.exec(serviceRegistry.getHostLoadsQuery());
365
366 for (JpaJob job : jobsToDispatch) {
367
368 String jobType = job.getJobType();
369
370
371 String jobSignature = jobType + '@' + job.getOperation();
372 if (undispatchableJobTypes.contains(jobSignature) && !dispatchPriorityList.containsKey(job.getId())) {
373 logger.trace("Skipping dispatching of {} with type '{}' for this round of dispatching", job, jobType);
374 continue;
375 }
376
377
378 String creator = job.getCreator();
379 String creatorOrganization = job.getOrganization();
380
381
382 Organization organization;
383 try {
384 organization = organizationDirectoryService.getOrganization(creatorOrganization);
385 securityService.setOrganization(organization);
386 } catch (NotFoundException e) {
387 logger.debug("Skipping dispatching of job for non-existing organization '{}'", creatorOrganization);
388 continue;
389 }
390
391
392 User user = userDirectoryService.loadUser(creator);
393 if (user == null) {
394 logger.warn("Unable to dispatch {}: creator '{}' is not available", job, creator);
395 continue;
396 }
397 securityService.setUser(user);
398
399
400 try {
401 List<ServiceRegistration> services = db.exec(serviceRegistry.getServiceRegistrationsQuery());
402 List<HostRegistration> hosts = db.exec(serviceRegistry.getHostRegistrationsQuery()).stream()
403 .filter(host -> !dispatchPriorityList.containsValue(host.getBaseUrl())
404 || host.getBaseUrl().equals(dispatchPriorityList.get(job.getId())))
405 .collect(Collectors.toList());
406 List<ServiceRegistration> candidateServices;
407
408
409
410
411 Job parentJob = null;
412 try {
413 if (job.getParentJob() != null) {
414 parentJob = serviceRegistry.getJob(job.getParentJob().getId());
415 }
416 } catch (NotFoundException e) {
417
418 }
419
420
421
422 boolean parentHasRunningChildren = false;
423 if (parentJob != null) {
424 for (Job child : serviceRegistry.getChildJobs(parentJob.getId())) {
425 if (Job.Status.RUNNING.equals(child.getStatus())) {
426 parentHasRunningChildren = true;
427 break;
428 }
429 }
430 }
431
432
433
434 if (parentJob == null || ServiceRegistryJpaImpl.TYPE_WORKFLOW.equals(jobType) || parentHasRunningChildren) {
435 logger.trace("Using available capacity only for dispatching of {} to a service of type '{}'", job, jobType);
436 candidateServices = serviceRegistry.getServiceRegistrationsWithCapacity(jobType, services, hosts,
437 systemLoad);
438 } else {
439 logger.trace("Using full list of services for dispatching of {} to a service of type '{}'", job, jobType);
440 candidateServices = serviceRegistry.getServiceRegistrationsByLoad(jobType, services, hosts, systemLoad);
441 }
442
443
444 String hostAcceptingJob;
445 try {
446 hostAcceptingJob = dispatchJob(job, candidateServices);
447 try {
448 systemLoad.updateNodeLoad(hostAcceptingJob, job.getJobLoad());
449 } catch (NotFoundException e) {
450 logger.info("Host {} not found in load list, cannot dispatch {} to it", hostAcceptingJob, job);
451 }
452
453 dispatchPriorityList.remove(job.getId());
454 } catch (ServiceUnavailableException e) {
455 logger.debug("Jobs of type {} currently cannot be dispatched", job.getOperation());
456
457 if (!ServiceRegistryJpaImpl.TYPE_WORKFLOW.equals(jobType)) {
458 undispatchableJobTypes.add(jobSignature);
459 }
460 continue;
461 } catch (UndispatchableJobException e) {
462 logger.debug("{} currently cannot be dispatched", job);
463 continue;
464 }
465
466 logger.debug("{} dispatched to {}", job, hostAcceptingJob);
467 } catch (ServiceRegistryException e) {
468 Throwable cause = (e.getCause() != null) ? e.getCause() : e;
469 logger.error("Error dispatching {}", job, cause);
470 } finally {
471 securityService.setUser(null);
472 securityService.setOrganization(null);
473 }
474 }
475 }
476
477
478
479
480
481
482
483
484
485
486
487
488
489 private String dispatchJob(JpaJob job, List<ServiceRegistration> services)
490 throws ServiceRegistryException, ServiceUnavailableException, UndispatchableJobException {
491 if (services.size() == 0) {
492 logger.debug("No service is currently available to handle jobs of type '" + job.getJobType() + "'");
493 throw new ServiceUnavailableException("No service of type " + job.getJobType() + " available");
494 }
495
496
497 job.setStatus(Job.Status.DISPATCHING);
498
499 boolean triedDispatching = false;
500 boolean jobLoadExceedsMaximumLoads = false;
501
502 final Float highestMaxLoad = services.stream()
503 .map(s -> ((ServiceRegistrationJpaImpl) s).getHostRegistration())
504 .map(HostRegistration::getMaxLoad)
505 .max(Comparator.naturalOrder())
506 .get();
507
508 if (job.getJobLoad() > highestMaxLoad) {
509
510
511 jobLoadExceedsMaximumLoads = true;
512 }
513
514 for (ServiceRegistration registration : services) {
515 job.setProcessorServiceRegistration((ServiceRegistrationJpaImpl) registration);
516
517
518
519
520 if (jobLoadExceedsMaximumLoads
521 && job.getProcessorServiceRegistration().getHostRegistration().getMaxLoad() != highestMaxLoad) {
522 continue;
523 }
524
525 try {
526 job = serviceRegistry.updateInternal(job);
527 } catch (Exception e) {
528
529
530
531 logger.debug("Unable to dispatch {}. This is likely caused by another service registry dispatching the job",
532 job);
533 throw new UndispatchableJobException(job + " is already being dispatched");
534 }
535
536 triedDispatching = true;
537
538 String serviceUrl = UrlSupport.concat(registration.getHost(), registration.getPath(), "dispatch");
539 HttpPost post = new HttpPost(serviceUrl);
540
541
542 post.addHeader(ORGANIZATION_HEADER, securityService.getOrganization().getId());
543 post.addHeader(USER_HEADER, securityService.getUser().getUsername());
544
545 List<BasicNameValuePair> params = new ArrayList<>();
546 params.add(new BasicNameValuePair("id", Long.toString(job.getId())));
547 params.add(new BasicNameValuePair("operation", job.getOperation()));
548 post.setEntity(new UrlEncodedFormEntity(params, UTF_8));
549
550
551 HttpResponse response = null;
552 int responseStatusCode;
553 try {
554 logger.debug("Trying to dispatch {} type '{}' load {} to {}", job, job.getJobType(), job.getJobLoad(),
555 registration.getHost());
556 if (!ServiceRegistryJpaImpl.START_WORKFLOW.equals(job.getOperation())) {
557 serviceRegistry.setCurrentJob(job.toJob());
558 }
559 response = client.execute(post);
560 responseStatusCode = response.getStatusLine().getStatusCode();
561 if (responseStatusCode == HttpStatus.SC_NO_CONTENT) {
562 return registration.getHost();
563 } else if (responseStatusCode == HttpStatus.SC_SERVICE_UNAVAILABLE) {
564 logger.debug("Service {} is currently refusing to accept jobs of type {}", registration,
565 job.getOperation());
566 continue;
567 } else if (responseStatusCode == HttpStatus.SC_PRECONDITION_FAILED) {
568 job.setStatus(Job.Status.FAILED);
569 job = serviceRegistry.updateJob(job);
570 logger.debug("Service {} refused to accept {}", registration, job);
571 throw new UndispatchableJobException(IOUtils.toString(response.getEntity().getContent()));
572 } else if (responseStatusCode == HttpStatus.SC_METHOD_NOT_ALLOWED) {
573 logger.debug("Service {} is not yet reachable", registration);
574 continue;
575 } else {
576 logger.warn("Service {} failed ({}) accepting {}", registration, responseStatusCode, job);
577 continue;
578 }
579 } catch (UndispatchableJobException e) {
580 throw e;
581 } catch (TrustedHttpClientException e) {
582
583 logger.warn("Unable to dispatch {}", job, e);
584 continue;
585 } catch (Exception e) {
586 logger.warn("Unable to dispatch {}", job, e);
587 } finally {
588 try {
589 client.close(response);
590 } catch (IOException e) {
591
592 }
593 serviceRegistry.setCurrentJob(null);
594 }
595 }
596
597
598 if (triedDispatching) {
599
600
601
602 if (serviceRegistry.acceptJobLoadsExeedingMaxLoad && !dispatchPriorityList.containsKey(job.getId())
603 && !ServiceRegistryJpaImpl.TYPE_WORKFLOW.equals(job.getJobType())
604 && job.getProcessorServiceRegistration() != null) {
605 String host = job.getProcessorServiceRegistration().getHost();
606 logger.debug("About to add {} to dispatchPriorityList with processor host {}", job, host);
607 dispatchPriorityList.put(job.getId(), host);
608 }
609
610 try {
611 job.setStatus(Job.Status.QUEUED);
612 job.setProcessorServiceRegistration(null);
613 job = serviceRegistry.updateJob(job);
614 } catch (Exception e) {
615 logger.error("Unable to put {} back into queue", job, e);
616 }
617 }
618
619 logger.debug("Unable to dispatch {}, no service is currently ready to accept the job", job);
620 throw new UndispatchableJobException(job + " is currently undispatchable");
621 }
622
623
624
625
626
627
628
629 protected Function<EntityManager, List<Long>> getDispatchableJobsWithIdFilterQuery(Set<Long> jobIds) {
630 return em -> {
631 if (jobIds == null || jobIds.isEmpty()) {
632 return Collections.emptyList();
633 }
634
635 return namedQuery.findAll(
636 "Job.dispatchable.status.idfilter",
637 Long.class,
638 Pair.of("jobids", dispatchPriorityList.keySet()),
639 Pair.of("statuses", List.of(
640 Job.Status.RESTART.ordinal(),
641 Job.Status.QUEUED.ordinal()
642 ))
643 ).apply(em);
644 };
645 }
646
647 private final Function<ServiceRegistration, HostRegistration> toHostRegistration =
648 new Function<ServiceRegistration, HostRegistration>() {
649 @Override
650 public HostRegistration apply(ServiceRegistration s) {
651 return ((ServiceRegistrationJpaImpl) s).getHostRegistration();
652 }
653 };
654
655 private final Function<HostRegistration, Float> toMaxLoad = new Function<HostRegistration, Float>() {
656 @Override
657 public Float apply(HostRegistration h) {
658 return h.getMaxLoad();
659 }
660 };
661
662 private final Comparator<Float> sortFloatValuesDesc = new Comparator<Float>() {
663 @Override
664 public int compare(Float o1, Float o2) {
665 return o2.compareTo(o1);
666 }
667 };
668 }
669 }