1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 package org.opencastproject.serviceregistry.impl;
23
import static java.nio.charset.StandardCharsets.UTF_8;
import static org.opencastproject.db.Queries.namedQuery;
import static org.opencastproject.security.api.SecurityConstants.ORGANIZATION_HEADER;
import static org.opencastproject.security.api.SecurityConstants.USER_HEADER;

import org.opencastproject.db.DBSession;
import org.opencastproject.db.DBSessionFactory;
import org.opencastproject.job.api.Job;
import org.opencastproject.job.jpa.JpaJob;
import org.opencastproject.security.api.Organization;
import org.opencastproject.security.api.OrganizationDirectoryService;
import org.opencastproject.security.api.SecurityService;
import org.opencastproject.security.api.TrustedHttpClient;
import org.opencastproject.security.api.TrustedHttpClientException;
import org.opencastproject.security.api.User;
import org.opencastproject.security.api.UserDirectoryService;
import org.opencastproject.serviceregistry.api.HostRegistration;
import org.opencastproject.serviceregistry.api.ServiceRegistration;
import org.opencastproject.serviceregistry.api.ServiceRegistryException;
import org.opencastproject.serviceregistry.api.SystemLoad;
import org.opencastproject.serviceregistry.impl.jpa.ServiceRegistrationJpaImpl;
import org.opencastproject.util.NotFoundException;
import org.opencastproject.util.UrlSupport;

import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.http.HttpResponse;
import org.apache.http.HttpStatus;
import org.apache.http.client.entity.UrlEncodedFormEntity;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.message.BasicNameValuePair;
import org.osgi.service.cm.ConfigurationException;
import org.osgi.service.component.ComponentContext;
import org.osgi.service.component.annotations.Activate;
import org.osgi.service.component.annotations.Component;
import org.osgi.service.component.annotations.Deactivate;
import org.osgi.service.component.annotations.Modified;
import org.osgi.service.component.annotations.Reference;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.Dictionary;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.stream.Collectors;

import javax.persistence.EntityManager;
import javax.persistence.EntityManagerFactory;
84
85
86
87
88
89 @Component(
90 property = {
91 "service.description=Job Dispatcher"
92 },
93 immediate = true,
94 service = { JobDispatcher.class }
95 )
96 public class JobDispatcher {
97
98
99 public static final String PERSISTENCE_UNIT = "org.opencastproject.common";
100
101
102 protected static final String OPT_DISPATCHINTERVAL = "dispatch.interval";
103
104
105 static final float MIN_DISPATCH_INTERVAL = 1.0F;
106
107
108 static final float DEFAULT_DISPATCH_INTERVAL = 0.0F;
109
110
111 static final long DISPATCH_INTERVAL_MS_FACTOR = 1000;
112
113 private static final Logger logger = LoggerFactory.getLogger(JobDispatcher.class);
114
115 private ServiceRegistryJpaImpl serviceRegistry;
116
117 private OrganizationDirectoryService organizationDirectoryService;
118 private UserDirectoryService userDirectoryService;
119 private SecurityService securityService;
120 private TrustedHttpClient client;
121
122
123 protected ScheduledThreadPoolExecutor scheduledExecutor = null;
124
125
126 private EntityManagerFactory emf = null;
127
128 protected DBSessionFactory dbSessionFactory;
129
130 protected DBSession db;
131
132 private ScheduledFuture jdfuture = null;
133
134
135
136
137 private List<String> undispatchableJobTypes = null;
138
139
140 protected final Map<Long, String> dispatchPriorityList = new HashMap<>();
141
142
143 @Reference(target = "(osgi.unit.name=org.opencastproject.common)")
144 void setEntityManagerFactory(EntityManagerFactory emf) {
145 this.emf = emf;
146 }
147
148
149 @Reference
150 public void setDBSessionFactory(DBSessionFactory dbSessionFactory) {
151 this.dbSessionFactory = dbSessionFactory;
152 }
153
154
155 @Reference()
156 void setServiceRegistry(ServiceRegistryJpaImpl sr) {
157 this.serviceRegistry = sr;
158 }
159
160 @Reference()
161 void setOrganizationDirectoryService(OrganizationDirectoryService organizationDirectoryService) {
162 this.organizationDirectoryService = organizationDirectoryService;
163 }
164
165 @Reference
166 void setUserDirectoryService(UserDirectoryService svc) {
167 this.userDirectoryService = svc;
168 }
169
170 @Reference
171 void setSecurityService(SecurityService sec) {
172 this.securityService = sec;
173 }
174
175 @Reference
176 void setTrustedHttpClient(TrustedHttpClient client) {
177 this.client = client;
178 }
179
180 @Activate
181 public void activate(ComponentContext cc) throws ConfigurationException {
182 logger.info("Activate job dispatcher");
183 db = dbSessionFactory.createSession(emf);
184 scheduledExecutor = (ScheduledThreadPoolExecutor) Executors.newScheduledThreadPool(1);
185 scheduledExecutor.setRemoveOnCancelPolicy(true);
186 logger.info("Activated");
187 updated(cc.getProperties());
188 }
189
190
191 @Modified
192 public void modified(ComponentContext cc) throws ConfigurationException {
193 logger.debug("Modified in job dispatcher");
194 updated(cc.getProperties());
195 }
196
197
198
199
200
201
202 @SuppressWarnings("rawtypes")
203 public void updated(Dictionary properties) {
204
205 logger.info("Updating job dispatcher properties");
206
207 float dispatchInterval = DEFAULT_DISPATCH_INTERVAL;
208 String dispatchIntervalString = StringUtils.trimToNull((String) properties.get(OPT_DISPATCHINTERVAL));
209 if (StringUtils.isNotBlank(dispatchIntervalString)) {
210 try {
211 dispatchInterval = Float.parseFloat(dispatchIntervalString);
212 } catch (Exception e) {
213 logger.warn("Dispatch interval '{}' is malformed, setting to {}", dispatchIntervalString,
214 MIN_DISPATCH_INTERVAL);
215 dispatchInterval = MIN_DISPATCH_INTERVAL;
216 }
217 if (dispatchInterval == 0) {
218 logger.info("Dispatching disabled");
219 } else if (dispatchInterval < MIN_DISPATCH_INTERVAL) {
220 logger.warn("Dispatch interval {} seconds is too low, adjusting to {}", dispatchInterval,
221 MIN_DISPATCH_INTERVAL);
222 dispatchInterval = MIN_DISPATCH_INTERVAL;
223 } else {
224 logger.info("Dispatch interval set to {} seconds", dispatchInterval);
225 }
226 }
227
228
229 if (jdfuture != null) {
230 jdfuture.cancel(true);
231 }
232
233
234 if (dispatchInterval > 0) {
235 long dispatchIntervalMs = Math.round(dispatchInterval * DISPATCH_INTERVAL_MS_FACTOR);
236 logger.info("Job dispatching is enabled");
237 logger.debug("Starting job dispatching at a custom interval of {}s", dispatchInterval);
238 jdfuture = scheduledExecutor.scheduleWithFixedDelay(getJobDispatcherRunnable(), dispatchIntervalMs,
239 dispatchIntervalMs, TimeUnit.MILLISECONDS);
240 } else {
241 logger.info("Job dispatching is disabled");
242 }
243 }
244
245 Runnable getJobDispatcherRunnable() {
246 return new JobDispatcherRunner();
247 }
248
249 public class JobDispatcherRunner implements Runnable {
250
251
252
253
254
255
256 @Override
257 public void run() {
258 logger.debug("Starting job dispatch");
259
260 undispatchableJobTypes = new ArrayList<>();
261 try {
262
263
264 if (serviceRegistry.collectJobstats) {
265 serviceRegistry.updateStatisticsJobData();
266 }
267
268 if (!dispatchPriorityList.isEmpty()) {
269 logger.trace("Checking for outdated jobs in dispatchPriorityList's '{}' jobs", dispatchPriorityList.size());
270
271 List<Long> jobIds = db.exec(getDispatchableJobsWithIdFilterQuery(dispatchPriorityList.keySet()));
272 for (Long jobId : new HashSet<>(dispatchPriorityList.keySet())) {
273 if (!jobIds.contains(jobId)) {
274 logger.debug("Removing outdated dispatchPriorityList job '{}'", jobId);
275 dispatchPriorityList.remove(jobId);
276 }
277 }
278 }
279
280 int jobsOffset = 0;
281 List<JpaJob> dispatchableJobs;
282 List<JpaJob> workflowJobs = new ArrayList<>();
283 boolean jobsFound;
284 do {
285
286 dispatchableJobs = db.exec(serviceRegistry.getDispatchableJobsWithStatusQuery(
287 jobsOffset, ServiceRegistryJpaImpl.DEFAULT_DISPATCH_JOBS_LIMIT, Job.Status.RESTART
288 ));
289 jobsOffset += ServiceRegistryJpaImpl.DEFAULT_DISPATCH_JOBS_LIMIT;
290 jobsFound = !dispatchableJobs.isEmpty();
291
292
293 for (JpaJob job : dispatchableJobs) {
294 if (ServiceRegistryJpaImpl.TYPE_WORKFLOW.equals(job.getJobType())) {
295 workflowJobs.add(job);
296 }
297 }
298 if (dispatchableJobs.removeAll(workflowJobs) && dispatchableJobs.isEmpty()) {
299 continue;
300 }
301
302 dispatchDispatchableJobs(dispatchableJobs);
303 } while (jobsFound);
304
305 jobsOffset = 0;
306 do {
307
308 dispatchableJobs = db.exec(serviceRegistry.getDispatchableJobsWithStatusQuery(
309 jobsOffset, ServiceRegistryJpaImpl.DEFAULT_DISPATCH_JOBS_LIMIT, Job.Status.QUEUED
310 ));
311 jobsOffset += ServiceRegistryJpaImpl.DEFAULT_DISPATCH_JOBS_LIMIT;
312 jobsFound = !dispatchableJobs.isEmpty();
313
314
315 for (JpaJob job : dispatchableJobs) {
316 if (ServiceRegistryJpaImpl.TYPE_WORKFLOW.equals(job.getJobType())) {
317 workflowJobs.add(job);
318 }
319 }
320 if (dispatchableJobs.removeAll(workflowJobs) && dispatchableJobs.isEmpty()) {
321 continue;
322 }
323
324 dispatchDispatchableJobs(dispatchableJobs);
325 } while (jobsFound);
326
327 if (!workflowJobs.isEmpty()) {
328 dispatchDispatchableJobs(workflowJobs);
329 }
330 } catch (Throwable t) {
331 logger.warn("Error dispatching jobs", t);
332 } finally {
333 undispatchableJobTypes = null;
334 }
335
336 logger.debug("Finished job dispatch");
337 }
338
339
340
341
342
343
344 private void dispatchDispatchableJobs(List<JpaJob> jobsToDispatch) {
345
346 SystemLoad systemLoad = db.exec(serviceRegistry.getHostLoadsQuery());
347
348 for (JpaJob job : jobsToDispatch) {
349
350 String jobType = job.getJobType();
351
352
353 String jobSignature = jobType + '@' + job.getOperation();
354 if (undispatchableJobTypes.contains(jobSignature) && !dispatchPriorityList.containsKey(job.getId())) {
355 logger.trace("Skipping dispatching of {} with type '{}' for this round of dispatching", job, jobType);
356 continue;
357 }
358
359
360 String creator = job.getCreator();
361 String creatorOrganization = job.getOrganization();
362
363
364 Organization organization;
365 try {
366 organization = organizationDirectoryService.getOrganization(creatorOrganization);
367 securityService.setOrganization(organization);
368 } catch (NotFoundException e) {
369 logger.debug("Skipping dispatching of job for non-existing organization '{}'", creatorOrganization);
370 continue;
371 }
372
373
374 User user = userDirectoryService.loadUser(creator);
375 if (user == null) {
376 logger.warn("Unable to dispatch {}: creator '{}' is not available", job, creator);
377 continue;
378 }
379 securityService.setUser(user);
380
381
382 try {
383 List<ServiceRegistration> services = db.exec(serviceRegistry.getServiceRegistrationsQuery());
384 List<HostRegistration> hosts = db.exec(serviceRegistry.getHostRegistrationsQuery()).stream()
385 .filter(host -> !dispatchPriorityList.containsValue(host.getBaseUrl())
386 || host.getBaseUrl().equals(dispatchPriorityList.get(job.getId())))
387 .collect(Collectors.toList());
388 List<ServiceRegistration> candidateServices;
389
390
391
392
393 Job parentJob = null;
394 try {
395 if (job.getParentJob() != null) {
396 parentJob = serviceRegistry.getJob(job.getParentJob().getId());
397 }
398 } catch (NotFoundException e) {
399
400 }
401
402
403
404 boolean parentHasRunningChildren = false;
405 if (parentJob != null) {
406 for (Job child : serviceRegistry.getChildJobs(parentJob.getId())) {
407 if (Job.Status.RUNNING.equals(child.getStatus())) {
408 parentHasRunningChildren = true;
409 break;
410 }
411 }
412 }
413
414
415
416 if (parentJob == null || ServiceRegistryJpaImpl.TYPE_WORKFLOW.equals(jobType) || parentHasRunningChildren) {
417 logger.trace("Using available capacity only for dispatching of {} to a service of type '{}'", job, jobType);
418 candidateServices = serviceRegistry.getServiceRegistrationsWithCapacity(jobType, services, hosts,
419 systemLoad);
420 } else {
421 logger.trace("Using full list of services for dispatching of {} to a service of type '{}'", job, jobType);
422 candidateServices = serviceRegistry.getServiceRegistrationsByLoad(jobType, services, hosts, systemLoad);
423 }
424
425
426 String hostAcceptingJob;
427 try {
428 hostAcceptingJob = dispatchJob(job, candidateServices);
429 try {
430 systemLoad.updateNodeLoad(hostAcceptingJob, job.getJobLoad());
431 } catch (NotFoundException e) {
432 logger.info("Host {} not found in load list, cannot dispatch {} to it", hostAcceptingJob, job);
433 }
434
435 dispatchPriorityList.remove(job.getId());
436 } catch (ServiceUnavailableException e) {
437 logger.debug("Jobs of type {} currently cannot be dispatched", job.getOperation());
438
439 if (!ServiceRegistryJpaImpl.TYPE_WORKFLOW.equals(jobType)) {
440 undispatchableJobTypes.add(jobSignature);
441 }
442 continue;
443 } catch (UndispatchableJobException e) {
444 logger.debug("{} currently cannot be dispatched", job);
445 continue;
446 }
447
448 logger.debug("{} dispatched to {}", job, hostAcceptingJob);
449 } catch (ServiceRegistryException e) {
450 Throwable cause = (e.getCause() != null) ? e.getCause() : e;
451 logger.error("Error dispatching {}", job, cause);
452 } finally {
453 securityService.setUser(null);
454 securityService.setOrganization(null);
455 }
456 }
457 }
458
459
460
461
462
463
464
465
466
467
468
469
470
471 private String dispatchJob(JpaJob job, List<ServiceRegistration> services)
472 throws ServiceRegistryException, ServiceUnavailableException, UndispatchableJobException {
473 if (services.size() == 0) {
474 logger.debug("No service is currently available to handle jobs of type '" + job.getJobType() + "'");
475 throw new ServiceUnavailableException("No service of type " + job.getJobType() + " available");
476 }
477
478
479 job.setStatus(Job.Status.DISPATCHING);
480
481 boolean triedDispatching = false;
482 boolean jobLoadExceedsMaximumLoads = false;
483
484 final Float highestMaxLoad = services.stream()
485 .map(s -> ((ServiceRegistrationJpaImpl) s).getHostRegistration())
486 .map(HostRegistration::getMaxLoad)
487 .max(Comparator.naturalOrder())
488 .get();
489
490 if (job.getJobLoad() > highestMaxLoad) {
491
492
493 jobLoadExceedsMaximumLoads = true;
494 }
495
496 for (ServiceRegistration registration : services) {
497 job.setProcessorServiceRegistration((ServiceRegistrationJpaImpl) registration);
498
499
500
501
502 if (jobLoadExceedsMaximumLoads
503 && job.getProcessorServiceRegistration().getHostRegistration().getMaxLoad() != highestMaxLoad) {
504 continue;
505 }
506
507 try {
508 job = serviceRegistry.updateInternal(job);
509 } catch (Exception e) {
510
511
512
513 logger.debug("Unable to dispatch {}. This is likely caused by another service registry dispatching the job",
514 job);
515 throw new UndispatchableJobException(job + " is already being dispatched");
516 }
517
518 triedDispatching = true;
519
520 String serviceUrl = UrlSupport.concat(registration.getHost(), registration.getPath(), "dispatch");
521 HttpPost post = new HttpPost(serviceUrl);
522
523
524 post.addHeader(ORGANIZATION_HEADER, securityService.getOrganization().getId());
525 post.addHeader(USER_HEADER, securityService.getUser().getUsername());
526
527 List<BasicNameValuePair> params = new ArrayList<>();
528 params.add(new BasicNameValuePair("id", Long.toString(job.getId())));
529 params.add(new BasicNameValuePair("operation", job.getOperation()));
530 post.setEntity(new UrlEncodedFormEntity(params, UTF_8));
531
532
533 HttpResponse response = null;
534 int responseStatusCode;
535 try {
536 logger.debug("Trying to dispatch {} type '{}' load {} to {}", job, job.getJobType(), job.getJobLoad(),
537 registration.getHost());
538 if (!ServiceRegistryJpaImpl.START_WORKFLOW.equals(job.getOperation())) {
539 serviceRegistry.setCurrentJob(job.toJob());
540 }
541 response = client.execute(post);
542 responseStatusCode = response.getStatusLine().getStatusCode();
543 if (responseStatusCode == HttpStatus.SC_NO_CONTENT) {
544 return registration.getHost();
545 } else if (responseStatusCode == HttpStatus.SC_SERVICE_UNAVAILABLE) {
546 logger.debug("Service {} is currently refusing to accept jobs of type {}", registration,
547 job.getOperation());
548 continue;
549 } else if (responseStatusCode == HttpStatus.SC_PRECONDITION_FAILED) {
550 job.setStatus(Job.Status.FAILED);
551 job = serviceRegistry.updateJob(job);
552 logger.debug("Service {} refused to accept {}", registration, job);
553 throw new UndispatchableJobException(IOUtils.toString(response.getEntity().getContent()));
554 } else if (responseStatusCode == HttpStatus.SC_METHOD_NOT_ALLOWED) {
555 logger.debug("Service {} is not yet reachable", registration);
556 continue;
557 } else {
558 logger.warn("Service {} failed ({}) accepting {}", registration, responseStatusCode, job);
559 continue;
560 }
561 } catch (UndispatchableJobException e) {
562 throw e;
563 } catch (TrustedHttpClientException e) {
564
565 logger.warn("Unable to dispatch {}", job, e);
566 continue;
567 } catch (Exception e) {
568 logger.warn("Unable to dispatch {}", job, e);
569 } finally {
570 try {
571 client.close(response);
572 } catch (IOException e) {
573
574 }
575 serviceRegistry.setCurrentJob(null);
576 }
577 }
578
579
580 if (triedDispatching) {
581
582
583
584 if (serviceRegistry.acceptJobLoadsExeedingMaxLoad && !dispatchPriorityList.containsKey(job.getId())
585 && !ServiceRegistryJpaImpl.TYPE_WORKFLOW.equals(job.getJobType())
586 && job.getProcessorServiceRegistration() != null) {
587 String host = job.getProcessorServiceRegistration().getHost();
588 logger.debug("About to add {} to dispatchPriorityList with processor host {}", job, host);
589 dispatchPriorityList.put(job.getId(), host);
590 }
591
592 try {
593 job.setStatus(Job.Status.QUEUED);
594 job.setProcessorServiceRegistration(null);
595 job = serviceRegistry.updateJob(job);
596 } catch (Exception e) {
597 logger.error("Unable to put {} back into queue", job, e);
598 }
599 }
600
601 logger.debug("Unable to dispatch {}, no service is currently ready to accept the job", job);
602 throw new UndispatchableJobException(job + " is currently undispatchable");
603 }
604
605
606
607
608
609
610
611 protected Function<EntityManager, List<Long>> getDispatchableJobsWithIdFilterQuery(Set<Long> jobIds) {
612 return em -> {
613 if (jobIds == null || jobIds.isEmpty()) {
614 return Collections.emptyList();
615 }
616
617 return namedQuery.findAll(
618 "Job.dispatchable.status.idfilter",
619 Long.class,
620 Pair.of("jobids", dispatchPriorityList.keySet()),
621 Pair.of("statuses", List.of(
622 Job.Status.RESTART.ordinal(),
623 Job.Status.QUEUED.ordinal()
624 ))
625 ).apply(em);
626 };
627 }
628
629 private final Function<ServiceRegistration, HostRegistration> toHostRegistration =
630 new Function<ServiceRegistration, HostRegistration>() {
631 @Override
632 public HostRegistration apply(ServiceRegistration s) {
633 return ((ServiceRegistrationJpaImpl) s).getHostRegistration();
634 }
635 };
636
637 private final Function<HostRegistration, Float> toMaxLoad = new Function<HostRegistration, Float>() {
638 @Override
639 public Float apply(HostRegistration h) {
640 return h.getMaxLoad();
641 }
642 };
643
644 private final Comparator<Float> sortFloatValuesDesc = new Comparator<Float>() {
645 @Override
646 public int compare(Float o1, Float o2) {
647 return o2.compareTo(o1);
648 }
649 };
650 }
651 }