View Javadoc
1   /*
2    * Licensed to The Apereo Foundation under one or more contributor license
3    * agreements. See the NOTICE file distributed with this work for additional
4    * information regarding copyright ownership.
5    *
6    *
7    * The Apereo Foundation licenses this file to you under the Educational
8    * Community License, Version 2.0 (the "License"); you may not use this file
9    * except in compliance with the License. You may obtain a copy of the License
10   * at:
11   *
12   *   http://opensource.org/licenses/ecl2.txt
13   *
14   * Unless required by applicable law or agreed to in writing, software
15   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
16   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the
17   * License for the specific language governing permissions and limitations under
18   * the License.
19   *
20   */
21  
22  package org.opencastproject.serviceregistry.impl;
23  
24  import static java.nio.charset.StandardCharsets.UTF_8;
25  import static org.opencastproject.db.Queries.namedQuery;
26  import static org.opencastproject.security.api.SecurityConstants.ORGANIZATION_HEADER;
27  import static org.opencastproject.security.api.SecurityConstants.USER_HEADER;
28  
29  import org.opencastproject.db.DBSession;
30  import org.opencastproject.db.DBSessionFactory;
31  import org.opencastproject.job.api.Job;
32  import org.opencastproject.job.jpa.JpaJob;
33  import org.opencastproject.security.api.Organization;
34  import org.opencastproject.security.api.OrganizationDirectoryService;
35  import org.opencastproject.security.api.SecurityService;
36  import org.opencastproject.security.api.TrustedHttpClient;
37  import org.opencastproject.security.api.TrustedHttpClientException;
38  import org.opencastproject.security.api.User;
39  import org.opencastproject.security.api.UserDirectoryService;
40  import org.opencastproject.serviceregistry.api.HostRegistration;
41  import org.opencastproject.serviceregistry.api.ServiceRegistration;
42  import org.opencastproject.serviceregistry.api.ServiceRegistryException;
43  import org.opencastproject.serviceregistry.api.SystemLoad;
44  import org.opencastproject.serviceregistry.impl.jpa.ServiceRegistrationJpaImpl;
45  import org.opencastproject.util.NotFoundException;
46  import org.opencastproject.util.UrlSupport;
47  
48  import org.apache.commons.io.IOUtils;
49  import org.apache.commons.lang3.StringUtils;
50  import org.apache.commons.lang3.tuple.Pair;
51  import org.apache.http.HttpResponse;
52  import org.apache.http.HttpStatus;
53  import org.apache.http.client.entity.UrlEncodedFormEntity;
54  import org.apache.http.client.methods.HttpPost;
55  import org.apache.http.message.BasicNameValuePair;
56  import org.osgi.service.cm.ConfigurationException;
57  import org.osgi.service.component.ComponentContext;
58  import org.osgi.service.component.annotations.Activate;
59  import org.osgi.service.component.annotations.Component;
60  import org.osgi.service.component.annotations.Modified;
61  import org.osgi.service.component.annotations.Reference;
62  import org.slf4j.Logger;
63  import org.slf4j.LoggerFactory;
64  
65  import java.io.IOException;
66  import java.util.ArrayList;
67  import java.util.Collections;
68  import java.util.Comparator;
69  import java.util.Dictionary;
70  import java.util.HashMap;
71  import java.util.HashSet;
72  import java.util.List;
73  import java.util.Map;
74  import java.util.Set;
75  import java.util.concurrent.Executors;
76  import java.util.concurrent.ScheduledFuture;
77  import java.util.concurrent.ScheduledThreadPoolExecutor;
78  import java.util.concurrent.TimeUnit;
79  import java.util.function.Function;
80  import java.util.stream.Collectors;
81  
82  import javax.persistence.EntityManager;
83  import javax.persistence.EntityManagerFactory;
84  
85  /**
86   * This dispatcher implementation will check for jobs in the QUEUED {@link Job.Status}. If
87   * new jobs are found, the dispatcher will attempt to dispatch each job to the least loaded service.
88   */
89  @Component(
90      property = {
91          "service.description=Job Dispatcher"
92      },
93      immediate = true,
94      service = { JobDispatcher.class }
95  )
96  public class JobDispatcher {
97  
98    /** JPA persistence unit name */
99    public static final String PERSISTENCE_UNIT = "org.opencastproject.common";
100 
101   /** Configuration key for the dispatch interval, in seconds */
102   protected static final String OPT_DISPATCHINTERVAL = "dispatch.interval";
103 
104   /** Minimum delay between job dispatching attempts, in seconds */
105   static final float MIN_DISPATCH_INTERVAL = 1.0F;
106 
107   /** Default delay between job dispatching attempts, in seconds */
108   static final float DEFAULT_DISPATCH_INTERVAL = 0.0F;
109 
110   /** Multiplicative factor to transform dispatch interval captured in seconds to milliseconds */
111   static final long DISPATCH_INTERVAL_MS_FACTOR = 1000;
112 
113   private static final Logger logger = LoggerFactory.getLogger(JobDispatcher.class);
114 
115   private ServiceRegistryJpaImpl serviceRegistry;
116 
117   private OrganizationDirectoryService organizationDirectoryService;
118   private UserDirectoryService userDirectoryService;
119   private SecurityService securityService;
120   private TrustedHttpClient client;
121 
122   /** The thread pool to use for dispatching. */
123   protected ScheduledThreadPoolExecutor scheduledExecutor = null;
124 
125   /** The factory used to generate the entity manager */
126   private EntityManagerFactory emf = null;
127 
128   protected DBSessionFactory dbSessionFactory;
129 
130   protected DBSession db;
131 
132   private ScheduledFuture jdfuture = null;
133 
134   /**
135    * A list with job types that cannot be dispatched in each interation
136    */
137   private List<String> undispatchableJobTypes = null;
138 
139   /** The dispatcher priority list */
140   protected final Map<Long, String> dispatchPriorityList = new HashMap<>();
141 
142   /** OSGi DI */
143   @Reference(target = "(osgi.unit.name=org.opencastproject.common)")
144   void setEntityManagerFactory(EntityManagerFactory emf) {
145     this.emf = emf;
146   }
147 
148   /** OSGi DI */
149   @Reference
150   public void setDBSessionFactory(DBSessionFactory dbSessionFactory) {
151     this.dbSessionFactory = dbSessionFactory;
152   }
153 
154   /** OSGi DI */
155   @Reference()
156   void setServiceRegistry(ServiceRegistryJpaImpl sr) {
157     this.serviceRegistry = sr;
158   }
159 
160   @Reference()
161   void setOrganizationDirectoryService(OrganizationDirectoryService organizationDirectoryService) {
162     this.organizationDirectoryService = organizationDirectoryService;
163   }
164 
165   @Reference
166   void setUserDirectoryService(UserDirectoryService svc) {
167     this.userDirectoryService = svc;
168   }
169 
170   @Reference
171   void setSecurityService(SecurityService sec) {
172     this.securityService = sec;
173   }
174 
175   @Reference
176   void setTrustedHttpClient(TrustedHttpClient client) {
177     this.client = client;
178   }
179 
180   @Activate
181   public void activate(ComponentContext cc) throws ConfigurationException  {
182     logger.info("Activate job dispatcher");
183     db = dbSessionFactory.createSession(emf);
184     scheduledExecutor = (ScheduledThreadPoolExecutor) Executors.newScheduledThreadPool(1);
185     scheduledExecutor.setRemoveOnCancelPolicy(true);
186     logger.info("Activated");
187     updated(cc.getProperties());
188   }
189 
190 
191   @Modified
192   public void modified(ComponentContext cc) throws ConfigurationException {
193     logger.debug("Modified in job dispatcher");
194     updated(cc.getProperties());
195   }
196 
197   /**
198    * {@inheritDoc}
199    *
200    * @see org.osgi.service.cm.ManagedService#updated(java.util.Dictionary)
201    */
202   @SuppressWarnings("rawtypes")
203   public void updated(Dictionary properties) {
204 
205     logger.info("Updating job dispatcher properties");
206 
207     float dispatchInterval = DEFAULT_DISPATCH_INTERVAL;
208     String dispatchIntervalString = StringUtils.trimToNull((String) properties.get(OPT_DISPATCHINTERVAL));
209     if (StringUtils.isNotBlank(dispatchIntervalString)) {
210       try {
211         dispatchInterval = Float.parseFloat(dispatchIntervalString);
212       } catch (Exception e) {
213         logger.warn("Dispatch interval '{}' is malformed, setting to {}", dispatchIntervalString, MIN_DISPATCH_INTERVAL);
214         dispatchInterval = MIN_DISPATCH_INTERVAL;
215       }
216       if (dispatchInterval == 0) {
217         logger.info("Dispatching disabled");
218       } else if (dispatchInterval < MIN_DISPATCH_INTERVAL) {
219         logger.warn("Dispatch interval {} seconds is too low, adjusting to {}", dispatchInterval, MIN_DISPATCH_INTERVAL);
220         dispatchInterval = MIN_DISPATCH_INTERVAL;
221       } else {
222         logger.info("Dispatch interval set to {} seconds", dispatchInterval);
223       }
224     }
225 
226     // Stop the current dispatch thread so we can configure a new one
227     if (jdfuture != null) {
228       jdfuture.cancel(true);
229     }
230 
231     // Schedule the job dispatching.
232     if (dispatchInterval > 0) {
233       long dispatchIntervalMs = Math.round(dispatchInterval * DISPATCH_INTERVAL_MS_FACTOR);
234       logger.info("Job dispatching is enabled");
235       logger.debug("Starting job dispatching at a custom interval of {}s", dispatchInterval);
236       jdfuture = scheduledExecutor.scheduleWithFixedDelay(getJobDispatcherRunnable(), dispatchIntervalMs, dispatchIntervalMs,
237           TimeUnit.MILLISECONDS);
238     } else {
239       logger.info("Job dispatching is disabled");
240     }
241   }
242 
243   Runnable getJobDispatcherRunnable() {
244     return new JobDispatcherRunner();
245   }
246 
247   public class JobDispatcherRunner implements Runnable {
248 
249     /**
250      * {@inheritDoc}
251      *
252      * @see Thread#run()
253      */
254     @Override
255     public void run() {
256       logger.debug("Starting job dispatch");
257 
258       undispatchableJobTypes = new ArrayList<>();
259       try {
260         //GDLGDL: move collectJobStats to the JD config, then this is reasonable
261         // FIXME: the stats are not currently used and the queries are very expensive in database time.
262         if (serviceRegistry.collectJobstats) {
263           serviceRegistry.updateStatisticsJobData();
264         }
265 
266         if (!dispatchPriorityList.isEmpty()) {
267           logger.trace("Checking for outdated jobs in dispatchPriorityList's '{}' jobs", dispatchPriorityList.size());
268           // Remove outdated jobs from priority list
269           List<Long> jobIds = db.exec(getDispatchableJobsWithIdFilterQuery(dispatchPriorityList.keySet()));
270           for (Long jobId : new HashSet<>(dispatchPriorityList.keySet())) {
271             if (!jobIds.contains(jobId)) {
272               logger.debug("Removing outdated dispatchPriorityList job '{}'", jobId);
273               dispatchPriorityList.remove(jobId);
274             }
275           }
276         }
277 
278         int jobsOffset = 0;
279         List<JpaJob> dispatchableJobs;
280         List<JpaJob> workflowJobs = new ArrayList<>();
281         boolean jobsFound;
282         do {
283           // dispatch all dispatchable jobs with status restarted
284           dispatchableJobs = db.exec(serviceRegistry.getDispatchableJobsWithStatusQuery(
285               jobsOffset, ServiceRegistryJpaImpl.DEFAULT_DISPATCH_JOBS_LIMIT, Job.Status.RESTART
286           ));
287           jobsOffset += ServiceRegistryJpaImpl.DEFAULT_DISPATCH_JOBS_LIMIT;
288           jobsFound = !dispatchableJobs.isEmpty();
289 
290           // skip all jobs of type workflow, we will handle them next
291           for (JpaJob job : dispatchableJobs) {
292             if (ServiceRegistryJpaImpl.TYPE_WORKFLOW.equals(job.getJobType())) {
293               workflowJobs.add(job);
294             }
295           }
296           if (dispatchableJobs.removeAll(workflowJobs) && dispatchableJobs.isEmpty()) {
297             continue;
298           }
299 
300           dispatchDispatchableJobs(dispatchableJobs);
301         } while (jobsFound);
302 
303         jobsOffset = 0;
304         do {
305           // dispatch all dispatchable jobs with status queued
306           dispatchableJobs = db.exec(serviceRegistry.getDispatchableJobsWithStatusQuery(
307               jobsOffset, ServiceRegistryJpaImpl.DEFAULT_DISPATCH_JOBS_LIMIT, Job.Status.QUEUED
308           ));
309           jobsOffset += ServiceRegistryJpaImpl.DEFAULT_DISPATCH_JOBS_LIMIT;
310           jobsFound = !dispatchableJobs.isEmpty();
311 
312           // skip all jobs of type workflow, we will handle them next
313           for (JpaJob job : dispatchableJobs) {
314             if (ServiceRegistryJpaImpl.TYPE_WORKFLOW.equals(job.getJobType())) {
315               workflowJobs.add(job);
316             }
317           }
318           if (dispatchableJobs.removeAll(workflowJobs) && dispatchableJobs.isEmpty()) {
319             continue;
320           }
321 
322           dispatchDispatchableJobs(dispatchableJobs);
323         } while (jobsFound);
324 
325         if (!workflowJobs.isEmpty()) {
326           dispatchDispatchableJobs(workflowJobs);
327         }
328       } catch (Throwable t) {
329         logger.warn("Error dispatching jobs", t);
330       } finally {
331         undispatchableJobTypes = null;
332       }
333 
334       logger.debug("Finished job dispatch");
335     }
336 
337     /**
338      * Dispatch the given jobs.
339      *
340      * @param jobsToDispatch list with dispatchable jobs to dispatch
341      */
342     private void dispatchDispatchableJobs(List<JpaJob> jobsToDispatch) {
343       // Get the current system load
344       SystemLoad systemLoad = db.exec(serviceRegistry.getHostLoadsQuery());
345 
346       for (JpaJob job : jobsToDispatch) {
347         // Remember the job type
348         String jobType = job.getJobType();
349 
350         // Skip jobs that we already know can't be dispatched except of jobs in the priority list
351         String jobSignature = jobType + '@' + job.getOperation();
352         if (undispatchableJobTypes.contains(jobSignature) && !dispatchPriorityList.containsKey(job.getId())) {
353           logger.trace("Skipping dispatching of {} with type '{}' for this round of dispatching", job, jobType);
354           continue;
355         }
356 
357         // Set the job's user and organization prior to dispatching
358         String creator = job.getCreator();
359         String creatorOrganization = job.getOrganization();
360 
361         // Try to load the organization.
362         Organization organization;
363         try {
364           organization = organizationDirectoryService.getOrganization(creatorOrganization);
365           securityService.setOrganization(organization);
366         } catch (NotFoundException e) {
367           logger.debug("Skipping dispatching of job for non-existing organization '{}'", creatorOrganization);
368           continue;
369         }
370 
371         // Try to load the user
372         User user = userDirectoryService.loadUser(creator);
373         if (user == null) {
374           logger.warn("Unable to dispatch {}: creator '{}' is not available", job, creator);
375           continue;
376         }
377         securityService.setUser(user);
378 
379         // Start dispatching
380         try {
381           List<ServiceRegistration> services = db.exec(serviceRegistry.getServiceRegistrationsQuery());
382           List<HostRegistration> hosts = db.exec(serviceRegistry.getHostRegistrationsQuery()).stream()
383                                            .filter(host -> !dispatchPriorityList.containsValue(host.getBaseUrl())
384                                                || host.getBaseUrl().equals(dispatchPriorityList.get(job.getId())))
385                                            .collect(Collectors.toList());
386           List<ServiceRegistration> candidateServices;
387 
388           // Depending on whether this running job is trying to reach out to other services or whether this is an
389           // attempt to execute the next operation in a workflow, choose either from a limited or from the full list
390           // of services
391           Job parentJob = null;
392           try {
393             if (job.getParentJob() != null) {
394               parentJob = serviceRegistry.getJob(job.getParentJob().getId());
395             }
396           } catch (NotFoundException e) {
397             // That's ok
398           }
399 
400           // When a job A starts a series of child jobs, then those child jobs should only be dispatched at the
401           // same time if there is processing capacity available.
402           boolean parentHasRunningChildren = false;
403           if (parentJob != null) {
404             for (Job child : serviceRegistry.getChildJobs(parentJob.getId())) {
405               if (Job.Status.RUNNING.equals(child.getStatus())) {
406                 parentHasRunningChildren = true;
407                 break;
408               }
409             }
410           }
411 
412           // If this is a root job (a new workflow or a new workflow operation), then only dispatch if there is
413           // capacity, i. e. the workflow service is ok dispatching the next workflow or the next workflow operation.
414           if (parentJob == null || ServiceRegistryJpaImpl.TYPE_WORKFLOW.equals(jobType) || parentHasRunningChildren) {
415             logger.trace("Using available capacity only for dispatching of {} to a service of type '{}'", job, jobType);
416             candidateServices = serviceRegistry.getServiceRegistrationsWithCapacity(jobType, services, hosts, systemLoad);
417           } else {
418             logger.trace("Using full list of services for dispatching of {} to a service of type '{}'", job, jobType);
419             candidateServices = serviceRegistry.getServiceRegistrationsByLoad(jobType, services, hosts, systemLoad);
420           }
421 
422           // Try to dispatch the job
423           String hostAcceptingJob;
424           try {
425             hostAcceptingJob = dispatchJob(job, candidateServices);
426             try {
427               systemLoad.updateNodeLoad(hostAcceptingJob, job.getJobLoad());
428             } catch (NotFoundException e) {
429               logger.info("Host {} not found in load list, cannot dispatch {} to it", hostAcceptingJob, job);
430             }
431 
432             dispatchPriorityList.remove(job.getId());
433           } catch (ServiceUnavailableException e) {
434             logger.debug("Jobs of type {} currently cannot be dispatched", job.getOperation());
435             // Don't mark workflow jobs as undispatchable to not impact worklfow operations
436             if (!ServiceRegistryJpaImpl.TYPE_WORKFLOW.equals(jobType)) {
437               undispatchableJobTypes.add(jobSignature);
438             }
439             continue;
440           } catch (UndispatchableJobException e) {
441             logger.debug("{} currently cannot be dispatched", job);
442             continue;
443           }
444 
445           logger.debug("{} dispatched to {}", job, hostAcceptingJob);
446         } catch (ServiceRegistryException e) {
447           Throwable cause = (e.getCause() != null) ? e.getCause() : e;
448           logger.error("Error dispatching {}", job, cause);
449         } finally {
450           securityService.setUser(null);
451           securityService.setOrganization(null);
452         }
453       }
454     }
455 
456     /**
457      * Dispatches the job to the least loaded service that will accept the job, or throws a
458      * <code>ServiceUnavailableException</code> if there is no such service.
459      *
460      * @param job      the job to dispatch
461      * @param services a list of service registrations
462      * @return the host that accepted the dispatched job, or <code>null</code> if no services took the job.
463      * @throws ServiceRegistryException    if the service registrations are unavailable
464      * @throws ServiceUnavailableException if no service is available or if all available services refuse to take on more work
465      * @throws UndispatchableJobException  if the current job cannot be processed
466      */
467     private String dispatchJob(JpaJob job, List<ServiceRegistration> services)
468         throws ServiceRegistryException, ServiceUnavailableException, UndispatchableJobException {
469       if (services.size() == 0) {
470         logger.debug("No service is currently available to handle jobs of type '" + job.getJobType() + "'");
471         throw new ServiceUnavailableException("No service of type " + job.getJobType() + " available");
472       }
473 
474       // Try the service registrations, after the first one finished, we quit;
475       job.setStatus(Job.Status.DISPATCHING);
476 
477       boolean triedDispatching = false;
478       boolean jobLoadExceedsMaximumLoads = false;
479 
480       final Float highestMaxLoad = services.stream()
481                                            .map(s -> ((ServiceRegistrationJpaImpl) s).getHostRegistration())
482                                            .map(HostRegistration::getMaxLoad)
483                                            .max(Comparator.naturalOrder())
484                                            .get();
485 
486       if (job.getJobLoad() > highestMaxLoad) {
487         // None of the available hosts is able to accept the job because the largest max load value is less than this job's load value
488         jobLoadExceedsMaximumLoads = true;
489       }
490 
491       for (ServiceRegistration registration : services) {
492         job.setProcessorServiceRegistration((ServiceRegistrationJpaImpl) registration);
493 
494         // Skip registration of host with less max load than highest available max load
495         // Note: This service registration may or may not live on a node which is set to accept jobs exceeding its max load
496         if (jobLoadExceedsMaximumLoads && job.getProcessorServiceRegistration().getHostRegistration().getMaxLoad() != highestMaxLoad) {
497           continue;
498         }
499 
500         try {
501           job = serviceRegistry.updateInternal(job); // will open a tx
502         } catch (Exception e) {
503           // In theory, we should catch javax.persistence.OptimisticLockException. Unfortunately, eclipselink throws
504           // org.eclipse.persistence.exceptions.OptimisticLockException. In order to avoid importing the implementation
505           // specific APIs, we just catch Exception.
506           logger.debug("Unable to dispatch {}.  This is likely caused by another service registry dispatching the job",
507               job);
508           throw new UndispatchableJobException(job + " is already being dispatched");
509         }
510 
511         triedDispatching = true;
512 
513         String serviceUrl = UrlSupport.concat(registration.getHost(), registration.getPath(), "dispatch");
514         HttpPost post = new HttpPost(serviceUrl);
515 
516         // Add current organization and user so they can be used during execution at the remote end
517         post.addHeader(ORGANIZATION_HEADER, securityService.getOrganization().getId());
518         post.addHeader(USER_HEADER, securityService.getUser().getUsername());
519 
520         List<BasicNameValuePair> params = new ArrayList<>();
521         params.add(new BasicNameValuePair("id", Long.toString(job.getId())));
522         params.add(new BasicNameValuePair("operation", job.getOperation()));
523         post.setEntity(new UrlEncodedFormEntity(params, UTF_8));
524 
525         // Post the request
526         HttpResponse response = null;
527         int responseStatusCode;
528         try {
529           logger.debug("Trying to dispatch {} type '{}' load {} to {}", job, job.getJobType(), job.getJobLoad(),
530               registration.getHost());
531           if (!ServiceRegistryJpaImpl.START_WORKFLOW.equals(job.getOperation())) {
532             serviceRegistry.setCurrentJob(job.toJob());
533           }
534           response = client.execute(post);
535           responseStatusCode = response.getStatusLine().getStatusCode();
536           if (responseStatusCode == HttpStatus.SC_NO_CONTENT) {
537             return registration.getHost();
538           } else if (responseStatusCode == HttpStatus.SC_SERVICE_UNAVAILABLE) {
539             logger.debug("Service {} is currently refusing to accept jobs of type {}", registration, job.getOperation());
540             continue;
541           } else if (responseStatusCode == HttpStatus.SC_PRECONDITION_FAILED) {
542             job.setStatus(Job.Status.FAILED);
543             job = serviceRegistry.updateJob(job); // will open a tx
544             logger.debug("Service {} refused to accept {}", registration, job);
545             throw new UndispatchableJobException(IOUtils.toString(response.getEntity().getContent()));
546           } else if (responseStatusCode == HttpStatus.SC_METHOD_NOT_ALLOWED) {
547             logger.debug("Service {} is not yet reachable", registration);
548             continue;
549           } else {
550             logger.warn("Service {} failed ({}) accepting {}", registration, responseStatusCode, job);
551             continue;
552           }
553         } catch (UndispatchableJobException e) {
554           throw e;
555         } catch (TrustedHttpClientException e) {
556           // Will try another node. If no other node, it will be re-queued
557           logger.warn("Unable to dispatch {}", job, e);
558           continue;
559         } catch (Exception e) {
560           logger.warn("Unable to dispatch {}", job, e);
561         } finally {
562           try {
563             client.close(response);
564           } catch (IOException e) {
565             // ignore
566           }
567           serviceRegistry.setCurrentJob(null);
568         }
569       }
570 
571       // We've tried dispatching to every online service that can handle this type of job, with no luck.
572       if (triedDispatching) {
573         // Workflow type jobs are not set to priority list, because they handle accepting jobs not based on the job load
574         // If the system don't accepts jobs whose load exceeds the host's max load we can't make use of the priority
575         // list
576         if (serviceRegistry.acceptJobLoadsExeedingMaxLoad && !dispatchPriorityList.containsKey(job.getId()) && !ServiceRegistryJpaImpl.TYPE_WORKFLOW.equals(job.getJobType())
577             && job.getProcessorServiceRegistration() != null) {
578           String host = job.getProcessorServiceRegistration().getHost();
579           logger.debug("About to add {} to dispatchPriorityList with processor host {}", job, host);
580           dispatchPriorityList.put(job.getId(), host);
581         }
582 
583         try {
584           job.setStatus(Job.Status.QUEUED);
585           job.setProcessorServiceRegistration(null);
586           job = serviceRegistry.updateJob(job); // will open a tx
587         } catch (Exception e) {
588           logger.error("Unable to put {} back into queue", job, e);
589         }
590       }
591 
592       logger.debug("Unable to dispatch {}, no service is currently ready to accept the job", job);
593       throw new UndispatchableJobException(job + " is currently undispatchable");
594     }
595 
596     /**
597      * Return dispatchable job ids, where the job status is RESTART or QUEUED and the job id is listed in the given set.
598      *
599      * @param jobIds set with job id's interested in
600      * @return list with dispatchable job id's from the given set, with job status RESTART or QUEUED
601      */
602     protected Function<EntityManager, List<Long>> getDispatchableJobsWithIdFilterQuery(Set<Long> jobIds) {
603       return em -> {
604         if (jobIds == null || jobIds.isEmpty()) {
605           return Collections.emptyList();
606         }
607 
608         return namedQuery.findAll(
609             "Job.dispatchable.status.idfilter",
610             Long.class,
611             Pair.of("jobids", dispatchPriorityList.keySet()),
612             Pair.of("statuses", List.of(
613                 Job.Status.RESTART.ordinal(),
614                 Job.Status.QUEUED.ordinal()
615             ))
616         ).apply(em);
617       };
618     }
619 
620     private final Function<ServiceRegistration, HostRegistration> toHostRegistration = new Function<ServiceRegistration, HostRegistration>() {
621       @Override
622       public HostRegistration apply(ServiceRegistration s) {
623         return ((ServiceRegistrationJpaImpl) s).getHostRegistration();
624       }
625     };
626 
627     private final Function<HostRegistration, Float> toMaxLoad = new Function<HostRegistration, Float>() {
628       @Override
629       public Float apply(HostRegistration h) {
630         return h.getMaxLoad();
631       }
632     };
633 
634     private final Comparator<Float> sortFloatValuesDesc = new Comparator<Float>() {
635       @Override
636       public int compare(Float o1, Float o2) {
637         return o2.compareTo(o1);
638       }
639     };
640   }
641 }