View Javadoc
1   /*
2    * Licensed to The Apereo Foundation under one or more contributor license
3    * agreements. See the NOTICE file distributed with this work for additional
4    * information regarding copyright ownership.
5    *
6    *
7    * The Apereo Foundation licenses this file to you under the Educational
8    * Community License, Version 2.0 (the "License"); you may not use this file
9    * except in compliance with the License. You may obtain a copy of the License
10   * at:
11   *
12   *   http://opensource.org/licenses/ecl2.txt
13   *
14   * Unless required by applicable law or agreed to in writing, software
15   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
16   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the
17   * License for the specific language governing permissions and limitations under
18   * the License.
19   *
20   */
21  
22  package org.opencastproject.serviceregistry.impl;
23  
24  import static java.nio.charset.StandardCharsets.UTF_8;
25  import static org.opencastproject.db.Queries.namedQuery;
26  import static org.opencastproject.security.api.SecurityConstants.ORGANIZATION_HEADER;
27  import static org.opencastproject.security.api.SecurityConstants.USER_HEADER;
28  
29  import org.opencastproject.db.DBSession;
30  import org.opencastproject.db.DBSessionFactory;
31  import org.opencastproject.job.api.Job;
32  import org.opencastproject.job.jpa.JpaJob;
33  import org.opencastproject.security.api.Organization;
34  import org.opencastproject.security.api.OrganizationDirectoryService;
35  import org.opencastproject.security.api.SecurityService;
36  import org.opencastproject.security.api.TrustedHttpClient;
37  import org.opencastproject.security.api.TrustedHttpClientException;
38  import org.opencastproject.security.api.User;
39  import org.opencastproject.security.api.UserDirectoryService;
40  import org.opencastproject.serviceregistry.api.HostRegistration;
41  import org.opencastproject.serviceregistry.api.ServiceRegistration;
42  import org.opencastproject.serviceregistry.api.ServiceRegistryException;
43  import org.opencastproject.serviceregistry.api.SystemLoad;
44  import org.opencastproject.serviceregistry.impl.jpa.ServiceRegistrationJpaImpl;
45  import org.opencastproject.util.NotFoundException;
46  import org.opencastproject.util.UrlSupport;
47  
48  import org.apache.commons.io.IOUtils;
49  import org.apache.commons.lang3.StringUtils;
50  import org.apache.commons.lang3.tuple.Pair;
51  import org.apache.http.HttpResponse;
52  import org.apache.http.HttpStatus;
53  import org.apache.http.client.entity.UrlEncodedFormEntity;
54  import org.apache.http.client.methods.HttpPost;
55  import org.apache.http.message.BasicNameValuePair;
56  import org.osgi.service.cm.ConfigurationException;
57  import org.osgi.service.component.ComponentContext;
58  import org.osgi.service.component.annotations.Activate;
59  import org.osgi.service.component.annotations.Component;
60  import org.osgi.service.component.annotations.Modified;
61  import org.osgi.service.component.annotations.Reference;
62  import org.slf4j.Logger;
63  import org.slf4j.LoggerFactory;
64  
65  import java.io.IOException;
66  import java.util.ArrayList;
67  import java.util.Collections;
68  import java.util.Comparator;
69  import java.util.Dictionary;
70  import java.util.HashMap;
71  import java.util.HashSet;
72  import java.util.List;
73  import java.util.Map;
74  import java.util.Set;
75  import java.util.concurrent.Executors;
76  import java.util.concurrent.ScheduledFuture;
77  import java.util.concurrent.ScheduledThreadPoolExecutor;
78  import java.util.concurrent.TimeUnit;
79  import java.util.function.Function;
80  import java.util.stream.Collectors;
81  
82  import javax.persistence.EntityManager;
83  import javax.persistence.EntityManagerFactory;
84  
85  /**
86   * This dispatcher implementation will check for jobs in the QUEUED {@link Job.Status}. If
87   * new jobs are found, the dispatcher will attempt to dispatch each job to the least loaded service.
88   */
89  @Component(
90      property = {
91          "service.description=Job Dispatcher"
92      },
93      immediate = true,
94      service = { JobDispatcher.class }
95  )
96  public class JobDispatcher {
97  
98    /** JPA persistence unit name */
99    public static final String PERSISTENCE_UNIT = "org.opencastproject.common";
100 
101   /** Configuration key for the dispatch interval, in seconds */
102   protected static final String OPT_DISPATCHINTERVAL = "dispatch.interval";
103 
104   /** Minimum delay between job dispatching attempts, in seconds */
105   static final float MIN_DISPATCH_INTERVAL = 1.0F;
106 
107   /** Default delay between job dispatching attempts, in seconds; a value of 0 disables dispatching */
108   static final float DEFAULT_DISPATCH_INTERVAL = 0.0F;
109 
110   /** Multiplicative factor to transform dispatch interval captured in seconds to milliseconds */
111   static final long DISPATCH_INTERVAL_MS_FACTOR = 1000;
112 
113   private static final Logger logger = LoggerFactory.getLogger(JobDispatcher.class);
114 
115   private ServiceRegistryJpaImpl serviceRegistry;
116 
117   private OrganizationDirectoryService organizationDirectoryService;
118   private UserDirectoryService userDirectoryService;
119   private SecurityService securityService;
120   private TrustedHttpClient client;
121 
122   /** The thread pool to use for dispatching. */
123   protected ScheduledThreadPoolExecutor scheduledExecutor = null;
124 
125   /** The factory used to generate the entity manager */
126   private EntityManagerFactory emf = null;
127 
128   protected DBSessionFactory dbSessionFactory;
129 
130   protected DBSession db;
131 
132   private ScheduledFuture jdfuture = null;
133 
134   /**
135    * A list of job types that cannot be dispatched in the current dispatch iteration
136    */
137   private List<String> undispatchableJobTypes = null;
138 
139   /** The dispatcher priority list */
140   protected final Map<Long, String> dispatchPriorityList = new HashMap<>();
141 
142   /** OSGi DI */
143   @Reference(target = "(osgi.unit.name=org.opencastproject.common)")
144   void setEntityManagerFactory(EntityManagerFactory emf) {
145     this.emf = emf;
146   }
147 
148   /** OSGi DI */
149   @Reference
150   public void setDBSessionFactory(DBSessionFactory dbSessionFactory) {
151     this.dbSessionFactory = dbSessionFactory;
152   }
153 
154   /** OSGi DI */
155   @Reference()
156   void setServiceRegistry(ServiceRegistryJpaImpl sr) {
157     this.serviceRegistry = sr;
158   }
159 
160   @Reference()
161   void setOrganizationDirectoryService(OrganizationDirectoryService organizationDirectoryService) {
162     this.organizationDirectoryService = organizationDirectoryService;
163   }
164 
165   @Reference
166   void setUserDirectoryService(UserDirectoryService svc) {
167     this.userDirectoryService = svc;
168   }
169 
170   @Reference
171   void setSecurityService(SecurityService sec) {
172     this.securityService = sec;
173   }
174 
175   @Reference
176   void setTrustedHttpClient(TrustedHttpClient client) {
177     this.client = client;
178   }
179 
180   @Activate
181   public void activate(ComponentContext cc) throws ConfigurationException  {
182     logger.info("Activate job dispatcher");
183     db = dbSessionFactory.createSession(emf);
184     scheduledExecutor = (ScheduledThreadPoolExecutor) Executors.newScheduledThreadPool(1);
185     scheduledExecutor.setRemoveOnCancelPolicy(true);
186     logger.info("Activated");
187     updated(cc.getProperties());
188   }
189 
190 
191   @Modified
192   public void modified(ComponentContext cc) throws ConfigurationException {
193     logger.debug("Modified in job dispatcher");
194     updated(cc.getProperties());
195   }
196 
197   /**
198    * {@inheritDoc}
199    *
200    * @see org.osgi.service.cm.ManagedService#updated(java.util.Dictionary)
201    */
202   @SuppressWarnings("rawtypes")
203   public void updated(Dictionary properties) {
204 
205     logger.info("Updating job dispatcher properties");
206 
207     float dispatchInterval = DEFAULT_DISPATCH_INTERVAL;
208     String dispatchIntervalString = StringUtils.trimToNull((String) properties.get(OPT_DISPATCHINTERVAL));
209     if (StringUtils.isNotBlank(dispatchIntervalString)) {
210       try {
211         dispatchInterval = Float.parseFloat(dispatchIntervalString);
212       } catch (Exception e) {
213         logger.warn("Dispatch interval '{}' is malformed, setting to {}", dispatchIntervalString,
214             MIN_DISPATCH_INTERVAL);
215         dispatchInterval = MIN_DISPATCH_INTERVAL;
216       }
217       if (dispatchInterval == 0) {
218         logger.info("Dispatching disabled");
219       } else if (dispatchInterval < MIN_DISPATCH_INTERVAL) {
220         logger.warn("Dispatch interval {} seconds is too low, adjusting to {}", dispatchInterval,
221             MIN_DISPATCH_INTERVAL);
222         dispatchInterval = MIN_DISPATCH_INTERVAL;
223       } else {
224         logger.info("Dispatch interval set to {} seconds", dispatchInterval);
225       }
226     }
227 
228     // Stop the current dispatch thread so we can configure a new one
229     if (jdfuture != null) {
230       jdfuture.cancel(true);
231     }
232 
233     // Schedule the job dispatching.
234     if (dispatchInterval > 0) {
235       long dispatchIntervalMs = Math.round(dispatchInterval * DISPATCH_INTERVAL_MS_FACTOR);
236       logger.info("Job dispatching is enabled");
237       logger.debug("Starting job dispatching at a custom interval of {}s", dispatchInterval);
238       jdfuture = scheduledExecutor.scheduleWithFixedDelay(getJobDispatcherRunnable(), dispatchIntervalMs,
239           dispatchIntervalMs, TimeUnit.MILLISECONDS);
240     } else {
241       logger.info("Job dispatching is disabled");
242     }
243   }
244 
245   Runnable getJobDispatcherRunnable() {
246     return new JobDispatcherRunner();
247   }
248 
249   public class JobDispatcherRunner implements Runnable {
250 
251     /**
252      * {@inheritDoc}
253      *
254      * @see Thread#run()
255      */
256     @Override
257     public void run() {
258       logger.debug("Starting job dispatch");
259 
260       undispatchableJobTypes = new ArrayList<>();
261       try {
262         //GDLGDL: move collectJobStats to the JD config, then this is reasonable
263         // FIXME: the stats are not currently used and the queries are very expensive in database time.
264         if (serviceRegistry.collectJobstats) {
265           serviceRegistry.updateStatisticsJobData();
266         }
267 
268         if (!dispatchPriorityList.isEmpty()) {
269           logger.trace("Checking for outdated jobs in dispatchPriorityList's '{}' jobs", dispatchPriorityList.size());
270           // Remove outdated jobs from priority list
271           List<Long> jobIds = db.exec(getDispatchableJobsWithIdFilterQuery(dispatchPriorityList.keySet()));
272           for (Long jobId : new HashSet<>(dispatchPriorityList.keySet())) {
273             if (!jobIds.contains(jobId)) {
274               logger.debug("Removing outdated dispatchPriorityList job '{}'", jobId);
275               dispatchPriorityList.remove(jobId);
276             }
277           }
278         }
279 
280         int jobsOffset = 0;
281         List<JpaJob> dispatchableJobs;
282         List<JpaJob> workflowJobs = new ArrayList<>();
283         boolean jobsFound;
284         do {
285           // dispatch all dispatchable jobs with status restarted
286           dispatchableJobs = db.exec(serviceRegistry.getDispatchableJobsWithStatusQuery(
287               jobsOffset, ServiceRegistryJpaImpl.DEFAULT_DISPATCH_JOBS_LIMIT, Job.Status.RESTART
288           ));
289           jobsOffset += ServiceRegistryJpaImpl.DEFAULT_DISPATCH_JOBS_LIMIT;
290           jobsFound = !dispatchableJobs.isEmpty();
291 
292           // skip all jobs of type workflow, we will handle them next
293           for (JpaJob job : dispatchableJobs) {
294             if (ServiceRegistryJpaImpl.TYPE_WORKFLOW.equals(job.getJobType())) {
295               workflowJobs.add(job);
296             }
297           }
298           if (dispatchableJobs.removeAll(workflowJobs) && dispatchableJobs.isEmpty()) {
299             continue;
300           }
301 
302           dispatchDispatchableJobs(dispatchableJobs);
303         } while (jobsFound);
304 
305         jobsOffset = 0;
306         do {
307           // dispatch all dispatchable jobs with status queued
308           dispatchableJobs = db.exec(serviceRegistry.getDispatchableJobsWithStatusQuery(
309               jobsOffset, ServiceRegistryJpaImpl.DEFAULT_DISPATCH_JOBS_LIMIT, Job.Status.QUEUED
310           ));
311           jobsOffset += ServiceRegistryJpaImpl.DEFAULT_DISPATCH_JOBS_LIMIT;
312           jobsFound = !dispatchableJobs.isEmpty();
313 
314           // skip all jobs of type workflow, we will handle them next
315           for (JpaJob job : dispatchableJobs) {
316             if (ServiceRegistryJpaImpl.TYPE_WORKFLOW.equals(job.getJobType())) {
317               workflowJobs.add(job);
318             }
319           }
320           if (dispatchableJobs.removeAll(workflowJobs) && dispatchableJobs.isEmpty()) {
321             continue;
322           }
323 
324           dispatchDispatchableJobs(dispatchableJobs);
325         } while (jobsFound);
326 
327         if (!workflowJobs.isEmpty()) {
328           dispatchDispatchableJobs(workflowJobs);
329         }
330       } catch (Throwable t) {
331         logger.warn("Error dispatching jobs", t);
332       } finally {
333         undispatchableJobTypes = null;
334       }
335 
336       logger.debug("Finished job dispatch");
337     }
338 
339     /**
340      * Dispatch the given jobs.
341      *
342      * @param jobsToDispatch list with dispatchable jobs to dispatch
343      */
344     private void dispatchDispatchableJobs(List<JpaJob> jobsToDispatch) {
345       // Get the current system load
346       SystemLoad systemLoad = db.exec(serviceRegistry.getHostLoadsQuery());
347 
348       for (JpaJob job : jobsToDispatch) {
349         // Remember the job type
350         String jobType = job.getJobType();
351 
352         // Skip jobs that we already know can't be dispatched except of jobs in the priority list
353         String jobSignature = jobType + '@' + job.getOperation();
354         if (undispatchableJobTypes.contains(jobSignature) && !dispatchPriorityList.containsKey(job.getId())) {
355           logger.trace("Skipping dispatching of {} with type '{}' for this round of dispatching", job, jobType);
356           continue;
357         }
358 
359         // Set the job's user and organization prior to dispatching
360         String creator = job.getCreator();
361         String creatorOrganization = job.getOrganization();
362 
363         // Try to load the organization.
364         Organization organization;
365         try {
366           organization = organizationDirectoryService.getOrganization(creatorOrganization);
367           securityService.setOrganization(organization);
368         } catch (NotFoundException e) {
369           logger.debug("Skipping dispatching of job for non-existing organization '{}'", creatorOrganization);
370           continue;
371         }
372 
373         // Try to load the user
374         User user = userDirectoryService.loadUser(creator);
375         if (user == null) {
376           logger.warn("Unable to dispatch {}: creator '{}' is not available", job, creator);
377           continue;
378         }
379         securityService.setUser(user);
380 
381         // Start dispatching
382         try {
383           List<ServiceRegistration> services = db.exec(serviceRegistry.getServiceRegistrationsQuery());
384           List<HostRegistration> hosts = db.exec(serviceRegistry.getHostRegistrationsQuery()).stream()
385                                            .filter(host -> !dispatchPriorityList.containsValue(host.getBaseUrl())
386                                                || host.getBaseUrl().equals(dispatchPriorityList.get(job.getId())))
387                                            .collect(Collectors.toList());
388           List<ServiceRegistration> candidateServices;
389 
390           // Depending on whether this running job is trying to reach out to other services or whether this is an
391           // attempt to execute the next operation in a workflow, choose either from a limited or from the full list
392           // of services
393           Job parentJob = null;
394           try {
395             if (job.getParentJob() != null) {
396               parentJob = serviceRegistry.getJob(job.getParentJob().getId());
397             }
398           } catch (NotFoundException e) {
399             // That's ok
400           }
401 
402           // When a job A starts a series of child jobs, then those child jobs should only be dispatched at the
403           // same time if there is processing capacity available.
404           boolean parentHasRunningChildren = false;
405           if (parentJob != null) {
406             for (Job child : serviceRegistry.getChildJobs(parentJob.getId())) {
407               if (Job.Status.RUNNING.equals(child.getStatus())) {
408                 parentHasRunningChildren = true;
409                 break;
410               }
411             }
412           }
413 
414           // If this is a root job (a new workflow or a new workflow operation), then only dispatch if there is
415           // capacity, i. e. the workflow service is ok dispatching the next workflow or the next workflow operation.
416           if (parentJob == null || ServiceRegistryJpaImpl.TYPE_WORKFLOW.equals(jobType) || parentHasRunningChildren) {
417             logger.trace("Using available capacity only for dispatching of {} to a service of type '{}'", job, jobType);
418             candidateServices = serviceRegistry.getServiceRegistrationsWithCapacity(jobType, services, hosts,
419                 systemLoad);
420           } else {
421             logger.trace("Using full list of services for dispatching of {} to a service of type '{}'", job, jobType);
422             candidateServices = serviceRegistry.getServiceRegistrationsByLoad(jobType, services, hosts, systemLoad);
423           }
424 
425           // Try to dispatch the job
426           String hostAcceptingJob;
427           try {
428             hostAcceptingJob = dispatchJob(job, candidateServices);
429             try {
430               systemLoad.updateNodeLoad(hostAcceptingJob, job.getJobLoad());
431             } catch (NotFoundException e) {
432               logger.info("Host {} not found in load list, cannot dispatch {} to it", hostAcceptingJob, job);
433             }
434 
435             dispatchPriorityList.remove(job.getId());
436           } catch (ServiceUnavailableException e) {
437             logger.debug("Jobs of type {} currently cannot be dispatched", job.getOperation());
438             // Don't mark workflow jobs as undispatchable to not impact worklfow operations
439             if (!ServiceRegistryJpaImpl.TYPE_WORKFLOW.equals(jobType)) {
440               undispatchableJobTypes.add(jobSignature);
441             }
442             continue;
443           } catch (UndispatchableJobException e) {
444             logger.debug("{} currently cannot be dispatched", job);
445             continue;
446           }
447 
448           logger.debug("{} dispatched to {}", job, hostAcceptingJob);
449         } catch (ServiceRegistryException e) {
450           Throwable cause = (e.getCause() != null) ? e.getCause() : e;
451           logger.error("Error dispatching {}", job, cause);
452         } finally {
453           securityService.setUser(null);
454           securityService.setOrganization(null);
455         }
456       }
457     }
458 
459     /**
460      * Dispatches the job to the least loaded service that will accept the job, or throws a
461      * <code>ServiceUnavailableException</code> if there is no such service.
462      *
463      * @param job      the job to dispatch
464      * @param services a list of service registrations
465      * @return the host that accepted the dispatched job, or <code>null</code> if no services took the job.
466      * @throws ServiceRegistryException    if the service registrations are unavailable
467      * @throws ServiceUnavailableException
468      *    if no service is available or if all available services refuse to take on more work
469      * @throws UndispatchableJobException  if the current job cannot be processed
470      */
471     private String dispatchJob(JpaJob job, List<ServiceRegistration> services)
472         throws ServiceRegistryException, ServiceUnavailableException, UndispatchableJobException {
473       if (services.size() == 0) {
474         logger.debug("No service is currently available to handle jobs of type '" + job.getJobType() + "'");
475         throw new ServiceUnavailableException("No service of type " + job.getJobType() + " available");
476       }
477 
478       // Try the service registrations, after the first one finished, we quit;
479       job.setStatus(Job.Status.DISPATCHING);
480 
481       boolean triedDispatching = false;
482       boolean jobLoadExceedsMaximumLoads = false;
483 
484       final Float highestMaxLoad = services.stream()
485                                            .map(s -> ((ServiceRegistrationJpaImpl) s).getHostRegistration())
486                                            .map(HostRegistration::getMaxLoad)
487                                            .max(Comparator.naturalOrder())
488                                            .get();
489 
490       if (job.getJobLoad() > highestMaxLoad) {
491         // None of the available hosts is able to accept the job because the largest max load value is less than this
492         // job's load value
493         jobLoadExceedsMaximumLoads = true;
494       }
495 
496       for (ServiceRegistration registration : services) {
497         job.setProcessorServiceRegistration((ServiceRegistrationJpaImpl) registration);
498 
499         // Skip registration of host with less max load than highest available max load
500         // Note: This service registration may or may not live on a node which is set to accept jobs exceeding
501         // its max load
502         if (jobLoadExceedsMaximumLoads
503             && job.getProcessorServiceRegistration().getHostRegistration().getMaxLoad() != highestMaxLoad) {
504           continue;
505         }
506 
507         try {
508           job = serviceRegistry.updateInternal(job); // will open a tx
509         } catch (Exception e) {
510           // In theory, we should catch javax.persistence.OptimisticLockException. Unfortunately, eclipselink throws
511           // org.eclipse.persistence.exceptions.OptimisticLockException. In order to avoid importing the implementation
512           // specific APIs, we just catch Exception.
513           logger.debug("Unable to dispatch {}.  This is likely caused by another service registry dispatching the job",
514               job);
515           throw new UndispatchableJobException(job + " is already being dispatched");
516         }
517 
518         triedDispatching = true;
519 
520         String serviceUrl = UrlSupport.concat(registration.getHost(), registration.getPath(), "dispatch");
521         HttpPost post = new HttpPost(serviceUrl);
522 
523         // Add current organization and user so they can be used during execution at the remote end
524         post.addHeader(ORGANIZATION_HEADER, securityService.getOrganization().getId());
525         post.addHeader(USER_HEADER, securityService.getUser().getUsername());
526 
527         List<BasicNameValuePair> params = new ArrayList<>();
528         params.add(new BasicNameValuePair("id", Long.toString(job.getId())));
529         params.add(new BasicNameValuePair("operation", job.getOperation()));
530         post.setEntity(new UrlEncodedFormEntity(params, UTF_8));
531 
532         // Post the request
533         HttpResponse response = null;
534         int responseStatusCode;
535         try {
536           logger.debug("Trying to dispatch {} type '{}' load {} to {}", job, job.getJobType(), job.getJobLoad(),
537               registration.getHost());
538           if (!ServiceRegistryJpaImpl.START_WORKFLOW.equals(job.getOperation())) {
539             serviceRegistry.setCurrentJob(job.toJob());
540           }
541           response = client.execute(post);
542           responseStatusCode = response.getStatusLine().getStatusCode();
543           if (responseStatusCode == HttpStatus.SC_NO_CONTENT) {
544             return registration.getHost();
545           } else if (responseStatusCode == HttpStatus.SC_SERVICE_UNAVAILABLE) {
546             logger.debug("Service {} is currently refusing to accept jobs of type {}", registration,
547                 job.getOperation());
548             continue;
549           } else if (responseStatusCode == HttpStatus.SC_PRECONDITION_FAILED) {
550             job.setStatus(Job.Status.FAILED);
551             job = serviceRegistry.updateJob(job); // will open a tx
552             logger.debug("Service {} refused to accept {}", registration, job);
553             throw new UndispatchableJobException(IOUtils.toString(response.getEntity().getContent()));
554           } else if (responseStatusCode == HttpStatus.SC_METHOD_NOT_ALLOWED) {
555             logger.debug("Service {} is not yet reachable", registration);
556             continue;
557           } else {
558             logger.warn("Service {} failed ({}) accepting {}", registration, responseStatusCode, job);
559             continue;
560           }
561         } catch (UndispatchableJobException e) {
562           throw e;
563         } catch (TrustedHttpClientException e) {
564           // Will try another node. If no other node, it will be re-queued
565           logger.warn("Unable to dispatch {}", job, e);
566           continue;
567         } catch (Exception e) {
568           logger.warn("Unable to dispatch {}", job, e);
569         } finally {
570           try {
571             client.close(response);
572           } catch (IOException e) {
573             // ignore
574           }
575           serviceRegistry.setCurrentJob(null);
576         }
577       }
578 
579       // We've tried dispatching to every online service that can handle this type of job, with no luck.
580       if (triedDispatching) {
581         // Workflow type jobs are not set to priority list, because they handle accepting jobs not based on the job load
582         // If the system don't accepts jobs whose load exceeds the host's max load we can't make use of the priority
583         // list
584         if (serviceRegistry.acceptJobLoadsExeedingMaxLoad && !dispatchPriorityList.containsKey(job.getId())
585             && !ServiceRegistryJpaImpl.TYPE_WORKFLOW.equals(job.getJobType())
586             && job.getProcessorServiceRegistration() != null) {
587           String host = job.getProcessorServiceRegistration().getHost();
588           logger.debug("About to add {} to dispatchPriorityList with processor host {}", job, host);
589           dispatchPriorityList.put(job.getId(), host);
590         }
591 
592         try {
593           job.setStatus(Job.Status.QUEUED);
594           job.setProcessorServiceRegistration(null);
595           job = serviceRegistry.updateJob(job); // will open a tx
596         } catch (Exception e) {
597           logger.error("Unable to put {} back into queue", job, e);
598         }
599       }
600 
601       logger.debug("Unable to dispatch {}, no service is currently ready to accept the job", job);
602       throw new UndispatchableJobException(job + " is currently undispatchable");
603     }
604 
605     /**
606      * Return dispatchable job ids, where the job status is RESTART or QUEUED and the job id is listed in the given set.
607      *
608      * @param jobIds set with job id's interested in
609      * @return list with dispatchable job id's from the given set, with job status RESTART or QUEUED
610      */
611     protected Function<EntityManager, List<Long>> getDispatchableJobsWithIdFilterQuery(Set<Long> jobIds) {
612       return em -> {
613         if (jobIds == null || jobIds.isEmpty()) {
614           return Collections.emptyList();
615         }
616 
617         return namedQuery.findAll(
618             "Job.dispatchable.status.idfilter",
619             Long.class,
620             Pair.of("jobids", dispatchPriorityList.keySet()),
621             Pair.of("statuses", List.of(
622                 Job.Status.RESTART.ordinal(),
623                 Job.Status.QUEUED.ordinal()
624             ))
625         ).apply(em);
626       };
627     }
628 
629     private final Function<ServiceRegistration, HostRegistration> toHostRegistration =
630         new Function<ServiceRegistration, HostRegistration>() {
631       @Override
632       public HostRegistration apply(ServiceRegistration s) {
633         return ((ServiceRegistrationJpaImpl) s).getHostRegistration();
634       }
635     };
636 
637     private final Function<HostRegistration, Float> toMaxLoad = new Function<HostRegistration, Float>() {
638       @Override
639       public Float apply(HostRegistration h) {
640         return h.getMaxLoad();
641       }
642     };
643 
644     private final Comparator<Float> sortFloatValuesDesc = new Comparator<Float>() {
645       @Override
646       public int compare(Float o1, Float o2) {
647         return o2.compareTo(o1);
648       }
649     };
650   }
651 }