1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 package org.opencastproject.serviceregistry.impl;
23
import static java.nio.charset.StandardCharsets.UTF_8;
import static org.opencastproject.db.Queries.namedQuery;
import static org.opencastproject.security.api.SecurityConstants.ORGANIZATION_HEADER;
import static org.opencastproject.security.api.SecurityConstants.USER_HEADER;

import org.opencastproject.db.DBSession;
import org.opencastproject.db.DBSessionFactory;
import org.opencastproject.job.api.Job;
import org.opencastproject.job.jpa.JpaJob;
import org.opencastproject.security.api.Organization;
import org.opencastproject.security.api.OrganizationDirectoryService;
import org.opencastproject.security.api.SecurityService;
import org.opencastproject.security.api.TrustedHttpClient;
import org.opencastproject.security.api.TrustedHttpClientException;
import org.opencastproject.security.api.User;
import org.opencastproject.security.api.UserDirectoryService;
import org.opencastproject.serviceregistry.api.HostRegistration;
import org.opencastproject.serviceregistry.api.ServiceRegistration;
import org.opencastproject.serviceregistry.api.ServiceRegistryException;
import org.opencastproject.serviceregistry.api.SystemLoad;
import org.opencastproject.serviceregistry.impl.jpa.ServiceRegistrationJpaImpl;
import org.opencastproject.util.NotFoundException;
import org.opencastproject.util.UrlSupport;

import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.http.HttpResponse;
import org.apache.http.HttpStatus;
import org.apache.http.client.entity.UrlEncodedFormEntity;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.message.BasicNameValuePair;
import org.osgi.service.cm.ConfigurationException;
import org.osgi.service.component.ComponentContext;
import org.osgi.service.component.annotations.Activate;
import org.osgi.service.component.annotations.Component;
import org.osgi.service.component.annotations.Deactivate;
import org.osgi.service.component.annotations.Modified;
import org.osgi.service.component.annotations.Reference;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.Dictionary;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.stream.Collectors;

import javax.persistence.EntityManager;
import javax.persistence.EntityManagerFactory;
84
85
86
87
88
89 @Component(
90 property = {
91 "service.description=Job Dispatcher"
92 },
93 immediate = true,
94 service = { JobDispatcher.class }
95 )
96 public class JobDispatcher {
97
98
99 public static final String PERSISTENCE_UNIT = "org.opencastproject.common";
100
101
102 protected static final String OPT_DISPATCHINTERVAL = "dispatch.interval";
103
104
105 static final float MIN_DISPATCH_INTERVAL = 1.0F;
106
107
108 static final float DEFAULT_DISPATCH_INTERVAL = 0.0F;
109
110
111 static final long DISPATCH_INTERVAL_MS_FACTOR = 1000;
112
113 private static final Logger logger = LoggerFactory.getLogger(JobDispatcher.class);
114
115 private ServiceRegistryJpaImpl serviceRegistry;
116
117 private OrganizationDirectoryService organizationDirectoryService;
118 private UserDirectoryService userDirectoryService;
119 private SecurityService securityService;
120 private TrustedHttpClient client;
121
122
123 protected ScheduledThreadPoolExecutor scheduledExecutor = null;
124
125
126 private EntityManagerFactory emf = null;
127
128 protected DBSessionFactory dbSessionFactory;
129
130 protected DBSession db;
131
132 private ScheduledFuture jdfuture = null;
133
134
135
136
137 private List<String> undispatchableJobTypes = null;
138
139
140 protected final Map<Long, String> dispatchPriorityList = new HashMap<>();
141
142
143 @Reference(target = "(osgi.unit.name=org.opencastproject.common)")
144 void setEntityManagerFactory(EntityManagerFactory emf) {
145 this.emf = emf;
146 }
147
148
149 @Reference
150 public void setDBSessionFactory(DBSessionFactory dbSessionFactory) {
151 this.dbSessionFactory = dbSessionFactory;
152 }
153
154
155 @Reference()
156 void setServiceRegistry(ServiceRegistryJpaImpl sr) {
157 this.serviceRegistry = sr;
158 }
159
160 @Reference()
161 void setOrganizationDirectoryService(OrganizationDirectoryService organizationDirectoryService) {
162 this.organizationDirectoryService = organizationDirectoryService;
163 }
164
165 @Reference
166 void setUserDirectoryService(UserDirectoryService svc) {
167 this.userDirectoryService = svc;
168 }
169
170 @Reference
171 void setSecurityService(SecurityService sec) {
172 this.securityService = sec;
173 }
174
175 @Reference
176 void setTrustedHttpClient(TrustedHttpClient client) {
177 this.client = client;
178 }
179
180 @Activate
181 public void activate(ComponentContext cc) throws ConfigurationException {
182 logger.info("Activate job dispatcher");
183 db = dbSessionFactory.createSession(emf);
184 scheduledExecutor = (ScheduledThreadPoolExecutor) Executors.newScheduledThreadPool(1);
185 scheduledExecutor.setRemoveOnCancelPolicy(true);
186 logger.info("Activated");
187 updated(cc.getProperties());
188 }
189
190
191 @Modified
192 public void modified(ComponentContext cc) throws ConfigurationException {
193 logger.debug("Modified in job dispatcher");
194 updated(cc.getProperties());
195 }
196
197
198
199
200
201
202 @SuppressWarnings("rawtypes")
203 public void updated(Dictionary properties) {
204
205 logger.info("Updating job dispatcher properties");
206
207 float dispatchInterval = DEFAULT_DISPATCH_INTERVAL;
208 String dispatchIntervalString = StringUtils.trimToNull((String) properties.get(OPT_DISPATCHINTERVAL));
209 if (StringUtils.isNotBlank(dispatchIntervalString)) {
210 try {
211 dispatchInterval = Float.parseFloat(dispatchIntervalString);
212 } catch (Exception e) {
213 logger.warn("Dispatch interval '{}' is malformed, setting to {}", dispatchIntervalString, MIN_DISPATCH_INTERVAL);
214 dispatchInterval = MIN_DISPATCH_INTERVAL;
215 }
216 if (dispatchInterval == 0) {
217 logger.info("Dispatching disabled");
218 } else if (dispatchInterval < MIN_DISPATCH_INTERVAL) {
219 logger.warn("Dispatch interval {} seconds is too low, adjusting to {}", dispatchInterval, MIN_DISPATCH_INTERVAL);
220 dispatchInterval = MIN_DISPATCH_INTERVAL;
221 } else {
222 logger.info("Dispatch interval set to {} seconds", dispatchInterval);
223 }
224 }
225
226
227 if (jdfuture != null) {
228 jdfuture.cancel(true);
229 }
230
231
232 if (dispatchInterval > 0) {
233 long dispatchIntervalMs = Math.round(dispatchInterval * DISPATCH_INTERVAL_MS_FACTOR);
234 logger.info("Job dispatching is enabled");
235 logger.debug("Starting job dispatching at a custom interval of {}s", dispatchInterval);
236 jdfuture = scheduledExecutor.scheduleWithFixedDelay(getJobDispatcherRunnable(), dispatchIntervalMs, dispatchIntervalMs,
237 TimeUnit.MILLISECONDS);
238 } else {
239 logger.info("Job dispatching is disabled");
240 }
241 }
242
243 Runnable getJobDispatcherRunnable() {
244 return new JobDispatcherRunner();
245 }
246
247 public class JobDispatcherRunner implements Runnable {
248
249
250
251
252
253
254 @Override
255 public void run() {
256 logger.debug("Starting job dispatch");
257
258 undispatchableJobTypes = new ArrayList<>();
259 try {
260
261
262 if (serviceRegistry.collectJobstats) {
263 serviceRegistry.updateStatisticsJobData();
264 }
265
266 if (!dispatchPriorityList.isEmpty()) {
267 logger.trace("Checking for outdated jobs in dispatchPriorityList's '{}' jobs", dispatchPriorityList.size());
268
269 List<Long> jobIds = db.exec(getDispatchableJobsWithIdFilterQuery(dispatchPriorityList.keySet()));
270 for (Long jobId : new HashSet<>(dispatchPriorityList.keySet())) {
271 if (!jobIds.contains(jobId)) {
272 logger.debug("Removing outdated dispatchPriorityList job '{}'", jobId);
273 dispatchPriorityList.remove(jobId);
274 }
275 }
276 }
277
278 int jobsOffset = 0;
279 List<JpaJob> dispatchableJobs;
280 List<JpaJob> workflowJobs = new ArrayList<>();
281 boolean jobsFound;
282 do {
283
284 dispatchableJobs = db.exec(serviceRegistry.getDispatchableJobsWithStatusQuery(
285 jobsOffset, ServiceRegistryJpaImpl.DEFAULT_DISPATCH_JOBS_LIMIT, Job.Status.RESTART
286 ));
287 jobsOffset += ServiceRegistryJpaImpl.DEFAULT_DISPATCH_JOBS_LIMIT;
288 jobsFound = !dispatchableJobs.isEmpty();
289
290
291 for (JpaJob job : dispatchableJobs) {
292 if (ServiceRegistryJpaImpl.TYPE_WORKFLOW.equals(job.getJobType())) {
293 workflowJobs.add(job);
294 }
295 }
296 if (dispatchableJobs.removeAll(workflowJobs) && dispatchableJobs.isEmpty()) {
297 continue;
298 }
299
300 dispatchDispatchableJobs(dispatchableJobs);
301 } while (jobsFound);
302
303 jobsOffset = 0;
304 do {
305
306 dispatchableJobs = db.exec(serviceRegistry.getDispatchableJobsWithStatusQuery(
307 jobsOffset, ServiceRegistryJpaImpl.DEFAULT_DISPATCH_JOBS_LIMIT, Job.Status.QUEUED
308 ));
309 jobsOffset += ServiceRegistryJpaImpl.DEFAULT_DISPATCH_JOBS_LIMIT;
310 jobsFound = !dispatchableJobs.isEmpty();
311
312
313 for (JpaJob job : dispatchableJobs) {
314 if (ServiceRegistryJpaImpl.TYPE_WORKFLOW.equals(job.getJobType())) {
315 workflowJobs.add(job);
316 }
317 }
318 if (dispatchableJobs.removeAll(workflowJobs) && dispatchableJobs.isEmpty()) {
319 continue;
320 }
321
322 dispatchDispatchableJobs(dispatchableJobs);
323 } while (jobsFound);
324
325 if (!workflowJobs.isEmpty()) {
326 dispatchDispatchableJobs(workflowJobs);
327 }
328 } catch (Throwable t) {
329 logger.warn("Error dispatching jobs", t);
330 } finally {
331 undispatchableJobTypes = null;
332 }
333
334 logger.debug("Finished job dispatch");
335 }
336
337
338
339
340
341
342 private void dispatchDispatchableJobs(List<JpaJob> jobsToDispatch) {
343
344 SystemLoad systemLoad = db.exec(serviceRegistry.getHostLoadsQuery());
345
346 for (JpaJob job : jobsToDispatch) {
347
348 String jobType = job.getJobType();
349
350
351 String jobSignature = jobType + '@' + job.getOperation();
352 if (undispatchableJobTypes.contains(jobSignature) && !dispatchPriorityList.containsKey(job.getId())) {
353 logger.trace("Skipping dispatching of {} with type '{}' for this round of dispatching", job, jobType);
354 continue;
355 }
356
357
358 String creator = job.getCreator();
359 String creatorOrganization = job.getOrganization();
360
361
362 Organization organization;
363 try {
364 organization = organizationDirectoryService.getOrganization(creatorOrganization);
365 securityService.setOrganization(organization);
366 } catch (NotFoundException e) {
367 logger.debug("Skipping dispatching of job for non-existing organization '{}'", creatorOrganization);
368 continue;
369 }
370
371
372 User user = userDirectoryService.loadUser(creator);
373 if (user == null) {
374 logger.warn("Unable to dispatch {}: creator '{}' is not available", job, creator);
375 continue;
376 }
377 securityService.setUser(user);
378
379
380 try {
381 List<ServiceRegistration> services = db.exec(serviceRegistry.getServiceRegistrationsQuery());
382 List<HostRegistration> hosts = db.exec(serviceRegistry.getHostRegistrationsQuery()).stream()
383 .filter(host -> !dispatchPriorityList.containsValue(host.getBaseUrl())
384 || host.getBaseUrl().equals(dispatchPriorityList.get(job.getId())))
385 .collect(Collectors.toList());
386 List<ServiceRegistration> candidateServices;
387
388
389
390
391 Job parentJob = null;
392 try {
393 if (job.getParentJob() != null) {
394 parentJob = serviceRegistry.getJob(job.getParentJob().getId());
395 }
396 } catch (NotFoundException e) {
397
398 }
399
400
401
402 boolean parentHasRunningChildren = false;
403 if (parentJob != null) {
404 for (Job child : serviceRegistry.getChildJobs(parentJob.getId())) {
405 if (Job.Status.RUNNING.equals(child.getStatus())) {
406 parentHasRunningChildren = true;
407 break;
408 }
409 }
410 }
411
412
413
414 if (parentJob == null || ServiceRegistryJpaImpl.TYPE_WORKFLOW.equals(jobType) || parentHasRunningChildren) {
415 logger.trace("Using available capacity only for dispatching of {} to a service of type '{}'", job, jobType);
416 candidateServices = serviceRegistry.getServiceRegistrationsWithCapacity(jobType, services, hosts, systemLoad);
417 } else {
418 logger.trace("Using full list of services for dispatching of {} to a service of type '{}'", job, jobType);
419 candidateServices = serviceRegistry.getServiceRegistrationsByLoad(jobType, services, hosts, systemLoad);
420 }
421
422
423 String hostAcceptingJob;
424 try {
425 hostAcceptingJob = dispatchJob(job, candidateServices);
426 try {
427 systemLoad.updateNodeLoad(hostAcceptingJob, job.getJobLoad());
428 } catch (NotFoundException e) {
429 logger.info("Host {} not found in load list, cannot dispatch {} to it", hostAcceptingJob, job);
430 }
431
432 dispatchPriorityList.remove(job.getId());
433 } catch (ServiceUnavailableException e) {
434 logger.debug("Jobs of type {} currently cannot be dispatched", job.getOperation());
435
436 if (!ServiceRegistryJpaImpl.TYPE_WORKFLOW.equals(jobType)) {
437 undispatchableJobTypes.add(jobSignature);
438 }
439 continue;
440 } catch (UndispatchableJobException e) {
441 logger.debug("{} currently cannot be dispatched", job);
442 continue;
443 }
444
445 logger.debug("{} dispatched to {}", job, hostAcceptingJob);
446 } catch (ServiceRegistryException e) {
447 Throwable cause = (e.getCause() != null) ? e.getCause() : e;
448 logger.error("Error dispatching {}", job, cause);
449 } finally {
450 securityService.setUser(null);
451 securityService.setOrganization(null);
452 }
453 }
454 }
455
456
457
458
459
460
461
462
463
464
465
466
467 private String dispatchJob(JpaJob job, List<ServiceRegistration> services)
468 throws ServiceRegistryException, ServiceUnavailableException, UndispatchableJobException {
469 if (services.size() == 0) {
470 logger.debug("No service is currently available to handle jobs of type '" + job.getJobType() + "'");
471 throw new ServiceUnavailableException("No service of type " + job.getJobType() + " available");
472 }
473
474
475 job.setStatus(Job.Status.DISPATCHING);
476
477 boolean triedDispatching = false;
478 boolean jobLoadExceedsMaximumLoads = false;
479
480 final Float highestMaxLoad = services.stream()
481 .map(s -> ((ServiceRegistrationJpaImpl) s).getHostRegistration())
482 .map(HostRegistration::getMaxLoad)
483 .max(Comparator.naturalOrder())
484 .get();
485
486 if (job.getJobLoad() > highestMaxLoad) {
487
488 jobLoadExceedsMaximumLoads = true;
489 }
490
491 for (ServiceRegistration registration : services) {
492 job.setProcessorServiceRegistration((ServiceRegistrationJpaImpl) registration);
493
494
495
496 if (jobLoadExceedsMaximumLoads && job.getProcessorServiceRegistration().getHostRegistration().getMaxLoad() != highestMaxLoad) {
497 continue;
498 }
499
500 try {
501 job = serviceRegistry.updateInternal(job);
502 } catch (Exception e) {
503
504
505
506 logger.debug("Unable to dispatch {}. This is likely caused by another service registry dispatching the job",
507 job);
508 throw new UndispatchableJobException(job + " is already being dispatched");
509 }
510
511 triedDispatching = true;
512
513 String serviceUrl = UrlSupport.concat(registration.getHost(), registration.getPath(), "dispatch");
514 HttpPost post = new HttpPost(serviceUrl);
515
516
517 post.addHeader(ORGANIZATION_HEADER, securityService.getOrganization().getId());
518 post.addHeader(USER_HEADER, securityService.getUser().getUsername());
519
520 List<BasicNameValuePair> params = new ArrayList<>();
521 params.add(new BasicNameValuePair("id", Long.toString(job.getId())));
522 params.add(new BasicNameValuePair("operation", job.getOperation()));
523 post.setEntity(new UrlEncodedFormEntity(params, UTF_8));
524
525
526 HttpResponse response = null;
527 int responseStatusCode;
528 try {
529 logger.debug("Trying to dispatch {} type '{}' load {} to {}", job, job.getJobType(), job.getJobLoad(),
530 registration.getHost());
531 if (!ServiceRegistryJpaImpl.START_WORKFLOW.equals(job.getOperation())) {
532 serviceRegistry.setCurrentJob(job.toJob());
533 }
534 response = client.execute(post);
535 responseStatusCode = response.getStatusLine().getStatusCode();
536 if (responseStatusCode == HttpStatus.SC_NO_CONTENT) {
537 return registration.getHost();
538 } else if (responseStatusCode == HttpStatus.SC_SERVICE_UNAVAILABLE) {
539 logger.debug("Service {} is currently refusing to accept jobs of type {}", registration, job.getOperation());
540 continue;
541 } else if (responseStatusCode == HttpStatus.SC_PRECONDITION_FAILED) {
542 job.setStatus(Job.Status.FAILED);
543 job = serviceRegistry.updateJob(job);
544 logger.debug("Service {} refused to accept {}", registration, job);
545 throw new UndispatchableJobException(IOUtils.toString(response.getEntity().getContent()));
546 } else if (responseStatusCode == HttpStatus.SC_METHOD_NOT_ALLOWED) {
547 logger.debug("Service {} is not yet reachable", registration);
548 continue;
549 } else {
550 logger.warn("Service {} failed ({}) accepting {}", registration, responseStatusCode, job);
551 continue;
552 }
553 } catch (UndispatchableJobException e) {
554 throw e;
555 } catch (TrustedHttpClientException e) {
556
557 logger.warn("Unable to dispatch {}", job, e);
558 continue;
559 } catch (Exception e) {
560 logger.warn("Unable to dispatch {}", job, e);
561 } finally {
562 try {
563 client.close(response);
564 } catch (IOException e) {
565
566 }
567 serviceRegistry.setCurrentJob(null);
568 }
569 }
570
571
572 if (triedDispatching) {
573
574
575
576 if (serviceRegistry.acceptJobLoadsExeedingMaxLoad && !dispatchPriorityList.containsKey(job.getId()) && !ServiceRegistryJpaImpl.TYPE_WORKFLOW.equals(job.getJobType())
577 && job.getProcessorServiceRegistration() != null) {
578 String host = job.getProcessorServiceRegistration().getHost();
579 logger.debug("About to add {} to dispatchPriorityList with processor host {}", job, host);
580 dispatchPriorityList.put(job.getId(), host);
581 }
582
583 try {
584 job.setStatus(Job.Status.QUEUED);
585 job.setProcessorServiceRegistration(null);
586 job = serviceRegistry.updateJob(job);
587 } catch (Exception e) {
588 logger.error("Unable to put {} back into queue", job, e);
589 }
590 }
591
592 logger.debug("Unable to dispatch {}, no service is currently ready to accept the job", job);
593 throw new UndispatchableJobException(job + " is currently undispatchable");
594 }
595
596
597
598
599
600
601
602 protected Function<EntityManager, List<Long>> getDispatchableJobsWithIdFilterQuery(Set<Long> jobIds) {
603 return em -> {
604 if (jobIds == null || jobIds.isEmpty()) {
605 return Collections.emptyList();
606 }
607
608 return namedQuery.findAll(
609 "Job.dispatchable.status.idfilter",
610 Long.class,
611 Pair.of("jobids", dispatchPriorityList.keySet()),
612 Pair.of("statuses", List.of(
613 Job.Status.RESTART.ordinal(),
614 Job.Status.QUEUED.ordinal()
615 ))
616 ).apply(em);
617 };
618 }
619
620 private final Function<ServiceRegistration, HostRegistration> toHostRegistration = new Function<ServiceRegistration, HostRegistration>() {
621 @Override
622 public HostRegistration apply(ServiceRegistration s) {
623 return ((ServiceRegistrationJpaImpl) s).getHostRegistration();
624 }
625 };
626
627 private final Function<HostRegistration, Float> toMaxLoad = new Function<HostRegistration, Float>() {
628 @Override
629 public Float apply(HostRegistration h) {
630 return h.getMaxLoad();
631 }
632 };
633
634 private final Comparator<Float> sortFloatValuesDesc = new Comparator<Float>() {
635 @Override
636 public int compare(Float o1, Float o2) {
637 return o2.compareTo(o1);
638 }
639 };
640 }
641 }