View Javadoc
1   /*
2    * Licensed to The Apereo Foundation under one or more contributor license
3    * agreements. See the NOTICE file distributed with this work for additional
4    * information regarding copyright ownership.
5    *
6    *
7    * The Apereo Foundation licenses this file to you under the Educational
8    * Community License, Version 2.0 (the "License"); you may not use this file
9    * except in compliance with the License. You may obtain a copy of the License
10   * at:
11   *
12   *   http://opensource.org/licenses/ecl2.txt
13   *
14   * Unless required by applicable law or agreed to in writing, software
15   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
16   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the
17   * License for the specific language governing permissions and limitations under
18   * the License.
19   *
20   */
21  
22  package org.opencastproject.serviceregistry.impl;
23  
24  import static java.nio.charset.StandardCharsets.UTF_8;
25  import static org.opencastproject.db.Queries.namedQuery;
26  import static org.opencastproject.security.api.SecurityConstants.ORGANIZATION_HEADER;
27  import static org.opencastproject.security.api.SecurityConstants.USER_HEADER;
28  
29  import org.opencastproject.db.DBSession;
30  import org.opencastproject.db.DBSessionFactory;
31  import org.opencastproject.job.api.Job;
32  import org.opencastproject.job.jpa.JpaJob;
33  import org.opencastproject.security.api.Organization;
34  import org.opencastproject.security.api.OrganizationDirectoryService;
35  import org.opencastproject.security.api.SecurityService;
36  import org.opencastproject.security.api.TrustedHttpClient;
37  import org.opencastproject.security.api.TrustedHttpClientException;
38  import org.opencastproject.security.api.User;
39  import org.opencastproject.security.api.UserDirectoryService;
40  import org.opencastproject.serviceregistry.api.HostRegistration;
41  import org.opencastproject.serviceregistry.api.ServiceRegistration;
42  import org.opencastproject.serviceregistry.api.ServiceRegistryException;
43  import org.opencastproject.serviceregistry.api.SystemLoad;
44  import org.opencastproject.serviceregistry.impl.jpa.ServiceRegistrationJpaImpl;
45  import org.opencastproject.util.NotFoundException;
46  import org.opencastproject.util.UrlSupport;
47  
48  import org.apache.commons.io.IOUtils;
49  import org.apache.commons.lang3.StringUtils;
50  import org.apache.commons.lang3.tuple.Pair;
51  import org.apache.http.HttpResponse;
52  import org.apache.http.HttpStatus;
53  import org.apache.http.client.entity.UrlEncodedFormEntity;
54  import org.apache.http.client.methods.HttpPost;
55  import org.apache.http.message.BasicNameValuePair;
56  import org.osgi.service.cm.ConfigurationException;
57  import org.osgi.service.component.ComponentContext;
58  import org.osgi.service.component.annotations.Activate;
59  import org.osgi.service.component.annotations.Component;
60  import org.osgi.service.component.annotations.Deactivate;
61  import org.osgi.service.component.annotations.Modified;
62  import org.osgi.service.component.annotations.Reference;
63  import org.slf4j.Logger;
64  import org.slf4j.LoggerFactory;
65  
66  import java.io.IOException;
67  import java.util.ArrayList;
68  import java.util.Collections;
69  import java.util.Comparator;
70  import java.util.Dictionary;
71  import java.util.HashMap;
72  import java.util.HashSet;
73  import java.util.List;
74  import java.util.Map;
75  import java.util.Set;
76  import java.util.concurrent.Executors;
77  import java.util.concurrent.ScheduledFuture;
78  import java.util.concurrent.ScheduledThreadPoolExecutor;
79  import java.util.concurrent.TimeUnit;
80  import java.util.function.Function;
81  import java.util.stream.Collectors;
82  
83  import javax.persistence.EntityManager;
84  import javax.persistence.EntityManagerFactory;
85  
86  /**
87   * This dispatcher implementation will check for jobs in the QUEUED {@link Job.Status}. If
88   * new jobs are found, the dispatcher will attempt to dispatch each job to the least loaded service.
89   */
90  @Component(
91      property = {
92          "service.description=Job Dispatcher"
93      },
94      immediate = true,
95      service = { JobDispatcher.class }
96  )
97  public class JobDispatcher {
98  
99    /** JPA persistence unit name */
100   public static final String PERSISTENCE_UNIT = "org.opencastproject.common";
101 
102   /** Configuration key for the dispatch interval, in seconds */
103   protected static final String OPT_DISPATCHINTERVAL = "dispatch.interval";
104 
105   /** Minimum delay between job dispatching attempts, in seconds */
106   static final float MIN_DISPATCH_INTERVAL = 1.0F;
107 
108   /** Default delay between job dispatching attempts, in seconds */
109   static final float DEFAULT_DISPATCH_INTERVAL = 0.0F;
110 
111   /** Multiplicative factor to transform dispatch interval captured in seconds to milliseconds */
112   static final long DISPATCH_INTERVAL_MS_FACTOR = 1000;
113 
114   /** Default delay between checking if hosts are still alive in seconds * */
115   static final long DEFAULT_HEART_BEAT = 60;
116 
117   private static final Logger logger = LoggerFactory.getLogger(JobDispatcher.class);
118 
119   private ServiceRegistryJpaImpl serviceRegistry;
120 
121   private OrganizationDirectoryService organizationDirectoryService;
122   private UserDirectoryService userDirectoryService;
123   private SecurityService securityService;
124   private TrustedHttpClient client;
125 
126   /** The thread pool to use for dispatching. */
127   protected ScheduledThreadPoolExecutor scheduledExecutor = null;
128 
129   /** The factory used to generate the entity manager */
130   private EntityManagerFactory emf = null;
131 
132   protected DBSessionFactory dbSessionFactory;
133 
134   protected DBSession db;
135 
136   private ScheduledFuture jdfuture = null;
137 
138   /**
139    * A list with job types that cannot be dispatched in each interation
140    */
141   private List<String> undispatchableJobTypes = null;
142 
143   /** The dispatcher priority list */
144   protected final Map<Long, String> dispatchPriorityList = new HashMap<>();
145 
146   /** OSGi DI */
147   @Reference(target = "(osgi.unit.name=org.opencastproject.common)")
148   void setEntityManagerFactory(EntityManagerFactory emf) {
149     this.emf = emf;
150   }
151 
152   /** OSGi DI */
153   @Reference
154   public void setDBSessionFactory(DBSessionFactory dbSessionFactory) {
155     this.dbSessionFactory = dbSessionFactory;
156   }
157 
158   /** OSGi DI */
159   @Reference()
160   void setServiceRegistry(ServiceRegistryJpaImpl sr) {
161     this.serviceRegistry = sr;
162   }
163 
164   @Reference()
165   void setOrganizationDirectoryService(OrganizationDirectoryService organizationDirectoryService) {
166     this.organizationDirectoryService = organizationDirectoryService;
167   }
168 
169   @Reference
170   void setUserDirectoryService(UserDirectoryService svc) {
171     this.userDirectoryService = svc;
172   }
173 
174   @Reference
175   void setSecurityService(SecurityService sec) {
176     this.securityService = sec;
177   }
178 
179   @Reference
180   void setTrustedHttpClient(TrustedHttpClient client) {
181     this.client = client;
182   }
183 
184   @Activate
185   public void activate(ComponentContext cc) throws ConfigurationException  {
186     logger.info("Activate job dispatcher");
187     db = dbSessionFactory.createSession(emf);
188     scheduledExecutor = (ScheduledThreadPoolExecutor) Executors.newScheduledThreadPool(1);
189     scheduledExecutor.setRemoveOnCancelPolicy(true);
190     logger.info("Activated");
191     updated(cc.getProperties());
192   }
193 
194 
195   @Modified
196   public void modified(ComponentContext cc) throws ConfigurationException {
197     logger.debug("Modified in job dispatcher");
198     updated(cc.getProperties());
199   }
200 
201   /**
202    * {@inheritDoc}
203    *
204    * @see org.osgi.service.cm.ManagedService#updated(java.util.Dictionary)
205    */
206   @SuppressWarnings("rawtypes")
207   public void updated(Dictionary properties) {
208 
209     logger.info("Updating job dispatcher properties");
210 
211     float dispatchInterval = DEFAULT_DISPATCH_INTERVAL;
212     String dispatchIntervalString = StringUtils.trimToNull((String) properties.get(OPT_DISPATCHINTERVAL));
213     if (StringUtils.isNotBlank(dispatchIntervalString)) {
214       try {
215         dispatchInterval = Float.parseFloat(dispatchIntervalString);
216       } catch (Exception e) {
217         logger.warn("Dispatch interval '{}' is malformed, setting to {}", dispatchIntervalString,
218             MIN_DISPATCH_INTERVAL);
219         dispatchInterval = MIN_DISPATCH_INTERVAL;
220       }
221       if (dispatchInterval == 0) {
222         logger.info("Dispatching disabled");
223       } else if (dispatchInterval < MIN_DISPATCH_INTERVAL) {
224         logger.warn("Dispatch interval {} seconds is too low, adjusting to {}", dispatchInterval,
225             MIN_DISPATCH_INTERVAL);
226         dispatchInterval = MIN_DISPATCH_INTERVAL;
227       } else {
228         logger.info("Dispatch interval set to {} seconds", dispatchInterval);
229       }
230     }
231 
232     // Stop the current dispatch thread so we can configure a new one
233     if (jdfuture != null) {
234       jdfuture.cancel(true);
235     }
236 
237     // Schedule the job dispatching.
238     if (dispatchInterval > 0) {
239       long dispatchIntervalMs = Math.round(dispatchInterval * DISPATCH_INTERVAL_MS_FACTOR);
240       logger.info("Job dispatching is enabled");
241       logger.debug("Starting job dispatching at a custom interval of {}s", dispatchInterval);
242       jdfuture = scheduledExecutor.scheduleWithFixedDelay(getJobDispatcherRunnable(), dispatchIntervalMs,
243           dispatchIntervalMs, TimeUnit.MILLISECONDS);
244       // Schedule heartbeat for dispatching nodes
245       serviceRegistry.startHeartbeat(DEFAULT_HEART_BEAT);
246     } else {
247       logger.info("Job dispatching is disabled");
248     }
249   }
250 
251   @Deactivate
252   public void deactivate() {
253     logger.info("Deactivate Job Dispatcher");
254 
255     // Wait for runnable to stop before stopping
256     if (scheduledExecutor != null) {
257       try {
258         scheduledExecutor.shutdownNow();
259         if (!scheduledExecutor.isShutdown()) {
260           logger.info("Waiting for Job Dispatcher to terminate");
261           scheduledExecutor.awaitTermination(10, TimeUnit.SECONDS);
262         }
263       } catch (InterruptedException e) {
264         logger.error("Error shutting down the Job Dispatcher", e);
265       }
266     }
267   }
268 
269   Runnable getJobDispatcherRunnable() {
270     return new JobDispatcherRunner();
271   }
272 
273   public class JobDispatcherRunner implements Runnable {
274 
275     /**
276      * {@inheritDoc}
277      *
278      * @see Thread#run()
279      */
280     @Override
281     public void run() {
282       logger.debug("Starting job dispatch");
283 
284       undispatchableJobTypes = new ArrayList<>();
285       try {
286         if (!dispatchPriorityList.isEmpty()) {
287           logger.trace("Checking for outdated jobs in dispatchPriorityList's '{}' jobs", dispatchPriorityList.size());
288           // Remove outdated jobs from priority list
289           List<Long> jobIds = db.exec(getDispatchableJobsWithIdFilterQuery(dispatchPriorityList.keySet()));
290           for (Long jobId : new HashSet<>(dispatchPriorityList.keySet())) {
291             if (!jobIds.contains(jobId)) {
292               logger.debug("Removing outdated dispatchPriorityList job '{}'", jobId);
293               dispatchPriorityList.remove(jobId);
294             }
295           }
296         }
297 
298         int jobsOffset = 0;
299         List<JpaJob> dispatchableJobs;
300         List<JpaJob> workflowJobs = new ArrayList<>();
301         boolean jobsFound;
302         do {
303           // dispatch all dispatchable jobs with status restarted
304           dispatchableJobs = db.exec(serviceRegistry.getDispatchableJobsWithStatusQuery(
305               jobsOffset, ServiceRegistryJpaImpl.DEFAULT_DISPATCH_JOBS_LIMIT, Job.Status.RESTART
306           ));
307           jobsOffset += ServiceRegistryJpaImpl.DEFAULT_DISPATCH_JOBS_LIMIT;
308           jobsFound = !dispatchableJobs.isEmpty();
309 
310           // skip all jobs of type workflow, we will handle them next
311           for (JpaJob job : dispatchableJobs) {
312             if (ServiceRegistryJpaImpl.TYPE_WORKFLOW.equals(job.getJobType())) {
313               workflowJobs.add(job);
314             }
315           }
316           if (dispatchableJobs.removeAll(workflowJobs) && dispatchableJobs.isEmpty()) {
317             continue;
318           }
319 
320           dispatchDispatchableJobs(dispatchableJobs);
321         } while (jobsFound);
322 
323         jobsOffset = 0;
324         do {
325           // dispatch all dispatchable jobs with status queued
326           dispatchableJobs = db.exec(serviceRegistry.getDispatchableJobsWithStatusQuery(
327               jobsOffset, ServiceRegistryJpaImpl.DEFAULT_DISPATCH_JOBS_LIMIT, Job.Status.QUEUED
328           ));
329           jobsOffset += ServiceRegistryJpaImpl.DEFAULT_DISPATCH_JOBS_LIMIT;
330           jobsFound = !dispatchableJobs.isEmpty();
331 
332           // skip all jobs of type workflow, we will handle them next
333           for (JpaJob job : dispatchableJobs) {
334             if (ServiceRegistryJpaImpl.TYPE_WORKFLOW.equals(job.getJobType())) {
335               workflowJobs.add(job);
336             }
337           }
338           if (dispatchableJobs.removeAll(workflowJobs) && dispatchableJobs.isEmpty()) {
339             continue;
340           }
341 
342           dispatchDispatchableJobs(dispatchableJobs);
343         } while (jobsFound);
344 
345         if (!workflowJobs.isEmpty()) {
346           dispatchDispatchableJobs(workflowJobs);
347         }
348       } catch (Throwable t) {
349         logger.warn("Error dispatching jobs", t);
350       } finally {
351         undispatchableJobTypes = null;
352       }
353 
354       logger.debug("Finished job dispatch");
355     }
356 
357     /**
358      * Dispatch the given jobs.
359      *
360      * @param jobsToDispatch list with dispatchable jobs to dispatch
361      */
362     private void dispatchDispatchableJobs(List<JpaJob> jobsToDispatch) {
363       // Get the current system load
364       SystemLoad systemLoad = db.exec(serviceRegistry.getHostLoadsQuery());
365 
366       for (JpaJob job : jobsToDispatch) {
367         // Remember the job type
368         String jobType = job.getJobType();
369 
370         // Skip jobs that we already know can't be dispatched except of jobs in the priority list
371         String jobSignature = jobType + '@' + job.getOperation();
372         if (undispatchableJobTypes.contains(jobSignature) && !dispatchPriorityList.containsKey(job.getId())) {
373           logger.trace("Skipping dispatching of {} with type '{}' for this round of dispatching", job, jobType);
374           continue;
375         }
376 
377         // Set the job's user and organization prior to dispatching
378         String creator = job.getCreator();
379         String creatorOrganization = job.getOrganization();
380 
381         // Try to load the organization.
382         Organization organization;
383         try {
384           organization = organizationDirectoryService.getOrganization(creatorOrganization);
385           securityService.setOrganization(organization);
386         } catch (NotFoundException e) {
387           logger.debug("Skipping dispatching of job for non-existing organization '{}'", creatorOrganization);
388           continue;
389         }
390 
391         // Try to load the user
392         User user = userDirectoryService.loadUser(creator);
393         if (user == null) {
394           logger.warn("Unable to dispatch {}: creator '{}' is not available", job, creator);
395           continue;
396         }
397         securityService.setUser(user);
398 
399         // Start dispatching
400         try {
401           List<ServiceRegistration> services = db.exec(serviceRegistry.getServiceRegistrationsQuery());
402           List<HostRegistration> hosts = db.exec(serviceRegistry.getHostRegistrationsQuery()).stream()
403                                            .filter(host -> !dispatchPriorityList.containsValue(host.getBaseUrl())
404                                                || host.getBaseUrl().equals(dispatchPriorityList.get(job.getId())))
405                                            .collect(Collectors.toList());
406           List<ServiceRegistration> candidateServices;
407 
408           // Depending on whether this running job is trying to reach out to other services or whether this is an
409           // attempt to execute the next operation in a workflow, choose either from a limited or from the full list
410           // of services
411           Job parentJob = null;
412           try {
413             if (job.getParentJob() != null) {
414               parentJob = serviceRegistry.getJob(job.getParentJob().getId());
415             }
416           } catch (NotFoundException e) {
417             // That's ok
418           }
419 
420           // When a job A starts a series of child jobs, then those child jobs should only be dispatched at the
421           // same time if there is processing capacity available.
422           boolean parentHasRunningChildren = false;
423           if (parentJob != null) {
424             for (Job child : serviceRegistry.getChildJobs(parentJob.getId())) {
425               if (Job.Status.RUNNING.equals(child.getStatus())) {
426                 parentHasRunningChildren = true;
427                 break;
428               }
429             }
430           }
431 
432           // If this is a root job (a new workflow or a new workflow operation), then only dispatch if there is
433           // capacity, i. e. the workflow service is ok dispatching the next workflow or the next workflow operation.
434           if (parentJob == null || ServiceRegistryJpaImpl.TYPE_WORKFLOW.equals(jobType) || parentHasRunningChildren) {
435             logger.trace("Using available capacity only for dispatching of {} to a service of type '{}'", job, jobType);
436             candidateServices = serviceRegistry.getServiceRegistrationsWithCapacity(jobType, services, hosts,
437                 systemLoad);
438           } else {
439             logger.trace("Using full list of services for dispatching of {} to a service of type '{}'", job, jobType);
440             candidateServices = serviceRegistry.getServiceRegistrationsByLoad(jobType, services, hosts, systemLoad);
441           }
442 
443           // Try to dispatch the job
444           String hostAcceptingJob;
445           try {
446             hostAcceptingJob = dispatchJob(job, candidateServices);
447             try {
448               systemLoad.updateNodeLoad(hostAcceptingJob, job.getJobLoad());
449             } catch (NotFoundException e) {
450               logger.info("Host {} not found in load list, cannot dispatch {} to it", hostAcceptingJob, job);
451             }
452 
453             dispatchPriorityList.remove(job.getId());
454           } catch (ServiceUnavailableException e) {
455             logger.debug("Jobs of type {} currently cannot be dispatched", job.getOperation());
456             // Don't mark workflow jobs as undispatchable to not impact worklfow operations
457             if (!ServiceRegistryJpaImpl.TYPE_WORKFLOW.equals(jobType)) {
458               undispatchableJobTypes.add(jobSignature);
459             }
460             continue;
461           } catch (UndispatchableJobException e) {
462             logger.debug("{} currently cannot be dispatched", job);
463             continue;
464           }
465 
466           logger.debug("{} dispatched to {}", job, hostAcceptingJob);
467         } catch (ServiceRegistryException e) {
468           Throwable cause = (e.getCause() != null) ? e.getCause() : e;
469           logger.error("Error dispatching {}", job, cause);
470         } finally {
471           securityService.setUser(null);
472           securityService.setOrganization(null);
473         }
474       }
475     }
476 
477     /**
478      * Dispatches the job to the least loaded service that will accept the job, or throws a
479      * <code>ServiceUnavailableException</code> if there is no such service.
480      *
481      * @param job      the job to dispatch
482      * @param services a list of service registrations
483      * @return the host that accepted the dispatched job, or <code>null</code> if no services took the job.
484      * @throws ServiceRegistryException    if the service registrations are unavailable
485      * @throws ServiceUnavailableException
486      *    if no service is available or if all available services refuse to take on more work
487      * @throws UndispatchableJobException  if the current job cannot be processed
488      */
489     private String dispatchJob(JpaJob job, List<ServiceRegistration> services)
490         throws ServiceRegistryException, ServiceUnavailableException, UndispatchableJobException {
491       if (services.size() == 0) {
492         logger.debug("No service is currently available to handle jobs of type '" + job.getJobType() + "'");
493         throw new ServiceUnavailableException("No service of type " + job.getJobType() + " available");
494       }
495 
496       // Try the service registrations, after the first one finished, we quit;
497       job.setStatus(Job.Status.DISPATCHING);
498 
499       boolean triedDispatching = false;
500       boolean jobLoadExceedsMaximumLoads = false;
501 
502       final Float highestMaxLoad = services.stream()
503                                            .map(s -> ((ServiceRegistrationJpaImpl) s).getHostRegistration())
504                                            .map(HostRegistration::getMaxLoad)
505                                            .max(Comparator.naturalOrder())
506                                            .get();
507 
508       if (job.getJobLoad() > highestMaxLoad) {
509         // None of the available hosts is able to accept the job because the largest max load value is less than this
510         // job's load value
511         jobLoadExceedsMaximumLoads = true;
512       }
513 
514       for (ServiceRegistration registration : services) {
515         job.setProcessorServiceRegistration((ServiceRegistrationJpaImpl) registration);
516 
517         // Skip registration of host with less max load than highest available max load
518         // Note: This service registration may or may not live on a node which is set to accept jobs exceeding
519         // its max load
520         if (jobLoadExceedsMaximumLoads
521             && job.getProcessorServiceRegistration().getHostRegistration().getMaxLoad() != highestMaxLoad) {
522           continue;
523         }
524 
525         try {
526           job = serviceRegistry.updateInternal(job); // will open a tx
527         } catch (Exception e) {
528           // In theory, we should catch javax.persistence.OptimisticLockException. Unfortunately, eclipselink throws
529           // org.eclipse.persistence.exceptions.OptimisticLockException. In order to avoid importing the implementation
530           // specific APIs, we just catch Exception.
531           logger.debug("Unable to dispatch {}.  This is likely caused by another service registry dispatching the job",
532               job);
533           throw new UndispatchableJobException(job + " is already being dispatched");
534         }
535 
536         triedDispatching = true;
537 
538         String serviceUrl = UrlSupport.concat(registration.getHost(), registration.getPath(), "dispatch");
539         HttpPost post = new HttpPost(serviceUrl);
540 
541         // Add current organization and user so they can be used during execution at the remote end
542         post.addHeader(ORGANIZATION_HEADER, securityService.getOrganization().getId());
543         post.addHeader(USER_HEADER, securityService.getUser().getUsername());
544 
545         List<BasicNameValuePair> params = new ArrayList<>();
546         params.add(new BasicNameValuePair("id", Long.toString(job.getId())));
547         params.add(new BasicNameValuePair("operation", job.getOperation()));
548         post.setEntity(new UrlEncodedFormEntity(params, UTF_8));
549 
550         // Post the request
551         HttpResponse response = null;
552         int responseStatusCode;
553         try {
554           logger.debug("Trying to dispatch {} type '{}' load {} to {}", job, job.getJobType(), job.getJobLoad(),
555               registration.getHost());
556           if (!ServiceRegistryJpaImpl.START_WORKFLOW.equals(job.getOperation())) {
557             serviceRegistry.setCurrentJob(job.toJob());
558           }
559           response = client.execute(post);
560           responseStatusCode = response.getStatusLine().getStatusCode();
561           if (responseStatusCode == HttpStatus.SC_NO_CONTENT) {
562             return registration.getHost();
563           } else if (responseStatusCode == HttpStatus.SC_SERVICE_UNAVAILABLE) {
564             logger.debug("Service {} is currently refusing to accept jobs of type {}", registration,
565                 job.getOperation());
566             continue;
567           } else if (responseStatusCode == HttpStatus.SC_PRECONDITION_FAILED) {
568             job.setStatus(Job.Status.FAILED);
569             job = serviceRegistry.updateJob(job); // will open a tx
570             logger.debug("Service {} refused to accept {}", registration, job);
571             throw new UndispatchableJobException(IOUtils.toString(response.getEntity().getContent()));
572           } else if (responseStatusCode == HttpStatus.SC_METHOD_NOT_ALLOWED) {
573             logger.debug("Service {} is not yet reachable", registration);
574             continue;
575           } else {
576             logger.warn("Service {} failed ({}) accepting {}", registration, responseStatusCode, job);
577             continue;
578           }
579         } catch (UndispatchableJobException e) {
580           throw e;
581         } catch (TrustedHttpClientException e) {
582           // Will try another node. If no other node, it will be re-queued
583           logger.warn("Unable to dispatch {}", job, e);
584           continue;
585         } catch (Exception e) {
586           logger.warn("Unable to dispatch {}", job, e);
587         } finally {
588           try {
589             client.close(response);
590           } catch (IOException e) {
591             // ignore
592           }
593           serviceRegistry.setCurrentJob(null);
594         }
595       }
596 
597       // We've tried dispatching to every online service that can handle this type of job, with no luck.
598       if (triedDispatching) {
599         // Workflow type jobs are not set to priority list, because they handle accepting jobs not based on the job load
600         // If the system don't accepts jobs whose load exceeds the host's max load we can't make use of the priority
601         // list
602         if (serviceRegistry.acceptJobLoadsExeedingMaxLoad && !dispatchPriorityList.containsKey(job.getId())
603             && !ServiceRegistryJpaImpl.TYPE_WORKFLOW.equals(job.getJobType())
604             && job.getProcessorServiceRegistration() != null) {
605           String host = job.getProcessorServiceRegistration().getHost();
606           logger.debug("About to add {} to dispatchPriorityList with processor host {}", job, host);
607           dispatchPriorityList.put(job.getId(), host);
608         }
609 
610         try {
611           job.setStatus(Job.Status.QUEUED);
612           job.setProcessorServiceRegistration(null);
613           job = serviceRegistry.updateJob(job); // will open a tx
614         } catch (Exception e) {
615           logger.error("Unable to put {} back into queue", job, e);
616         }
617       }
618 
619       logger.debug("Unable to dispatch {}, no service is currently ready to accept the job", job);
620       throw new UndispatchableJobException(job + " is currently undispatchable");
621     }
622 
623     /**
624      * Return dispatchable job ids, where the job status is RESTART or QUEUED and the job id is listed in the given set.
625      *
626      * @param jobIds set with job id's interested in
627      * @return list with dispatchable job id's from the given set, with job status RESTART or QUEUED
628      */
629     protected Function<EntityManager, List<Long>> getDispatchableJobsWithIdFilterQuery(Set<Long> jobIds) {
630       return em -> {
631         if (jobIds == null || jobIds.isEmpty()) {
632           return Collections.emptyList();
633         }
634 
635         return namedQuery.findAll(
636             "Job.dispatchable.status.idfilter",
637             Long.class,
638             Pair.of("jobids", dispatchPriorityList.keySet()),
639             Pair.of("statuses", List.of(
640                 Job.Status.RESTART.ordinal(),
641                 Job.Status.QUEUED.ordinal()
642             ))
643         ).apply(em);
644       };
645     }
646 
647     private final Function<ServiceRegistration, HostRegistration> toHostRegistration =
648         new Function<ServiceRegistration, HostRegistration>() {
649       @Override
650       public HostRegistration apply(ServiceRegistration s) {
651         return ((ServiceRegistrationJpaImpl) s).getHostRegistration();
652       }
653     };
654 
655     private final Function<HostRegistration, Float> toMaxLoad = new Function<HostRegistration, Float>() {
656       @Override
657       public Float apply(HostRegistration h) {
658         return h.getMaxLoad();
659       }
660     };
661 
662     private final Comparator<Float> sortFloatValuesDesc = new Comparator<Float>() {
663       @Override
664       public int compare(Float o1, Float o2) {
665         return o2.compareTo(o1);
666       }
667     };
668   }
669 }