View Javadoc
1   /*
2    * Licensed to The Apereo Foundation under one or more contributor license
3    * agreements. See the NOTICE file distributed with this work for additional
4    * information regarding copyright ownership.
5    *
6    *
7    * The Apereo Foundation licenses this file to you under the Educational
8    * Community License, Version 2.0 (the "License"); you may not use this file
9    * except in compliance with the License. You may obtain a copy of the License
10   * at:
11   *
12   *   http://opensource.org/licenses/ecl2.txt
13   *
14   * Unless required by applicable law or agreed to in writing, software
15   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
16   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the
17   * License for the specific language governing permissions and limitations under
18   * the License.
19   *
20   */
21  
22  package org.opencastproject.serviceregistry.impl;
23  
24  import static java.lang.String.format;
25  import static org.apache.commons.lang3.StringUtils.isBlank;
26  import static org.opencastproject.db.Queries.namedQuery;
27  import static org.opencastproject.job.api.AbstractJobProducer.ACCEPT_JOB_LOADS_EXCEEDING_PROPERTY;
28  import static org.opencastproject.job.api.AbstractJobProducer.DEFAULT_ACCEPT_JOB_LOADS_EXCEEDING;
29  import static org.opencastproject.job.api.Job.FailureReason.DATA;
30  import static org.opencastproject.job.api.Job.Status.FAILED;
31  import static org.opencastproject.serviceregistry.api.ServiceState.ERROR;
32  import static org.opencastproject.serviceregistry.api.ServiceState.NORMAL;
33  import static org.opencastproject.serviceregistry.api.ServiceState.WARNING;
34  import static org.opencastproject.util.OsgiUtil.getOptContextProperty;
35  
36  import org.opencastproject.db.DBSession;
37  import org.opencastproject.db.DBSessionFactory;
38  import org.opencastproject.job.api.Job;
39  import org.opencastproject.job.api.Job.Status;
40  import org.opencastproject.job.jpa.JpaJob;
41  import org.opencastproject.security.api.Organization;
42  import org.opencastproject.security.api.SecurityService;
43  import org.opencastproject.security.api.TrustedHttpClient;
44  import org.opencastproject.security.api.TrustedHttpClientException;
45  import org.opencastproject.security.api.User;
46  import org.opencastproject.serviceregistry.api.HostRegistration;
47  import org.opencastproject.serviceregistry.api.HostStatistics;
48  import org.opencastproject.serviceregistry.api.IncidentService;
49  import org.opencastproject.serviceregistry.api.Incidents;
50  import org.opencastproject.serviceregistry.api.JaxbServiceStatistics;
51  import org.opencastproject.serviceregistry.api.ServiceRegistration;
52  import org.opencastproject.serviceregistry.api.ServiceRegistry;
53  import org.opencastproject.serviceregistry.api.ServiceRegistryException;
54  import org.opencastproject.serviceregistry.api.ServiceStatistics;
55  import org.opencastproject.serviceregistry.api.SystemLoad;
56  import org.opencastproject.serviceregistry.api.SystemLoad.NodeLoad;
57  import org.opencastproject.serviceregistry.impl.jpa.HostRegistrationJpaImpl;
58  import org.opencastproject.serviceregistry.impl.jpa.ServiceRegistrationJpaImpl;
59  import org.opencastproject.systems.OpencastConstants;
60  import org.opencastproject.util.NotFoundException;
61  import org.opencastproject.util.UrlSupport;
62  import org.opencastproject.util.function.ThrowingConsumer;
63  
64  import org.apache.commons.lang3.StringUtils;
65  import org.apache.commons.lang3.time.DateUtils;
66  import org.apache.commons.lang3.tuple.Pair;
67  import org.apache.http.HttpResponse;
68  import org.apache.http.HttpStatus;
69  import org.apache.http.client.methods.HttpHead;
70  import org.osgi.service.cm.ConfigurationException;
71  import org.osgi.service.cm.ManagedService;
72  import org.osgi.service.component.ComponentContext;
73  import org.osgi.service.component.annotations.Activate;
74  import org.osgi.service.component.annotations.Component;
75  import org.osgi.service.component.annotations.Deactivate;
76  import org.osgi.service.component.annotations.Modified;
77  import org.osgi.service.component.annotations.Reference;
78  import org.osgi.service.component.annotations.ReferenceCardinality;
79  import org.osgi.service.component.annotations.ReferencePolicy;
80  import org.slf4j.Logger;
81  import org.slf4j.LoggerFactory;
82  
83  import java.net.InetAddress;
84  import java.net.URI;
85  import java.net.URISyntaxException;
86  import java.util.ArrayList;
87  import java.util.Arrays;
88  import java.util.Collections;
89  import java.util.Comparator;
90  import java.util.Date;
91  import java.util.Dictionary;
92  import java.util.HashMap;
93  import java.util.List;
94  import java.util.Map;
95  import java.util.Objects;
96  import java.util.Optional;
97  import java.util.concurrent.Executors;
98  import java.util.concurrent.ScheduledFuture;
99  import java.util.concurrent.ScheduledThreadPoolExecutor;
100 import java.util.concurrent.TimeUnit;
101 import java.util.concurrent.atomic.AtomicReference;
102 import java.util.function.Consumer;
103 import java.util.function.Function;
104 import java.util.stream.Collectors;
105 
106 import javax.persistence.EntityManager;
107 import javax.persistence.EntityManagerFactory;
108 import javax.persistence.LockModeType;
109 import javax.persistence.NoResultException;
110 import javax.persistence.TypedQuery;
111 
112 /** JPA implementation of the {@link ServiceRegistry} */
113 @Component(
114   property = {
115     "service.description=Service registry"
116   },
117   immediate = true,
118   service = { ManagedService.class, ServiceRegistry.class, ServiceRegistryJpaImpl.class }
119 )
120 public class ServiceRegistryJpaImpl implements ServiceRegistry, ManagedService {
121 
122   /** JPA persistence unit name */
123   public static final String PERSISTENCE_UNIT = "org.opencastproject.common";
124 
125   /** Id of the workflow's start operation, needs to match the corresponding enum value in
126    * WorkflowServiceImpl */
127   public static final String START_OPERATION = "START_OPERATION";
128 
129   /** Id of the workflow's start workflow operation, need to match the corresponding enum value in
130    * WorkflowServiceImpl */
131   public static final String START_WORKFLOW = "START_WORKFLOW";
132 
133   /** Id of the workflow's resume operation, need to match the corresponding enum value in WorkflowServiceImpl */
134   public static final String RESUME = "RESUME";
135 
136   /** Identifier for the workflow service */
137   public static final String TYPE_WORKFLOW = "org.opencastproject.workflow";
138 
139   static final Logger logger = LoggerFactory.getLogger(ServiceRegistryJpaImpl.class);
140 
141   /** Current job used to process job in the service registry */
142   private static final ThreadLocal<Job> currentJob = new ThreadLocal<>();
143 
144   /** Configuration key for the maximum load */
145   protected static final String OPT_MAXLOAD = "org.opencastproject.server.maxload";
146 
147   /** Configuration key for the collection of job statistics */
148   protected static final String OPT_JOBSTATISTICS = "jobstats.collect";
149 
150   /** Configuration key for the retrieval of service statistics:
151    * Do not consider jobs older than max_job_age (in days) */
152   protected static final String OPT_SERVICE_STATISTICS_MAX_JOB_AGE =
153       "org.opencastproject.statistics.services.max_job_age";
154 
155   /** Configuration key for the encoding preferred worker nodes */
156   protected static final String OPT_ENCODING_WORKERS = "org.opencastproject.encoding.workers";
157 
158   /** Configuration key for the encoding workers load threshold */
159   protected static final String OPT_ENCODING_THRESHOLD = "org.opencastproject.encoding.workers.threshold";
160 
161   /** The http client to use when connecting to remote servers */
162   protected TrustedHttpClient client = null;
163 
164   /** Default jobs limit during dispatching
165    * (larger value will fetch more entries from the database at the same time and increase RAM usage) */
166   static final int DEFAULT_DISPATCH_JOBS_LIMIT = 100;
167 
168   /** Default setting on job statistics collection */
169   static final boolean DEFAULT_JOB_STATISTICS = false;
170 
171   /** Default setting on service statistics retrieval */
172   static final int DEFAULT_SERVICE_STATISTICS_MAX_JOB_AGE = 14;
173 
174   static final List<String>  DEFAULT_ENCODING_WORKERS = new ArrayList<String>();
175 
176   static final double DEFAULT_ENCODING_THRESHOLD = 0.0;
177 
178   /** The configuration key for setting {@link #maxAttemptsBeforeErrorState} */
179   static final String MAX_ATTEMPTS_CONFIG_KEY = "max.attempts";
180 
181   /** The configuration key for setting {@link #noErrorStateServiceTypes} */
182   static final String NO_ERROR_STATE_SERVICE_TYPES_CONFIG_KEY = "no.error.state.service.types";
183 
184   /** Default value for {@link #maxAttemptsBeforeErrorState} */
185   private static final int DEFAULT_MAX_ATTEMPTS_BEFORE_ERROR_STATE = -1;
186 
187   /** Default value for {@link #errorStatesEnabled} */
188   private static final boolean DEFAULT_ERROR_STATES_ENABLED = true;
189 
190   /** Number of failed jobs on a service before to set it in error state. -1 will disable error states completely. */
191   protected int maxAttemptsBeforeErrorState = DEFAULT_MAX_ATTEMPTS_BEFORE_ERROR_STATE;
192   private boolean errorStatesEnabled = DEFAULT_ERROR_STATES_ENABLED;
193 
194   /** Services for which error state is disabled */
195   private List<String> noErrorStateServiceTypes = new ArrayList<>();
196 
197   /** Default job load when not passed by service creating the job * */
198   static final float DEFAULT_JOB_LOAD = 0.1f;
199 
200   /** This host's base URL */
201   protected String hostName;
202 
203   /** This host's descriptive node name eg admin, worker01 */
204   protected String nodeName;
205 
206   /** The base URL for job URLs */
207   protected String jobHost;
208 
209   /** Comma-separated list with URLs of encoding specialised workers */
210   protected static List<String> encodingWorkers = DEFAULT_ENCODING_WORKERS;
211 
212   /** Threshold value under which defined workers get preferred when dispatching encoding jobs */
213   protected static double encodingThreshold = DEFAULT_ENCODING_THRESHOLD;
214 
215   /** The factory used to generate the entity manager */
216   protected EntityManagerFactory emf = null;
217 
218   protected DBSessionFactory dbSessionFactory;
219 
220   protected DBSession db;
221 
222   /** The thread pool to use for dispatching queued jobs and checking on phantom services. */
223   protected ScheduledThreadPoolExecutor scheduledExecutor = null;
224   private ScheduledFuture hbfuture = null;
225 
226   /** The security service */
227   protected SecurityService securityService = null;
228 
229   protected IncidentService incidentService = null;
230 
231   protected Incidents incidents;
232 
233   /** Whether to collect detailed job statistics */
234   protected boolean collectJobstats = DEFAULT_JOB_STATISTICS;
235 
236   /** Maximum age (in days) of jobs being considered for service statistics */
237   protected int maxJobAge = DEFAULT_SERVICE_STATISTICS_MAX_JOB_AGE;
238 
239   /** A static list of statuses that influence how load balancing is calculated */
240   protected static final List<Status> JOB_STATUSES_INFLUENCING_LOAD_BALANCING;
241 
242   private static final Status[] activeJobStatus =
243       Arrays.stream(Status.values()).filter(Status::isActive).collect(Collectors.toList()).toArray(new Status[0]);
244 
245   protected static HashMap<Long, Float> jobCache = new HashMap<>();
246 
247   static {
248     JOB_STATUSES_INFLUENCING_LOAD_BALANCING = new ArrayList<>();
249     JOB_STATUSES_INFLUENCING_LOAD_BALANCING.add(Status.RUNNING);
250   }
251 
252   /** Whether to accept a job whose load exceeds the host’s max load */
253   protected Boolean acceptJobLoadsExeedingMaxLoad = true;
254 
255   // Current system load
256   protected float localSystemLoad = 0.0f;
257 
258   /** OSGi DI */
259   @Reference(target = "(osgi.unit.name=org.opencastproject.common)")
260   void setEntityManagerFactory(EntityManagerFactory emf) {
261     this.emf = emf;
262   }
263 
264   @Reference
265   public void setDBSessionFactory(DBSessionFactory dbSessionFactory) {
266     this.dbSessionFactory = dbSessionFactory;
267   }
268 
269   @Activate
270   public void activate(ComponentContext cc) {
271     logger.info("Activate service registry");
272 
273     db = dbSessionFactory.createSession(emf);
274 
275     // Find this host's url
276     if (cc == null || StringUtils.isBlank(cc.getBundleContext().getProperty(OpencastConstants.SERVER_URL_PROPERTY))) {
277       hostName = UrlSupport.DEFAULT_BASE_URL;
278     } else {
279       hostName = cc.getBundleContext().getProperty(OpencastConstants.SERVER_URL_PROPERTY);
280     }
281 
282     // Check hostname for sanity. It should be the hosts URL with protocol but without any part of the service paths.
283     if (hostName.endsWith("/")) {
284       logger.warn("The configured value of {} ends with '/'. This is very likely a configuration error which could "
285               + "lead to services not working properly. Note that this configuration should not contain any part of "
286               + "the service paths.", OpencastConstants.SERVER_URL_PROPERTY);
287     }
288 
289     // Clean all undispatchable jobs that were orphaned when this host was last deactivated
290     cleanUndispatchableJobs(hostName);
291 
292     // Find the jobs URL
293     if (cc == null || StringUtils.isBlank(cc.getBundleContext().getProperty("org.opencastproject.jobs.url"))) {
294       jobHost = hostName;
295     } else {
296       jobHost = cc.getBundleContext().getProperty("org.opencastproject.jobs.url");
297     }
298 
299     // Register this host
300     try {
301       if (cc == null || StringUtils.isBlank(cc.getBundleContext().getProperty(OpencastConstants.NODE_NAME_PROPERTY))) {
302         nodeName = null;
303       } else {
304         nodeName = cc.getBundleContext().getProperty(OpencastConstants.NODE_NAME_PROPERTY);
305       }
306 
307       float maxLoad = Runtime.getRuntime().availableProcessors();
308       if (cc != null && StringUtils.isNotBlank(cc.getBundleContext().getProperty(OPT_MAXLOAD))) {
309         try {
310           maxLoad = Float.parseFloat(cc.getBundleContext().getProperty(OPT_MAXLOAD));
311           logger.info("Max load has been set manually to {}", maxLoad);
312         } catch (NumberFormatException e) {
313           logger.warn("Configuration key '{}' is not an integer. Falling back to the number of cores ({})",
314                   OPT_MAXLOAD, maxLoad);
315         }
316       }
317 
318       logger.info("Node maximum load set to {}", maxLoad);
319 
320       String address = InetAddress.getByName(URI.create(hostName).getHost()).getHostAddress();
321       long maxMemory = Runtime.getRuntime().maxMemory();
322       int cores = Runtime.getRuntime().availableProcessors();
323 
324       registerHost(hostName, address, nodeName, maxMemory, cores, maxLoad);
325     } catch (Exception e) {
326       throw new IllegalStateException("Unable to register host " + hostName + " in the service registry", e);
327     }
328 
329     // Whether a service accepts a job whose load exceeds the host’s max load
330     if (cc != null) {
331       acceptJobLoadsExeedingMaxLoad = getOptContextProperty(cc, ACCEPT_JOB_LOADS_EXCEEDING_PROPERTY)
332               .map(Boolean::valueOf)
333               .orElse(DEFAULT_ACCEPT_JOB_LOADS_EXCEEDING);
334     }
335 
336     localSystemLoad = 0;
337     logger.info("Activated");
338   }
339 
340   @Override
341   public float getOwnLoad() {
342     return localSystemLoad;
343   }
344 
345   @Override
346   public String getRegistryHostname() {
347     return hostName;
348   }
349 
350   @Deactivate
351   public void deactivate() {
352     logger.info("deactivate service registry");
353 
354     // Wait for job dispatcher to stop before unregistering hosts and requeuing jobs
355     if (scheduledExecutor != null) {
356       try {
357         scheduledExecutor.shutdownNow();
358         if (!scheduledExecutor.isShutdown()) {
359           logger.info("Waiting for Dispatcher to terminate");
360           scheduledExecutor.awaitTermination(10, TimeUnit.SECONDS);
361         }
362       } catch (InterruptedException e) {
363         logger.error("Error shutting down the Dispatcher", e);
364       }
365     }
366 
367     try {
368       unregisterHost(hostName);
369     } catch (ServiceRegistryException e) {
370       throw new IllegalStateException("Unable to unregister host " + hostName + " from the service registry", e);
371     }
372   }
373 
374   /**
375    * {@inheritDoc}
376    *
377    * @see org.opencastproject.serviceregistry.api.ServiceRegistry#createJob(java.lang.String, java.lang.String)
378    */
379   @Override
380   public Job createJob(String type, String operation) throws ServiceRegistryException {
381     return createJob(this.hostName, type, operation, null, null, true, getCurrentJob(), DEFAULT_JOB_LOAD);
382   }
383 
384   /**
385    * {@inheritDoc}
386    *
387    * @see org.opencastproject.serviceregistry.api.ServiceRegistry#createJob(java.lang.String, java.lang.String,
388    *      java.util.List)
389    */
390   @Override
391   public Job createJob(String type, String operation, List<String> arguments) throws ServiceRegistryException {
392     return createJob(this.hostName, type, operation, arguments, null, true, getCurrentJob(), DEFAULT_JOB_LOAD);
393   }
394 
395   /**
396    * {@inheritDoc}
397    *
398    * @see org.opencastproject.serviceregistry.api.ServiceRegistry#createJob(java.lang.String, java.lang.String,
399    *      java.util.List, Float)
400    */
401   @Override
402   public Job createJob(String type, String operation, List<String> arguments, Float jobLoad)
403           throws ServiceRegistryException {
404     return createJob(this.hostName, type, operation, arguments, null, true, getCurrentJob(), jobLoad);
405   }
406 
407   /**
408    * {@inheritDoc}
409    *
410    * @see org.opencastproject.serviceregistry.api.ServiceRegistry#createJob(java.lang.String, java.lang.String,
411    *      java.util.List, String, boolean)
412    */
413   @Override
414   public Job createJob(String type, String operation, List<String> arguments, String payload, boolean dispatchable)
415           throws ServiceRegistryException {
416     return createJob(this.hostName, type, operation, arguments, payload, dispatchable, getCurrentJob(),
417             DEFAULT_JOB_LOAD);
418   }
419 
420   /**
421    * {@inheritDoc}
422    *
423    * @see org.opencastproject.serviceregistry.api.ServiceRegistry#createJob(java.lang.String, java.lang.String,
424    *      java.util.List, java.lang.String, boolean, Float)
425    */
426   @Override
427   public Job createJob(String type, String operation, List<String> arguments, String payload, boolean dispatchable,
428           Float jobLoad) throws ServiceRegistryException {
429     return createJob(this.hostName, type, operation, arguments, payload, dispatchable, getCurrentJob(), jobLoad);
430   }
431 
432   public Job createJob(String type, String operation, List<String> arguments, String payload, boolean dispatchable,
433           Job parentJob) throws ServiceRegistryException {
434     return createJob(this.hostName, type, operation, arguments, payload, dispatchable, parentJob, DEFAULT_JOB_LOAD);
435   }
436 
437   /**
438    * {@inheritDoc}
439    *
440    * @see org.opencastproject.serviceregistry.api.ServiceRegistry#createJob(java.lang.String, java.lang.String,
441    *      java.util.List, java.lang.String, boolean, org.opencastproject.job.api.Job, Float)
442    */
443   @Override
444   public Job createJob(String type, String operation, List<String> arguments, String payload, boolean dispatchable,
445           Job parentJob, Float jobLoad) throws ServiceRegistryException {
446     return createJob(this.hostName, type, operation, arguments, payload, dispatchable, parentJob, jobLoad);
447   }
448 
449   /**
450    * Creates a job on a remote host with a jobLoad of 1.0.
451    */
452   public Job createJob(String host, String serviceType, String operation, List<String> arguments, String payload,
453           boolean dispatchable, Job parentJob) throws ServiceRegistryException {
454     return createJob(host, serviceType, operation, arguments, payload, dispatchable, parentJob, DEFAULT_JOB_LOAD);
455   }
456 
457   /**
458    * Creates a job on a remote host.
459    */
460   public Job createJob(String host, String serviceType, String operation, List<String> arguments, String payload,
461           boolean dispatchable, Job parentJob, float jobLoad) throws ServiceRegistryException {
462     if (StringUtils.isBlank(host)) {
463       throw new IllegalArgumentException("Host can't be null");
464     }
465     if (StringUtils.isBlank(serviceType)) {
466       throw new IllegalArgumentException("Service type can't be null");
467     }
468     if (StringUtils.isBlank(operation)) {
469       throw new IllegalArgumentException("Operation can't be null");
470     }
471 
472     JpaJob jpaJob = db.execTxChecked(em -> {
473       ServiceRegistrationJpaImpl creatingService = getServiceRegistrationQuery(serviceType, host).apply(em)
474           .orElseThrow(() -> new ServiceRegistryException("No service registration exists for type '" + serviceType
475               + "' on host '" + host + "'"));
476 
477       if (creatingService.getHostRegistration().isMaintenanceMode()) {
478         logger.warn("Creating a job from {}, which is currently in maintenance mode.", creatingService.getHost());
479       } else if (!creatingService.getHostRegistration().isActive()) {
480         logger.warn("Creating a job from {}, which is currently inactive.", creatingService.getHost());
481       }
482 
483       User currentUser = securityService.getUser();
484       Organization currentOrganization = securityService.getOrganization();
485 
486       JpaJob job = new JpaJob(currentUser, currentOrganization, creatingService, operation, arguments, payload,
487               dispatchable, jobLoad);
488 
489       // Bind the given parent job to the new job
490       if (parentJob != null) {
491         // Get the JPA instance of the parent job
492         JpaJob jpaParentJob = getJpaJobQuery(parentJob.getId()).apply(em).orElseThrow(() -> {
493           logger.error("job with id {} not found in the persistence context", parentJob);
494           // We don't want to leave the deleted job in the cache if there
495           removeFromLoadCache(parentJob.getId());
496           return new ServiceRegistryException(new NotFoundException());
497         });
498         job.setParentJob(jpaParentJob);
499 
500         // Get the JPA instance of the root job
501         JpaJob jpaRootJob = jpaParentJob;
502         if (parentJob.getRootJobId() != null) {
503           jpaRootJob = getJpaJobQuery(parentJob.getRootJobId()).apply(em).orElseThrow(() -> {
504             logger.error("job with id {} not found in the persistence context", parentJob.getRootJobId());
505             // We don't want to leave the deleted job in the cache if there
506             removeFromLoadCache(parentJob.getRootJobId());
507             return new ServiceRegistryException(new NotFoundException());
508           });
509         }
510         job.setRootJob(jpaRootJob);
511       }
512 
513       // if this job is not dispatchable, it must be handled by the host that has created it
514       if (dispatchable) {
515         logger.trace("Queuing dispatchable '{}'", job);
516         job.setStatus(Status.QUEUED);
517       } else {
518         logger.trace("Giving new non-dispatchable '{}' its creating service as processor '{}'", job, creatingService);
519         job.setProcessorServiceRegistration(creatingService);
520       }
521 
522       em.persist(job);
523       return job;
524     });
525 
526     setJobUri(jpaJob);
527     return jpaJob.toJob();
528   }
529 
530   @Override
531   public void removeJobs(List<Long> jobIds) throws NotFoundException, ServiceRegistryException {
532     for (long jobId: jobIds) {
533       if (jobId < 1) {
534         throw new NotFoundException("Job ID must be greater than zero (0)");
535       }
536     }
537 
538     logger.debug("Start deleting jobs with IDs '{}'", jobIds);
539     try {
540       db.execTxChecked(em -> {
541         for (long jobId : jobIds) {
542           JpaJob job = em.find(JpaJob.class, jobId);
543           if (job == null) {
544             logger.error("Job with Id {} cannot be deleted: Not found.", jobId);
545             removeFromLoadCache(jobId);
546             throw new NotFoundException("Job with ID '" + jobId + "' not found");
547           }
548           deleteChildJobsQuery(jobId).accept(em);
549           em.remove(job);
550           removeFromLoadCache(jobId);
551         }
552       });
553     } catch (NotFoundException | ServiceRegistryException e) {
554       throw e;
555     } catch (Exception e) {
556       throw new ServiceRegistryException(e);
557     }
558 
559     logger.info("Jobs with IDs '{}' deleted", jobIds);
560   }
561 
562   private ThrowingConsumer<EntityManager, Exception> deleteChildJobsQuery(long jobId) {
563     return em -> {
564       List<Job> childJobs = getChildJobs(jobId);
565       if (childJobs.isEmpty()) {
566         logger.trace("No child jobs of job '{}' found to delete.", jobId);
567         return;
568       }
569 
570       logger.debug("Start deleting child jobs of job '{}'", jobId);
571 
572       try {
573         for (int i = childJobs.size() - 1; i >= 0; i--) {
574           Job job = childJobs.get(i);
575           JpaJob jobToDelete = em.find(JpaJob.class, job.getId());
576           em.remove(jobToDelete);
577           removeFromLoadCache(job.getId());
578           logger.debug("{} deleted", job);
579         }
580         logger.debug("Deleted all child jobs of job '{}'", jobId);
581       } catch (Exception e) {
582         throw new ServiceRegistryException("Unable to remove child jobs from " + jobId, e);
583       }
584     };
585   }
586 
587   @Override
588   public void removeParentlessJobs(int lifetime) throws ServiceRegistryException {
589     int count = db.execTxChecked(em -> {
590       int c = 0;
591 
592       List<Job> jobs = namedQuery.findAll("Job.withoutParent", JpaJob.class).apply(em).stream()
593           .map(JpaJob::toJob)
594           .filter(j -> j.getDateCreated().before(DateUtils.addDays(new Date(), -lifetime)))
595           // DO NOT DELETE workflow instances and operations!
596           .filter(j -> !START_OPERATION.equals(j.getOperation())
597               && !START_WORKFLOW.equals(j.getOperation())
598               && !RESUME.equals(j.getOperation()))
599           .filter(j -> j.getStatus().isTerminated())
600           .collect(Collectors.toList());
601 
602       for (Job job : jobs) {
603         try {
604           removeJobs(Collections.singletonList(job.getId()));
605           logger.debug("Parentless '{}' removed", job);
606           c++;
607         } catch (NotFoundException e) {
608           logger.debug("Parentless '{} ' not found in database", job, e);
609         }
610       }
611 
612       return c;
613     });
614 
615 
616     if (count > 0) {
617       logger.info("Successfully removed {} parentless jobs", count);
618     } else {
619       logger.trace("No parentless jobs found to remove");
620     }
621   }
622 
623   /**
624    * {@inheritDoc}
625    *
626    * @see org.osgi.service.cm.ManagedService#updated(java.util.Dictionary)
627    */
628   @Override
629   public void updated(Dictionary properties) throws ConfigurationException {
630     logger.info("Updating service registry properties");
631 
632     maxAttemptsBeforeErrorState = DEFAULT_MAX_ATTEMPTS_BEFORE_ERROR_STATE;
633     errorStatesEnabled = DEFAULT_ERROR_STATES_ENABLED;
634     String maxAttempts = StringUtils.trimToNull((String) properties.get(MAX_ATTEMPTS_CONFIG_KEY));
635     if (maxAttempts != null) {
636       try {
637         maxAttemptsBeforeErrorState = Integer.parseInt(maxAttempts);
638         if (maxAttemptsBeforeErrorState < 0) {
639           errorStatesEnabled = false;
640           logger.info("Error states of services disabled");
641         } else {
642           errorStatesEnabled = true;
643           logger.info("Set max attempts before error state to {}", maxAttempts);
644         }
645       } catch (NumberFormatException e) {
646         logger.warn("Can not set max attempts before error state to {}. {} must be an integer", maxAttempts,
647                 MAX_ATTEMPTS_CONFIG_KEY);
648       }
649     }
650 
651     noErrorStateServiceTypes = new ArrayList<>();
652     String noErrorStateServiceTypesStr = StringUtils.trimToNull((String) properties.get(
653             NO_ERROR_STATE_SERVICE_TYPES_CONFIG_KEY));
654     if (noErrorStateServiceTypesStr != null) {
655       noErrorStateServiceTypes = Arrays.asList(noErrorStateServiceTypesStr.split("\\s*,\\s*"));
656       if (!noErrorStateServiceTypes.isEmpty()) {
657         logger.info("Set service types without error state to {}", String.join(", ", noErrorStateServiceTypes));
658       }
659     }
660 
661     String jobStatsString = StringUtils.trimToNull((String) properties.get(OPT_JOBSTATISTICS));
662     if (StringUtils.isNotBlank(jobStatsString)) {
663       try {
664         collectJobstats = Boolean.parseBoolean(jobStatsString);
665       } catch (Exception e) {
666         logger.warn("Job statistics collection flag '{}' is malformed, setting to {}", jobStatsString,
667                 DEFAULT_JOB_STATISTICS);
668         collectJobstats = DEFAULT_JOB_STATISTICS;
669       }
670     }
671 
672     // get the encoding worker nodes defined in the configuration file and parse the comma-separated list
673     String encodingWorkersString = (String) properties.get(OPT_ENCODING_WORKERS);
674     if (StringUtils.isNotBlank(encodingWorkersString)) {
675       encodingWorkers = Arrays.asList(encodingWorkersString.split("\\s*,\\s*"));
676     } else {
677       encodingWorkers = DEFAULT_ENCODING_WORKERS;
678     }
679 
680     // get the encoding worker load threshold defined in the configuration file and parse the double
681     String encodingThersholdString = StringUtils.trimToNull((String) properties.get(OPT_ENCODING_THRESHOLD));
682     if (StringUtils.isNotBlank(encodingThersholdString) && encodingThersholdString != null) {
683         try {
684           double encodingThresholdTmp = Double.parseDouble(encodingThersholdString);
685           if (encodingThresholdTmp >= 0 && encodingThresholdTmp <= 1) {
686             encodingThreshold = encodingThresholdTmp;
687           } else {
688             encodingThreshold = DEFAULT_ENCODING_THRESHOLD;
689             logger.warn("org.opencastproject.encoding.workers.threshold is not between 0 and 1");
690           }
691         } catch (NumberFormatException e) {
692           logger.warn("Can not set encoding threshold to {}. {} must be an parsable double", encodingThersholdString,
693               OPT_ENCODING_THRESHOLD);
694         }
695     } else {
696       encodingThreshold = DEFAULT_ENCODING_THRESHOLD;
697     }
698 
699     String maxJobAgeString = StringUtils.trimToNull((String) properties.get(OPT_SERVICE_STATISTICS_MAX_JOB_AGE));
700     if (maxJobAgeString != null) {
701       try {
702         maxJobAge = Integer.parseInt(maxJobAgeString);
703         logger.info("Set service statistics max job age to {}", maxJobAgeString);
704       } catch (NumberFormatException e) {
705         logger.warn("Can not set service statistics max job age to {}. {} must be an integer", maxJobAgeString,
706                 OPT_SERVICE_STATISTICS_MAX_JOB_AGE);
707       }
708     }
709   }
710 
711   private void setupScheduledExecutor() {
712     if (scheduledExecutor == null) {
713       scheduledExecutor = (ScheduledThreadPoolExecutor) Executors.newScheduledThreadPool(1);
714       scheduledExecutor.setRemoveOnCancelPolicy(true);
715     }
716   }
717 
718   protected void startHeartbeat(long heartbeatInterval) {
719     setupScheduledExecutor();
720 
721     // Schedule the service heartbeat if the interval is > 0
722     if (heartbeatInterval > 0) {
723       // Stop the current dispatch thread so we can configure a new one
724       if (hbfuture != null) {
725         hbfuture.cancel(true);
726       }
727 
728       logger.debug("Starting service heartbeat at a custom interval of {}s", heartbeatInterval);
729       hbfuture = scheduledExecutor.scheduleWithFixedDelay(new JobProducerHeartbeat(), heartbeatInterval,
730           heartbeatInterval, TimeUnit.SECONDS);
731     }
732   }
733 
734   /**
735    * OSGI callback when the configuration is updated. This method is only here to prevent the
736    * configuration admin service from calling the service deactivate and activate methods
737    * for a config update. It does not have to do anything as the updates are handled by updated().
738    */
739   @Modified
740   public void modified(Map<String, Object> config) throws ConfigurationException {
741     logger.debug("Modified serviceregistry");
742   }
743 
744   private Function<EntityManager, Optional<JpaJob>> getJpaJobQuery(long id) {
745     return em -> namedQuery.findByIdOpt(JpaJob.class, id)
746         .apply(em)
747         .map(jpaJob -> {
748           // JPA's caches can be out of date if external changes (e.g. another node in the cluster) have been made to
749           // this row in the database
750           em.refresh(jpaJob);
751           setJobUri(jpaJob);
752           return jpaJob;
753         });
754   }
755 
756   @Override
757   public Job getJob(long id) throws NotFoundException, ServiceRegistryException {
758     try {
759       return db.exec(getJpaJobQuery(id))
760           .map(JpaJob::toJob)
761           .orElseThrow(NotFoundException::new);
762     } catch (NotFoundException e) {
763       throw e;
764     } catch (Exception e) {
765       throw new ServiceRegistryException(e);
766     }
767   }
768 
769   /**
770    * {@inheritDoc}
771    *
772    * @see org.opencastproject.serviceregistry.api.ServiceRegistry#getCurrentJob()
773    */
774   @Override
775   public Job getCurrentJob() {
776     return currentJob.get();
777   }
778 
779   /**
780    * {@inheritDoc}
781    *
782    * @see org.opencastproject.serviceregistry.api.ServiceRegistry#setCurrentJob(Job)
783    */
784   @Override
785   public void setCurrentJob(Job job) {
786     currentJob.set(job);
787   }
788 
789   JpaJob updateJob(JpaJob job) throws ServiceRegistryException {
790     try {
791       // tx context is opened in
792       //   updateInternal
793       //   updateServiceForFailover
794       return db.execChecked(em -> {
795         Job oldJob = getJob(job.getId());
796         JpaJob jpaJob = updateInternal(job);
797         if (!TYPE_WORKFLOW.equals(job.getJobType()) && job.getJobLoad() > 0.0f
798             && job.getProcessorServiceRegistration() != null
799             && job.getProcessorServiceRegistration().getHost().equals(getRegistryHostname())) {
800           processCachedLoadChange(job);
801         }
802 
803         // All WorkflowService Jobs will be ignored
804         if (oldJob.getStatus() != job.getStatus() && !TYPE_WORKFLOW.equals(job.getJobType())) {
805           updateServiceForFailover(job);
806         }
807 
808         return jpaJob;
809       });
810     } catch (ServiceRegistryException e) {
811       throw e;
812     } catch (NotFoundException e) {
813       // Just in case, remove from cache if there
814       removeFromLoadCache(job.getId());
815       throw new ServiceRegistryException(e);
816     } catch (Exception e) {
817       throw new ServiceRegistryException(e);
818     }
819   }
820 
821   @Override
822   public Job updateJob(Job job) throws ServiceRegistryException {
823     JpaJob jpaJob = JpaJob.from(job);
824     jpaJob.setProcessorServiceRegistration(
825             (ServiceRegistrationJpaImpl) getServiceRegistration(job.getJobType(), job.getProcessingHost()));
826     return updateJob(jpaJob).toJob();
827   }
828 
829   /**
830    * Processes the job load changes for the *local* load cache
831    *
832    * @param job
833    *   The job to apply to the load cache
834    */
835   private synchronized void processCachedLoadChange(JpaJob job) {
836     if (JOB_STATUSES_INFLUENCING_LOAD_BALANCING.contains(job.getStatus()) && jobCache.get(job.getId()) == null) {
837       logger.debug("Adding to load cache: {}, type {}, load {}, status {}",
838               job, job.getJobType(), job.getJobLoad(), job.getStatus());
839       localSystemLoad += job.getJobLoad();
840       jobCache.put(job.getId(), job.getJobLoad());
841     } else if (jobCache.get(job.getId()) != null && Status.FINISHED.equals(job.getStatus())
842             || Status.FAILED.equals(job.getStatus()) || Status.WAITING.equals(job.getStatus())) {
843       logger.debug("Removing from load cache: {}, type {}, load {}, status {}",
844               job, job.getJobType(), job.getJobLoad(), job.getStatus());
845       localSystemLoad -= job.getJobLoad();
846       jobCache.remove(job.getId());
847     } else {
848       logger.debug("Ignoring for load cache: {}, type {}, status {}",
849               job, job.getJobType(), job.getStatus());
850     }
851     logger.debug("Current host load: {}, job load cache size: {}", format("%.1f", localSystemLoad), jobCache.size());
852 
853     if (jobCache.isEmpty()) {
854       if (Math.abs(localSystemLoad) > 0.0000001F) {
855         logger.warn("No jobs in the job load cache, but load is {}: setting job load to 0", localSystemLoad);
856       }
857       localSystemLoad = 0.0F;
858     }
859   }
860 
861   private synchronized void removeFromLoadCache(Long jobId) {
862     if (jobCache.get(jobId) != null) {
863       float jobLoad = jobCache.get(jobId);
864       logger.debug("Removing deleted job from load cache: Job {}, load {}", jobId, jobLoad);
865       localSystemLoad -= jobLoad;
866       jobCache.remove(jobId);
867     }
868   }
869 
870   protected JpaJob setJobUri(JpaJob job) {
871     try {
872       job.setUri(new URI(jobHost + "/services/job/" + job.getId() + ".xml"));
873     } catch (URISyntaxException e) {
874       logger.warn("Can not set the job URI", e);
875     }
876     return job;
877   }
878 
879   /**
880    * Internal method to update a job, throwing unwrapped JPA exceptions.
881    *
882    * @param job
883    *          the job to update
884    * @return the updated job
885    */
886   protected JpaJob updateInternal(JpaJob job) throws NotFoundException {
887     JpaJob fromDb = db.execTxChecked(em -> {
888       JpaJob j = em.find(JpaJob.class, job.getId());
889       if (j == null) {
890         throw new NotFoundException();
891       }
892 
893       update(j, job);
894       em.merge(j);
895       return j;
896     });
897 
898     job.setVersion(fromDb.toJob().getVersion());
899     setJobUri(job);
900     return job;
901   }
902 
903   /**
904    * Internal method to update the service registration state, throwing unwrapped JPA exceptions.
905    *
906    * @param registration
907    *          the service registration to update
908    * @return the updated service registration
909    */
910   private ServiceRegistration updateServiceState(ServiceRegistrationJpaImpl registration) throws NotFoundException {
911     db.execTxChecked(em -> {
912       ServiceRegistrationJpaImpl fromDb = em.find(ServiceRegistrationJpaImpl.class, registration.getId());
913       if (fromDb == null) {
914         throw new NotFoundException();
915       }
916       fromDb.setServiceState(registration.getServiceState());
917       fromDb.setStateChanged(registration.getStateChanged());
918       fromDb.setWarningStateTrigger(registration.getWarningStateTrigger());
919       fromDb.setErrorStateTrigger(registration.getErrorStateTrigger());
920     });
921 
922     return registration;
923   }
924 
925   /**
926    * Sets the queue and runtimes and other elements of a persistent job based on a job that's been modified in memory.
927    * Times on both the objects must be modified, since the in-memory job must not be stale.
928    *
929    * @param fromDb
930    *          The job from the database
931    * @param jpaJob
932    *          The in-memory job
933    */
934   private void update(JpaJob fromDb, JpaJob jpaJob) {
935     final Job job = jpaJob.toJob();
936     final Date now = new Date();
937     final Status status = job.getStatus();
938     final Status fromDbStatus = fromDb.getStatus();
939 
940     fromDb.setPayload(job.getPayload());
941     fromDb.setStatus(job.getStatus());
942     fromDb.setDispatchable(job.isDispatchable());
943     fromDb.setVersion(job.getVersion());
944     fromDb.setOperation(job.getOperation());
945     fromDb.setArguments(job.getArguments());
946 
947     if (job.getDateCreated() == null) {
948       jpaJob.setDateCreated(now);
949       fromDb.setDateCreated(now);
950       job.setDateCreated(now);
951     }
952     if (job.getProcessingHost() != null) {
953       ServiceRegistrationJpaImpl processingService = (ServiceRegistrationJpaImpl) getServiceRegistration(
954               job.getJobType(), job.getProcessingHost());
955       logger.debug("{} has host '{}': setting processor service to '{}'", job, job.getProcessingHost(),
956           processingService);
957       fromDb.setProcessorServiceRegistration(processingService);
958     } else {
959       logger.debug("Unsetting previous processor service registration for {}", job);
960       fromDb.setProcessorServiceRegistration(null);
961     }
962     if (Status.RUNNING.equals(status) && !Status.WAITING.equals(fromDbStatus)) {
963       if (job.getDateStarted() == null) {
964         jpaJob.setDateStarted(now);
965         jpaJob.setQueueTime(now.getTime() - job.getDateCreated().getTime());
966         fromDb.setDateStarted(now);
967         fromDb.setQueueTime(now.getTime() - job.getDateCreated().getTime());
968         job.setDateStarted(now);
969         job.setQueueTime(now.getTime() - job.getDateCreated().getTime());
970       }
971     } else if (Status.FAILED.equals(status)) {
972       // failed jobs may not have even started properly
973       if (job.getDateCompleted() == null) {
974         fromDb.setDateCompleted(now);
975         jpaJob.setDateCompleted(now);
976         job.setDateCompleted(now);
977         if (job.getDateStarted() != null) {
978           jpaJob.setRunTime(now.getTime() - job.getDateStarted().getTime());
979           fromDb.setRunTime(now.getTime() - job.getDateStarted().getTime());
980           job.setRunTime(now.getTime() - job.getDateStarted().getTime());
981         }
982       }
983     } else if (Status.FINISHED.equals(status)) {
984       if (job.getDateStarted() == null) {
985         // Some services (e.g. ingest) don't use job dispatching, since they start immediately and handle their own
986         // lifecycle. In these cases, if the start date isn't set, use the date created as the start date
987         jpaJob.setDateStarted(job.getDateCreated());
988         job.setDateStarted(job.getDateCreated());
989       }
990       if (job.getDateCompleted() == null) {
991         jpaJob.setDateCompleted(now);
992         jpaJob.setRunTime(now.getTime() - job.getDateStarted().getTime());
993         fromDb.setDateCompleted(now);
994         fromDb.setRunTime(now.getTime() - job.getDateStarted().getTime());
995         job.setDateCompleted(now);
996         job.setRunTime(now.getTime() - job.getDateStarted().getTime());
997       }
998     }
999   }
1000 
1001   /**
1002    * Fetches a host registration from persistence.
1003    *
1004    * @param host
1005    *          the host name
1006    * @return the host registration, or null if none exists
1007    */
1008   protected Function<EntityManager, Optional<HostRegistrationJpaImpl>> fetchHostRegistrationQuery(String host) {
1009     return namedQuery.findOpt(
1010         "HostRegistration.byHostName",
1011         HostRegistrationJpaImpl.class,
1012         Pair.of("host", host)
1013     );
1014   }
1015 
1016   /**
1017    * {@inheritDoc}
1018    *
1019    * @see org.opencastproject.serviceregistry.api.ServiceRegistry#registerHost(String, String, String, long, int, float)
1020    */
1021   @Override
1022   public void registerHost(String host, String address, String nodeName, long memory, int cores, float maxLoad)
1023           throws ServiceRegistryException {
1024     try {
1025       HostRegistrationJpaImpl hostRegistration = db.execTxChecked(em -> {
1026         // Find the existing registrations for this host and if it exists, update it
1027         Optional<HostRegistrationJpaImpl> hostRegistrationOpt = fetchHostRegistrationQuery(host).apply(em);
1028         HostRegistrationJpaImpl hr;
1029 
1030         if (hostRegistrationOpt.isEmpty()) {
1031           hr = new HostRegistrationJpaImpl(host, address, nodeName, memory, cores, maxLoad, true, false);
1032           em.persist(hr);
1033         } else {
1034           hr = hostRegistrationOpt.get();
1035           hr.setIpAddress(address);
1036           hr.setNodeName(nodeName);
1037           hr.setMemory(memory);
1038           hr.setCores(cores);
1039           hr.setMaxLoad(maxLoad);
1040           hr.setOnline(true);
1041           em.merge(hr);
1042         }
1043         logger.info("Registering {} with a maximum load of {}", host, maxLoad);
1044         return hr;
1045       });
1046     } catch (Exception e) {
1047       throw new ServiceRegistryException(e);
1048     }
1049   }
1050 
1051   /**
1052    * {@inheritDoc}
1053    *
1054    * @see org.opencastproject.serviceregistry.api.ServiceRegistry#unregisterHost(java.lang.String)
1055    */
1056   @Override
1057   public void unregisterHost(String host) throws ServiceRegistryException {
1058     try {
1059       HostRegistrationJpaImpl existingHostRegistration = db.execTxChecked(em -> {
1060         HostRegistrationJpaImpl hr = fetchHostRegistrationQuery(host).apply(em).orElseThrow(
1061             () -> new IllegalArgumentException("Host '" + host + "' is not registered, so it can not be unregistered"));
1062 
1063         hr.setOnline(false);
1064         for (ServiceRegistration serviceRegistration : getServiceRegistrationsByHost(host)) {
1065           unRegisterService(serviceRegistration.getServiceType(), serviceRegistration.getHost());
1066         }
1067         em.merge(hr);
1068 
1069         logger.info("Unregistering {}", host);
1070         return hr;
1071       });
1072 
1073       logger.info("Host {} unregistered", host);
1074     } catch (Exception e) {
1075       throw new ServiceRegistryException(e);
1076     }
1077   }
1078 
1079   /**
1080    * {@inheritDoc}
1081    *
1082    * @see org.opencastproject.serviceregistry.api.ServiceRegistry#enableHost(String)
1083    */
1084   @Override
1085   public void enableHost(String host) throws ServiceRegistryException, NotFoundException {
1086     try {
1087       HostRegistrationJpaImpl hostRegistration = db.execTxChecked(em -> {
1088         // Find the existing registrations for this host and if it exists, update it
1089         HostRegistrationJpaImpl hr = fetchHostRegistrationQuery(host).apply(em).orElseThrow(
1090             () -> new NotFoundException("Host '" + host + "' is currently not registered, so it can not be enabled"));
1091         hr.setActive(true);
1092         em.merge(hr);
1093         logger.info("Enabling {}", host);
1094         return hr;
1095       });
1096 
1097       db.execTxChecked(em -> {
1098         for (ServiceRegistration serviceRegistration : getServiceRegistrationsByHost(host)) {
1099           ((ServiceRegistrationJpaImpl) serviceRegistration).setActive(true);
1100           em.merge(serviceRegistration);
1101         }
1102       });
1103     } catch (NotFoundException e) {
1104       throw e;
1105     } catch (Exception e) {
1106       throw new ServiceRegistryException(e);
1107     }
1108   }
1109 
1110   /**
1111    * {@inheritDoc}
1112    *
1113    * @see org.opencastproject.serviceregistry.api.ServiceRegistry#disableHost(String)
1114    */
1115   @Override
1116   public void disableHost(String host) throws ServiceRegistryException, NotFoundException {
1117     try {
1118       HostRegistrationJpaImpl hostRegistration = db.execTxChecked(em -> {
1119         HostRegistrationJpaImpl hr = fetchHostRegistrationQuery(host).apply(em).orElseThrow(
1120             () -> new NotFoundException("Host '" + host + "' is not currently registered, so it can not be disabled"));
1121 
1122         hr.setActive(false);
1123         for (ServiceRegistration serviceRegistration : getServiceRegistrationsByHost(host)) {
1124           ((ServiceRegistrationJpaImpl) serviceRegistration).setActive(false);
1125           em.merge(serviceRegistration);
1126         }
1127         em.merge(hr);
1128 
1129         logger.info("Disabling {}", host);
1130         return hr;
1131       });
1132     } catch (NotFoundException e) {
1133       throw e;
1134     } catch (Exception e) {
1135       throw new ServiceRegistryException(e);
1136     }
1137   }
1138 
1139   /**
1140    * {@inheritDoc}
1141    *
1142    * @see org.opencastproject.serviceregistry.api.ServiceRegistry#registerService(java.lang.String, java.lang.String,
1143    *      java.lang.String)
1144    */
1145   @Override
1146   public ServiceRegistration registerService(String serviceType, String baseUrl, String path)
1147           throws ServiceRegistryException {
1148     return registerService(serviceType, baseUrl, path, false);
1149   }
1150 
1151   /**
1152    * {@inheritDoc}
1153    *
1154    * @see org.opencastproject.serviceregistry.api.ServiceRegistry#registerService(java.lang.String, java.lang.String,
1155    *      java.lang.String, boolean)
1156    */
1157   @Override
1158   public ServiceRegistration registerService(String serviceType, String baseUrl, String path, boolean jobProducer)
1159           throws ServiceRegistryException {
1160     cleanRunningJobs(serviceType, baseUrl);
1161     return setOnlineStatus(serviceType, baseUrl, path, true, jobProducer);
1162   }
1163 
1164   protected Function<EntityManager, Optional<ServiceRegistrationJpaImpl>> getServiceRegistrationQuery(
1165       String serviceType, String host) {
1166     return namedQuery.findOpt(
1167         "ServiceRegistration.getRegistration",
1168         ServiceRegistrationJpaImpl.class,
1169         Pair.of("serviceType", serviceType),
1170         Pair.of("host", host)
1171     );
1172   }
1173 
1174   /**
1175    * Sets the online status of a service registration.
1176    *
1177    * @param serviceType
1178    *          The job type
1179    * @param baseUrl
1180    *          the host URL
1181    * @param online
1182    *          whether the service is online or off
1183    * @param jobProducer
1184    *          whether this service produces jobs for long running operations
1185    * @return the service registration
1186    */
1187   protected ServiceRegistration setOnlineStatus(String serviceType, String baseUrl, String path, boolean online,
1188           Boolean jobProducer) throws ServiceRegistryException {
1189     if (isBlank(serviceType) || isBlank(baseUrl)) {
1190       logger.info("Uninformed baseUrl '{}' or service '{}' (path '{}')", baseUrl, serviceType, path);
1191       throw new IllegalArgumentException("serviceType and baseUrl must not be blank");
1192     }
1193 
1194     try {
1195       AtomicReference<HostRegistrationJpaImpl> hostRegistration = new AtomicReference<>();
1196       AtomicReference<ServiceRegistrationJpaImpl> registration = new AtomicReference<>();
1197 
1198       db.execTxChecked(em -> {
1199         HostRegistrationJpaImpl hr = fetchHostRegistrationQuery(baseUrl).apply(em).orElseThrow(() -> {
1200           logger.info("No associated host registration for '{}' or service '{}' (path '{}')", baseUrl, serviceType,
1201               path);
1202           return new IllegalStateException(
1203               "A service registration can not be updated when it has no associated host registration");
1204         });
1205         hostRegistration.set(hr);
1206 
1207         ServiceRegistrationJpaImpl sr;
1208         Optional<ServiceRegistrationJpaImpl> srOpt = getServiceRegistrationQuery(serviceType, baseUrl).apply(em);
1209         if (srOpt.isEmpty()) {
1210           if (isBlank(path)) {
1211             // we can not create a new registration without a path
1212             throw new IllegalArgumentException("path must not be blank when registering new services");
1213           }
1214 
1215           // if we are not provided a value, consider it to be false
1216           sr = new ServiceRegistrationJpaImpl(hr, serviceType, path, Objects.requireNonNullElse(jobProducer, false));
1217           em.persist(sr);
1218         } else {
1219           sr = srOpt.get();
1220           if (StringUtils.isNotBlank(path)) {
1221             sr.setPath(path);
1222           }
1223           sr.setOnline(online);
1224           if (jobProducer != null) { // if we are not provided a value, don't update the persistent value
1225             sr.setJobProducer(jobProducer);
1226           }
1227           em.merge(sr);
1228         }
1229         registration.set(sr);
1230       });
1231       return registration.get();
1232     } catch (Exception e) {
1233       throw new ServiceRegistryException(e);
1234     }
1235   }
1236 
1237   /**
1238    * {@inheritDoc}
1239    *
1240    * @see org.opencastproject.serviceregistry.api.ServiceRegistry#unRegisterService(java.lang.String, java.lang.String)
1241    */
1242   @Override
1243   public void unRegisterService(String serviceType, String baseUrl) throws ServiceRegistryException {
1244     logger.info("Unregistering Service {}@{} and cleaning its running jobs", serviceType, baseUrl);
1245     // TODO: create methods that accept an entity manager, so we can execute multiple queries using the same em and tx
1246     //       (em and tx are reused if using nested db.execTx)
1247     setOnlineStatus(serviceType, baseUrl, null, false, null);
1248     cleanRunningJobs(serviceType, baseUrl);
1249   }
1250 
1251   /**
1252    * Find all undispatchable jobs that were orphaned when this host was last deactivated and set them to CANCELLED.
1253    */
1254   private void cleanUndispatchableJobs(String hostName) {
1255     logger.debug("Starting check for undispatchable jobs for host {}", hostName);
1256 
1257     try {
1258       db.execTxChecked(em -> {
1259         List<JpaJob> undispatchableJobs = namedQuery.findAll(
1260             "Job.undispatchable.status",
1261             JpaJob.class,
1262             Pair.of("statuses", List.of(
1263                 Status.INSTANTIATED.ordinal(),
1264                 Status.RUNNING.ordinal()
1265             ))
1266         ).apply(em);
1267 
1268         if (undispatchableJobs.size() > 0) {
1269           logger.info("Found {} undispatchable jobs on host {}", undispatchableJobs.size(), hostName);
1270         }
1271 
1272         for (JpaJob job : undispatchableJobs) {
1273           // Make sure the job was processed on this host
1274           String jobHost = "";
1275           if (job.getProcessorServiceRegistration() != null) {
1276             jobHost = job.getProcessorServiceRegistration().getHost();
1277           }
1278 
1279           if (!jobHost.equals(hostName)) {
1280             logger.debug("Will not cancel undispatchable job {} for host {}, it is running on a different host ({})",
1281                 job, hostName, jobHost);
1282             continue;
1283           }
1284 
1285           logger.info("Cancelling the running undispatchable job {}, it was orphaned on this host ({})", job, hostName);
1286           job.setStatus(Status.CANCELLED);
1287           em.merge(job);
1288         }
1289       });
1290     } catch (Exception e) {
1291       logger.error("Unable to clean undispatchable jobs for host {}! {}", hostName, e.getMessage());
1292     }
1293   }
1294 
1295   /**
1296    * Find all running jobs on this service and set them to RESET or CANCELLED.
1297    *
1298    * @param serviceType
1299    *          the service type
1300    * @param baseUrl
1301    *          the base url
1302    * @throws ServiceRegistryException
1303    *           if there is a problem communicating with the jobs database
1304    */
1305   private void cleanRunningJobs(String serviceType, String baseUrl) throws ServiceRegistryException {
1306     try {
1307       db.execTxChecked(em -> {
1308         TypedQuery<JpaJob> query = em.createNamedQuery("Job.processinghost.status", JpaJob.class)
1309             .setLockMode(LockModeType.PESSIMISTIC_WRITE)
1310             .setParameter("statuses", List.of(
1311                 Status.RUNNING.ordinal(),
1312                 Status.DISPATCHING.ordinal(),
1313                 Status.WAITING.ordinal()
1314             ))
1315             .setParameter("host", baseUrl)
1316             .setParameter("serviceType", serviceType);
1317 
1318         List<JpaJob> unregisteredJobs = query.getResultList();
1319         if (unregisteredJobs.size() > 0) {
1320           logger.info("Found {} jobs to clean for {}@{}", unregisteredJobs.size(), serviceType, baseUrl);
1321         }
1322 
1323         for (JpaJob job : unregisteredJobs) {
1324           if (job.isDispatchable()) {
1325             em.refresh(job);
1326             // If this job has already been treated
1327             if (Status.CANCELLED.equals(job.getStatus()) || Status.RESTART.equals(job.getStatus())) {
1328               continue;
1329             }
1330 
1331             if (job.getRootJob() != null && Status.PAUSED.equals(job.getRootJob().getStatus())) {
1332               JpaJob rootJob = job.getRootJob();
1333               cancelAllChildrenQuery(rootJob).accept(em);
1334               rootJob.setStatus(Status.RESTART);
1335               rootJob.setOperation(START_OPERATION);
1336               em.merge(rootJob);
1337               continue;
1338             }
1339 
1340             logger.info("Marking child jobs from {} as canceled", job);
1341             cancelAllChildrenQuery(job).accept(em);
1342 
1343             logger.info("Rescheduling lost {}", job);
1344             job.setStatus(Status.RESTART);
1345             job.setProcessorServiceRegistration(null);
1346           } else {
1347             logger.info("Marking lost {} as failed", job);
1348             job.setStatus(Status.FAILED);
1349           }
1350 
1351           em.merge(job);
1352         }
1353       });
1354     } catch (Exception e) {
1355       throw new ServiceRegistryException(e);
1356     }
1357   }
1358 
1359   /**
1360    * Go through all the children recursively to set them in {@link Status#CANCELLED} status
1361    *
1362    * @param job
1363    *          the parent job
1364    */
1365   private Consumer<EntityManager> cancelAllChildrenQuery(JpaJob job) {
1366     return em -> job.getChildJobs().stream()
1367         .peek(em::refresh)
1368         .filter(child -> Status.CANCELLED.equals(child.getStatus()))
1369         .forEach(child -> {
1370           cancelAllChildrenQuery(child).accept(em);
1371           child.setStatus(Status.CANCELLED);
1372           em.merge(child);
1373         });
1374   }
1375 
1376   /**
1377    * {@inheritDoc}
1378    *
1379    * @see org.opencastproject.serviceregistry.api.ServiceRegistry#setMaintenanceStatus(java.lang.String, boolean)
1380    */
1381   @Override
1382   public void setMaintenanceStatus(String baseUrl, boolean maintenance) throws NotFoundException {
1383     logger.info("Setting maintenance mode on host '{}'", baseUrl);
1384     HostRegistrationJpaImpl reg = db.execTxChecked(em -> {
1385       HostRegistrationJpaImpl hr = fetchHostRegistrationQuery(baseUrl).apply(em).orElseThrow(() -> {
1386             logger.warn("Can not set maintenance mode because host '{}' was not registered", baseUrl);
1387         return new NotFoundException("Can not set maintenance mode on a host that has not been registered");
1388       });
1389       hr.setMaintenanceMode(maintenance);
1390       em.merge(hr);
1391       return hr;
1392     });
1393 
1394     logger.info("Finished setting maintenance mode on host '{}'", baseUrl);
1395   }
1396 
1397   /**
1398    * {@inheritDoc}
1399    *
1400    * @see org.opencastproject.serviceregistry.api.ServiceRegistry#getServiceRegistrations()
1401    */
1402   @Override
1403   public List<ServiceRegistration> getServiceRegistrations() {
1404     return db.exec(getServiceRegistrationsQuery());
1405   }
1406 
1407   @Override
1408   public Incidents incident() {
1409     return incidents;
1410   }
1411 
1412   private List<ServiceRegistration> getOnlineServiceRegistrations() {
1413     return db.exec(namedQuery.findAll("ServiceRegistration.getAllOnline", ServiceRegistration.class));
1414   }
1415 
1416   /**
1417    * Gets all service registrations.
1418    *
1419    * @return the list of service registrations
1420    */
1421   protected Function<EntityManager, List<ServiceRegistration>> getServiceRegistrationsQuery() {
1422     return namedQuery.findAll("ServiceRegistration.getAll", ServiceRegistration.class);
1423   }
1424 
1425   /**
1426    * Gets all host registrations
1427    *
1428    * @return the list of host registrations
1429    */
1430   @Override
1431   public List<HostRegistration> getHostRegistrations() {
1432     return db.exec(getHostRegistrationsQuery());
1433   }
1434 
1435   @Override
1436   public HostStatistics getHostStatistics() {
1437     HostStatistics statistics = new HostStatistics();
1438 
1439     db.exec(namedQuery.findAll(
1440         "HostRegistration.jobStatistics",
1441         Object[].class,
1442         Pair.of("status", List.of(Status.QUEUED.ordinal(), Status.RUNNING.ordinal()))
1443     )).forEach(row -> {
1444       final long host = ((Number) row[0]).longValue();
1445       final int status = ((Number) row[1]).intValue();
1446       final long count = ((Number) row[2]).longValue();
1447 
1448       if (status == Status.RUNNING.ordinal()) {
1449         statistics.addRunning(host, count);
1450       } else {
1451         statistics.addQueued(host, count);
1452       }
1453     });
1454 
1455     return statistics;
1456   }
1457 
1458   /**
1459    * Gets all host registrations
1460    *
1461    * @return the list of host registrations
1462    */
1463   protected Function<EntityManager, List<HostRegistration>> getHostRegistrationsQuery() {
1464     return namedQuery.findAll("HostRegistration.getAll", HostRegistration.class);
1465   }
1466 
1467   @Override
1468   public HostRegistration getHostRegistration(String hostname) throws ServiceRegistryException {
1469     return db.exec(getHostRegistrationQuery(hostname));
1470   }
1471 
1472   protected Function<EntityManager, HostRegistration> getHostRegistrationQuery(String hostname) {
1473     return namedQuery.find(
1474         "HostRegistration.byHostName",
1475         HostRegistration.class,
1476         Pair.of("host", hostname)
1477     );
1478   }
1479 
1480   /**
1481    * {@inheritDoc}
1482    *
1483    * @see org.opencastproject.serviceregistry.api.ServiceRegistry#getChildJobs(long)
1484    */
1485   @Override
1486   public List<Job> getChildJobs(long id) throws ServiceRegistryException {
1487     try {
1488       List<JpaJob> jobs = db.exec(namedQuery.findAll(
1489           "Job.root.children",
1490           JpaJob.class,
1491           Pair.of("id", id)
1492       ));
1493 
1494       if (jobs.size() == 0) {
1495         jobs = db.exec(getChildrenQuery(id));
1496       }
1497 
1498       return jobs.stream()
1499           .map(this::setJobUri)
1500           .map(JpaJob::toJob)
1501           .collect(Collectors.toList());
1502     } catch (Exception e) {
1503       throw new ServiceRegistryException(e);
1504     }
1505   }
1506 
1507   private Function<EntityManager, List<JpaJob>> getChildrenQuery(long id) {
1508     return em -> {
1509       TypedQuery<JpaJob> query = em
1510           .createNamedQuery("Job.children", JpaJob.class)
1511           .setParameter("id", id);
1512 
1513       List<JpaJob> childJobs = query.getResultList();
1514 
1515       List<JpaJob> result = new ArrayList<>(childJobs);
1516       childJobs.stream()
1517           .map(j -> getChildrenQuery(j.getId()).apply(em))
1518           .forEach(result::addAll);
1519 
1520       return result;
1521     };
1522   }
1523 
1524   /**
1525    * {@inheritDoc}
1526    *
1527    * @see org.opencastproject.serviceregistry.api.ServiceRegistry#getJobs(java.lang.String, Status)
1528    */
1529   @Override
1530   public List<Job> getJobs(String type, Status status) throws ServiceRegistryException {
1531     logger.trace("Getting jobs '{}' and '{}'", type, status);
1532 
1533     Function<EntityManager, List<JpaJob>> jobsQuery;
1534     if (type == null && status == null) {
1535       jobsQuery = namedQuery.findAll("Job.all", JpaJob.class);
1536     } else if (type == null) {
1537       jobsQuery = namedQuery.findAll(
1538           "Job.status",
1539           JpaJob.class,
1540           Pair.of("status", status.ordinal())
1541       );
1542     } else if (status == null) {
1543       jobsQuery = namedQuery.findAll(
1544           "Job.type",
1545           JpaJob.class,
1546           Pair.of("serviceType", type)
1547       );
1548     } else {
1549       jobsQuery = namedQuery.findAll(
1550           "Job",
1551           JpaJob.class,
1552           Pair.of("status", status.ordinal()),
1553           Pair.of("serviceType", type)
1554       );
1555     }
1556 
1557     try {
1558       return db.exec(jobsQuery).stream()
1559           .peek(this::setJobUri)
1560           .map(JpaJob::toJob)
1561           .collect(Collectors.toList());
1562     } catch (Exception e) {
1563       throw new ServiceRegistryException(e);
1564     }
1565   }
1566 
1567   @Override
1568   public List<String> getJobPayloads(String operation) throws ServiceRegistryException {
1569     try {
1570       return db.exec(namedQuery.findAll(
1571           "Job.payload",
1572           String.class,
1573           Pair.of("operation", operation)
1574       ));
1575     } catch (Exception e) {
1576       throw new ServiceRegistryException(e);
1577     }
1578   }
1579 
1580   @Override
1581   public List<String> getJobPayloads(String operation, int limit, int offset) throws ServiceRegistryException {
1582     try {
1583       return db.exec(em -> {
1584         return em.createNamedQuery("Job.payload", String.class)
1585             .setParameter("operation", operation)
1586             .setMaxResults(limit)
1587             .setFirstResult(offset)
1588             .getResultList();
1589       });
1590     } catch (Exception e) {
1591       throw new ServiceRegistryException(e);
1592     }
1593   }
1594 
1595   @Override
1596   public int getJobCount(final String operation) throws ServiceRegistryException {
1597     try {
1598       return db.exec(namedQuery.find(
1599           "Job.countByOperationOnly",
1600           Number.class,
1601           Pair.of("operation", operation)
1602       )).intValue();
1603     } catch (Exception e) {
1604       throw new ServiceRegistryException(e);
1605     }
1606   }
1607 
1608   /**
1609    * {@inheritDoc}
1610    *
1611    * @see org.opencastproject.serviceregistry.api.ServiceRegistry#getActiveJobs()
1612    */
1613   @Override
1614   public List<Job> getActiveJobs() throws ServiceRegistryException {
1615     try {
1616       return db.exec(getJobsByStatusQuery(activeJobStatus)).stream()
1617           .map(JpaJob::toJob)
1618           .collect(Collectors.toList());
1619     } catch (Exception e) {
1620       throw new ServiceRegistryException(e);
1621     }
1622   }
1623 
1624   /**
1625    * Get the list of jobs with status from the given statuses.
1626    *
1627    * @param statuses
1628    *          variable sized array of status values to test on jobs
1629    * @return list of jobs with status from statuses
1630    */
1631   public Function<EntityManager, List<JpaJob>> getJobsByStatusQuery(Status... statuses) {
1632     if (statuses == null || statuses.length < 1) {
1633       throw new IllegalArgumentException("At least one job status must be given.");
1634     }
1635 
1636     return namedQuery.findAll(
1637         "Job.statuses",
1638         JpaJob.class,
1639         Pair.of("statuses", Arrays.stream(statuses).map(Enum::ordinal).collect(Collectors.toList()))
1640     ).andThen(jobs -> jobs.stream()
1641         .peek(this::setJobUri)
1642         .collect(Collectors.toList()));
1643   }
1644 
1645   @Override
1646   public Map<String, Map<String, Long>> countActiveByOrganizationAndHost() {
1647     var rows = db.exec(namedQuery.findAll(
1648         "Job.countByOrganizationAndHost",
1649         Object[].class,
1650         Pair.of("statuses", Arrays.stream(activeJobStatus).map(Enum::ordinal).collect(Collectors.toList()))
1651     )).stream().collect(Collectors.toList());
1652     var orgMap = new HashMap<String, Map<String, Long>>();
1653     for (Object[] row: rows) {
1654       var org = (String) row[0];
1655       var host = (String) row[1];
1656       var count = (Long) row[2];
1657       orgMap.computeIfAbsent(org, o -> new HashMap<>()).put(host, count);
1658     }
1659     return orgMap;
1660   }
1661 
1662   @Override
1663   public Map<String, Long> countActiveTypeByOrganization(final String operation) {
1664     return db.exec(namedQuery.findAll(
1665         "Job.countTypeByOrganization",
1666         Object[].class,
1667         Pair.of("statuses", Arrays.stream(activeJobStatus).map(Enum::ordinal).collect(Collectors.toList())),
1668         Pair.of("operation", operation)
1669     )).stream().collect(Collectors.toMap(
1670         row -> (String) row[0],
1671         row -> (Long) row[1]
1672     ));
1673   }
1674 
1675   /**
1676    * Gets jobs of all types that are in the given state.
1677    *
1678    * @param offset apply offset to the db query if offset &gt; 0
1679    * @param limit apply limit to the db query if limit &gt; 0
1680    * @param statuses the job status should be one from the given statuses
1681    * @return the list of jobs waiting for dispatch
1682    */
1683   protected Function<EntityManager, List<JpaJob>> getDispatchableJobsWithStatusQuery(int offset, int limit,
1684       Status... statuses) {
1685     return em -> {
1686       if (statuses == null) {
1687         return Collections.emptyList();
1688       }
1689 
1690       TypedQuery<JpaJob> query = em
1691           .createNamedQuery("Job.dispatchable.status", JpaJob.class)
1692           .setParameter("statuses", Arrays.stream(statuses).map(Enum::ordinal).collect(Collectors.toList()));
1693       if (offset > 0) {
1694         query.setFirstResult(offset);
1695       }
1696       if (limit > 0) {
1697         query.setMaxResults(limit);
1698       }
1699       return query.getResultList();
1700     };
1701   }
1702 
1703   Function<EntityManager, List<Object[]>> getAvgOperationsQuery() {
1704     return namedQuery.findAll("Job.avgOperation", Object[].class);
1705   }
1706 
1707   Function<EntityManager, List<Object[]>> getCountPerHostServiceQuery() {
1708     return namedQuery.findAll("Job.countPerHostService", Object[].class);
1709   }
1710 
1711   /**
1712    * {@inheritDoc}
1713    *
1714    * @see org.opencastproject.serviceregistry.api.ServiceRegistry#count(java.lang.String, Status)
1715    */
1716   @Override
1717   public long count(String serviceType, Status status) throws ServiceRegistryException {
1718     Function<EntityManager, Number> countQuery;
1719     if (serviceType == null && status == null) {
1720       countQuery = namedQuery.find(
1721           "Job.count.all",
1722           Number.class
1723       );
1724     } else if (serviceType == null) {
1725       countQuery = namedQuery.find(
1726           "Job.count.nullType",
1727           Number.class,
1728           Pair.of("status", status.ordinal())
1729       );
1730     } else if (status == null) {
1731       countQuery = namedQuery.find(
1732           "Job.count.nullStatus",
1733           Number.class,
1734           Pair.of("serviceType", serviceType)
1735       );
1736     } else {
1737       countQuery = namedQuery.find(
1738           "Job.count",
1739           Number.class,
1740           Pair.of("status", status.ordinal()),
1741           Pair.of("serviceType", serviceType)
1742       );
1743     }
1744 
1745     try {
1746       return db.exec(countQuery).longValue();
1747     } catch (Exception e) {
1748       throw new ServiceRegistryException(e);
1749     }
1750   }
1751 
1752   /**
1753    * {@inheritDoc}
1754    *
1755    * @see org.opencastproject.serviceregistry.api.ServiceRegistry#countByHost(java.lang.String, java.lang.String,
1756    *      Status)
1757    */
1758   @Override
1759   public long countByHost(String serviceType, String host, Status status) throws ServiceRegistryException {
1760     Function<EntityManager, Number> countQuery;
1761     if (serviceType != null && !serviceType.isEmpty()) {
1762       countQuery = namedQuery.find(
1763           "Job.countByHost",
1764           Number.class,
1765           Pair.of("serviceType", serviceType),
1766           Pair.of("status", status.ordinal()),
1767           Pair.of("host", host)
1768       );
1769     } else {
1770       countQuery = namedQuery.find(
1771           "Job.countByHost.nullType",
1772           Number.class,
1773           Pair.of("status", status.ordinal()),
1774           Pair.of("host", host)
1775       );
1776     }
1777 
1778     try {
1779       return db.exec(countQuery).longValue();
1780     } catch (Exception e) {
1781       throw new ServiceRegistryException(e);
1782     }
1783   }
1784 
1785   /**
1786    * {@inheritDoc}
1787    *
1788    * @see org.opencastproject.serviceregistry.api.ServiceRegistry#countByOperation(java.lang.String, java.lang.String,
1789    *      Status)
1790    */
1791 
1792   @Override
1793   public long countByOperation(String serviceType, String operation, Status status) throws ServiceRegistryException {
1794     try {
1795       return db.exec(namedQuery.find(
1796           "Job.countByOperation",
1797           Number.class,
1798           Pair.of("status", status.ordinal()),
1799           Pair.of("serviceType", serviceType),
1800           Pair.of("operation", operation)
1801       )).longValue();
1802     } catch (Exception e) {
1803       throw new ServiceRegistryException(e);
1804     }
1805   }
1806 
1807   /**
1808    * {@inheritDoc}
1809    *
1810    * @see org.opencastproject.serviceregistry.api.ServiceRegistry#count(java.lang.String, java.lang.String,
1811    *      java.lang.String, Status)
1812    */
1813   @Override
1814   public long count(String serviceType, String host, String operation, Status status) throws ServiceRegistryException {
1815     if (StringUtils.isBlank(serviceType) || StringUtils.isBlank(host) || StringUtils.isBlank(operation)
1816             || status == null) {
1817       throw new IllegalArgumentException("service type, host, operation, and status must be provided");
1818     }
1819 
1820     try {
1821       return db.exec(namedQuery.find(
1822           "Job.fullMonty",
1823           Number.class,
1824           Pair.of("status", status.ordinal()),
1825           Pair.of("serviceType", serviceType),
1826           Pair.of("operation", operation)
1827       )).longValue();
1828     } catch (Exception e) {
1829       throw new ServiceRegistryException(e);
1830     }
1831   }
1832 
1833   /**
1834    * {@inheritDoc}
1835    *
1836    * @see org.opencastproject.serviceregistry.api.ServiceRegistry#getServiceStatistics()
1837    */
1838   @Override
1839   public List<ServiceStatistics> getServiceStatistics() throws ServiceRegistryException {
1840     Date now = new Date();
1841     try {
1842       return db.exec(getServiceStatisticsQuery(
1843           DateUtils.addDays(now, -maxJobAge),
1844           DateUtils.addDays(now, 1) // Avoid glitches around 'now' by setting the endDate to 'tomorrow'
1845       ));
1846     } catch (Exception e) {
1847       throw new ServiceRegistryException(e);
1848     }
1849   }
1850 
1851   /**
1852    * Gets performance and runtime statistics for each known service registration.
1853    * For the statistics, only jobs created within the time interval [startDate, endDate] are being considered
1854    *
1855    * @param startDate
1856    *          Only jobs created after this data are considered for statistics
1857    * @param endDate
1858    *          Only jobs created before this data are considered for statistics
1859    * @return the service statistics
1860    */
1861   private Function<EntityManager, List<ServiceStatistics>> getServiceStatisticsQuery(Date startDate, Date endDate) {
1862     return em -> {
1863       Map<Long, JaxbServiceStatistics> statsMap = new HashMap<>();
1864 
1865       // Make sure we also include the services that have no processing history so far
1866       namedQuery.findAll("ServiceRegistration.getAll", ServiceRegistrationJpaImpl.class).apply(em).forEach(s ->
1867         statsMap.put(s.getId(), new JaxbServiceStatistics(s))
1868       );
1869 
1870       if (collectJobstats) {
1871         // Build stats map
1872         namedQuery.findAll(
1873             "ServiceRegistration.statistics",
1874             Object[].class,
1875             Pair.of("minDateCreated", startDate),
1876             Pair.of("maxDateCreated", endDate)
1877         ).apply(em).forEach(row -> {
1878           Number serviceRegistrationId = (Number) row[0];
1879           if (serviceRegistrationId == null || serviceRegistrationId.longValue() == 0) {
1880             return;
1881           }
1882           Status status = Status.values()[((Number) row[1]).intValue()];
1883           Number count = (Number) row[2];
1884           Number meanQueueTime = (Number) row[3];
1885           Number meanRunTime = (Number) row[4];
1886 
1887           // The statistics query returns a cartesian product, so we need to iterate over them to build up the objects
1888           JaxbServiceStatistics stats = statsMap.get(serviceRegistrationId.longValue());
1889           if (stats == null) {
1890             return;
1891           }
1892 
1893           // the status will be null if there are no jobs at all associated with this service registration
1894           if (status != null) {
1895             switch (status) {
1896               case RUNNING:
1897                 stats.setRunningJobs(count.intValue());
1898                 break;
1899               case QUEUED:
1900               case DISPATCHING:
1901                 stats.setQueuedJobs(count.intValue());
1902                 break;
1903               case FINISHED:
1904                 stats.setMeanRunTime(meanRunTime.longValue());
1905                 stats.setMeanQueueTime(meanQueueTime.longValue());
1906                 stats.setFinishedJobs(count.intValue());
1907                 break;
1908               default:
1909                 break;
1910             }
1911           }
1912         });
1913       }
1914 
1915       List<ServiceStatistics> stats = new ArrayList<>(statsMap.values());
1916       stats.sort((o1, o2) -> {
1917         ServiceRegistration reg1 = o1.getServiceRegistration();
1918         ServiceRegistration reg2 = o2.getServiceRegistration();
1919         int typeComparison = reg1.getServiceType().compareTo(reg2.getServiceType());
1920         return typeComparison == 0
1921             ? reg1.getHost().compareTo(reg2.getHost())
1922             : typeComparison;
1923       });
1924 
1925       return stats;
1926     };
1927   }
1928 
1929   /**
1930    * Do not look at this, it will burn your eyes! This is due to JPA's inability to do a left outer join with join
1931    * conditions.
1932    *
1933    * {@inheritDoc}
1934    *
1935    * @see org.opencastproject.serviceregistry.api.ServiceRegistry#getServiceRegistrationsByLoad(java.lang.String)
1936    */
1937   @Override
1938   public List<ServiceRegistration> getServiceRegistrationsByLoad(String serviceType) throws ServiceRegistryException {
1939     SystemLoad loadByHost = getCurrentHostLoads();
1940     List<HostRegistration> hostRegistrations = getHostRegistrations();
1941     List<ServiceRegistration> serviceRegistrations = getServiceRegistrationsByType(serviceType);
1942     return getServiceRegistrationsByLoad(serviceType, serviceRegistrations, hostRegistrations, loadByHost);
1943   }
1944 
1945   /**
1946    * {@inheritDoc}
1947    *
1948    * @see org.opencastproject.serviceregistry.api.ServiceRegistry#getCurrentHostLoads()
1949    */
1950   @Override
1951   public SystemLoad getCurrentHostLoads() {
1952     return db.exec(getHostLoadsQuery());
1953   }
1954 
1955   /**
1956    * Gets a map of hosts to the number of jobs currently loading that host
1957    *
1958    * @return the map of hosts to job counts
1959    */
1960   Function<EntityManager, SystemLoad> getHostLoadsQuery() {
1961     return em -> {
1962       final SystemLoad systemLoad = new SystemLoad();
1963 
1964       // Find all jobs that are currently running on any given host, or get all of them
1965       List<Integer> statuses = JOB_STATUSES_INFLUENCING_LOAD_BALANCING.stream()
1966           .map(Enum::ordinal)
1967           .collect(Collectors.toList());
1968       List<Object[]> rows = namedQuery.findAll(
1969           "ServiceRegistration.hostloads",
1970           Object[].class,
1971           Pair.of("statuses", statuses),
1972           // Note: This is used in the query to filter out workflow jobs.
1973           // These jobs are load balanced by the workflow service directly.
1974           Pair.of("workflow_type", TYPE_WORKFLOW)
1975       ).apply(em);
1976 
1977       // Accumulate the numbers for relevant job statuses per host
1978       for (Object[] row : rows) {
1979         String host = String.valueOf(row[0]);
1980         Status status = Status.values()[(int) row[1]];
1981         float currentLoad = ((Number) row[2]).floatValue();
1982         float maxLoad = ((Number) row[3]).floatValue();
1983 
1984         // Only queued, and running jobs are adding to the load, so every other status is discarded
1985         if (status == null || !JOB_STATUSES_INFLUENCING_LOAD_BALANCING.contains(status)) {
1986           currentLoad = 0.0f;
1987         }
1988         // Add the service registration
1989         NodeLoad serviceLoad = new NodeLoad(host, currentLoad, maxLoad);
1990         systemLoad.addNodeLoad(serviceLoad);
1991       }
1992 
1993       // This is important, otherwise services which have no current load are not listed in the output!
1994       getHostRegistrationsQuery().apply(em).stream()
1995           .filter(h -> !systemLoad.containsHost(h.getBaseUrl()))
1996           .forEach(h -> systemLoad.addNodeLoad(new NodeLoad(h.getBaseUrl(), 0.0f, h.getMaxLoad())));
1997       return systemLoad;
1998     };
1999   }
2000 
2001   /**
2002    * {@inheritDoc}
2003    *
2004    * @see org.opencastproject.serviceregistry.api.ServiceRegistry#getServiceRegistrationsByType(java.lang.String)
2005    */
2006   @Override
2007   public List<ServiceRegistration> getServiceRegistrationsByType(String serviceType) throws ServiceRegistryException {
2008     return db.exec(namedQuery.findAll(
2009         "ServiceRegistration.getByType",
2010         ServiceRegistration.class,
2011         Pair.of("serviceType", serviceType)
2012     ));
2013   }
2014 
2015   /**
2016    * {@inheritDoc}
2017    *
2018    * @see org.opencastproject.serviceregistry.api.ServiceRegistry#getServiceRegistrationsByHost(java.lang.String)
2019    */
2020   @Override
2021   public List<ServiceRegistration> getServiceRegistrationsByHost(String host) throws ServiceRegistryException {
2022     return db.exec(getServiceRegistrationsByHostQuery(host));
2023   }
2024 
2025   private Function<EntityManager, List<ServiceRegistration>> getServiceRegistrationsByHostQuery(String host) {
2026     return namedQuery.findAll(
2027         "ServiceRegistration.getByHost",
2028         ServiceRegistration.class,
2029         Pair.of("host", host)
2030     );
2031   }
2032 
2033   /**
2034    * {@inheritDoc}
2035    *
2036    * @see org.opencastproject.serviceregistry.api.ServiceRegistry#getServiceRegistration(java.lang.String,
2037    *      java.lang.String)
2038    */
2039   @Override
2040   public ServiceRegistration getServiceRegistration(String serviceType, String host) {
2041     return db.exec(getServiceRegistrationQuery(serviceType, host))
2042         .orElse(null);
2043   }
2044 
2045   /**
2046    * Sets the trusted http client.
2047    *
2048    * @param client
2049    *          the trusted http client
2050    */
2051   @Reference
2052   void setTrustedHttpClient(TrustedHttpClient client) {
2053     this.client = client;
2054   }
2055 
2056   /**
2057    * Callback for setting the security service.
2058    *
2059    * @param securityService
2060    *          the securityService to set
2061    */
2062   @Reference
2063   public void setSecurityService(SecurityService securityService) {
2064     this.securityService = securityService;
2065   }
2066 
2067   /** OSGi DI. */
2068   @Reference(
2069       cardinality = ReferenceCardinality.OPTIONAL,
2070       policy =  ReferencePolicy.DYNAMIC,
2071       unbind = "unsetIncidentService"
2072   )
2073   public void setIncidentService(IncidentService incidentService) {
2074     this.incidentService = incidentService;
2075     // Manually resolve the cyclic dependency between the incident service and the service registry
2076     ((OsgiIncidentService) incidentService).setServiceRegistry(this);
2077     this.incidents = new Incidents(this, incidentService);
2078   }
2079 
2080   public void unsetIncidentService(IncidentService incidentService) {
2081     if (this.incidentService == incidentService) {
2082       this.incidentService = null;
2083       this.incidents = null;
2084     }
2085   }
2086 
2087   /**
2088    * Update the jobs failure history and the service status with the given information. All these data are then use for
2089    * the jobs failover strategy. Only the terminated job (with FAILED or FINISHED status) are taken into account.
2090    *
2091    * @param job
2092    *          the current job that failed/succeeded
2093    * @throws ServiceRegistryException
2094    * @throws NotFoundException
2095    */
2096   private void updateServiceForFailover(JpaJob job) throws ServiceRegistryException, NotFoundException {
2097     if (job.getStatus() != Status.FAILED && job.getStatus() != Status.FINISHED) {
2098       return;
2099     }
2100 
2101     job.setStatus(job.getStatus(), job.getFailureReason());
2102 
2103     // At this point, the only possible states for the current service are NORMAL and WARNING,
2104     // the services in ERROR state will not be chosen by the dispatcher
2105     ServiceRegistrationJpaImpl currentService = job.getProcessorServiceRegistration();
2106     if (currentService == null) {
2107       return;
2108     }
2109 
2110     // Job is finished with a failure
2111     if (job.getStatus() == FAILED && !DATA.equals(job.getFailureReason())) {
2112 
2113       // Services in WARNING or ERROR state triggered by current job
2114       List<ServiceRegistrationJpaImpl> relatedWarningOrErrorServices = getRelatedWarningErrorServices(job);
2115 
2116       // Before this job failed there was at least one job failed with this job signature on any service
2117       if (relatedWarningOrErrorServices.size() > 0) {
2118         for (ServiceRegistrationJpaImpl relatedService : relatedWarningOrErrorServices) {
2119           // Skip current service from the list
2120           if (currentService.equals(relatedService)) {
2121             continue;
2122           }
2123 
2124           // De-escalate the state of related services as the issue is most likely with the job not the service
2125           // Reset the WARNING job to NORMAL
2126           if (relatedService.getServiceState() == WARNING) {
2127             logger.info("State reset to NORMAL for related service {} on host {}", relatedService.getServiceType(),
2128                     relatedService.getHost());
2129             relatedService.setServiceState(NORMAL, job.toJob().getSignature());
2130           }
2131 
2132           // Reset the ERROR job to WARNING
2133           else if (relatedService.getServiceState() == ERROR) {
2134             logger.info("State reset to WARNING for related service {} on host {}", relatedService.getServiceType(),
2135                     relatedService.getHost());
2136             relatedService.setServiceState(WARNING, relatedService.getWarningStateTrigger());
2137           }
2138 
2139           updateServiceState(relatedService);
2140         }
2141       }
2142 
2143       // This is the first job with this signature failing on any service
2144       else {
2145         // Set the current service to WARNING state
2146         if (currentService.getServiceState() == NORMAL) {
2147           logger.info("State set to WARNING for current service {} on host {}", currentService.getServiceType(),
2148                   currentService.getHost());
2149           currentService.setServiceState(WARNING, job.toJob().getSignature());
2150           updateServiceState(currentService);
2151         }
2152 
2153         // The current service already is in WARNING state and max attempts is reached
2154         else if (errorStatesEnabled && !noErrorStateServiceTypes.contains(currentService.getServiceType())
2155                 && getHistorySize(currentService) >= maxAttemptsBeforeErrorState) {
2156           logger.info("State set to ERROR for current service {} on host {}", currentService.getServiceType(),
2157                   currentService.getHost());
2158           currentService.setServiceState(ERROR, job.toJob().getSignature());
2159           updateServiceState(currentService);
2160         }
2161       }
2162     }
2163 
2164     // Job is finished without failure
2165     else if (job.getStatus() == Status.FINISHED) {
2166       // If the service was in warning state reset to normal state
2167       if (currentService.getServiceState() == WARNING) {
2168         logger.info("State reset to NORMAL for current service {} on host {}", currentService.getServiceType(),
2169                 currentService.getHost());
2170         currentService.setServiceState(NORMAL);
2171         updateServiceState(currentService);
2172       }
2173     }
2174   }
2175 
2176   /**
2177    * {@inheritDoc}
2178    *
2179    * @see org.opencastproject.serviceregistry.api.ServiceRegistry#sanitize(java.lang.String, java.lang.String)
2180    */
2181   @Override
2182   public void sanitize(String serviceType, String host) throws NotFoundException {
2183     db.execChecked(em -> {
2184       ServiceRegistrationJpaImpl service = getServiceRegistrationQuery(serviceType, host).apply(em)
2185           .orElseThrow(NotFoundException::new);
2186 
2187       logger.info("State reset to NORMAL for service {} on host {} through sanitize method", service.getServiceType(),
2188           service.getHost());
2189       service.setServiceState(NORMAL);
2190       updateServiceState(service);
2191     });
2192   }
2193 
2194   /**
2195    * Gets the failed jobs history for the given service registration
2196    *
2197    * @param serviceRegistration
2198    * @return the failed jobs history size
2199    * @throws IllegalArgumentException
2200    *           if parameter is null
2201    * @throws ServiceRegistryException
2202    */
2203   private int getHistorySize(ServiceRegistration serviceRegistration) throws ServiceRegistryException {
2204     if (serviceRegistration == null) {
2205       throw new IllegalArgumentException("serviceRegistration must not be null!");
2206     }
2207 
2208     logger.debug("Calculating count of jobs who failed due to service {}", serviceRegistration);
2209 
2210     try {
2211       return db.exec(namedQuery.find(
2212           "Job.count.history.failed",
2213           Number.class,
2214           Pair.of("serviceType", serviceRegistration.getServiceType()),
2215           Pair.of("host", serviceRegistration.getHost())
2216       )).intValue();
2217     } catch (Exception e) {
2218       throw new ServiceRegistryException(e);
2219     }
2220   }
2221 
2222   /**
2223    * Gets the services in WARNING or ERROR state triggered by this job
2224    *
2225    * @param job
2226    *          the given job to get the related services
2227    * @return a list of services triggered by the job
2228    * @throws IllegalArgumentException
2229    *           if the given job was null
2230    * @throws ServiceRegistryException
2231    *           if the there was a problem with the query
2232    */
2233   private List<ServiceRegistrationJpaImpl> getRelatedWarningErrorServices(JpaJob job) throws ServiceRegistryException {
2234     if (job == null) {
2235       throw new IllegalArgumentException("job must not be null!");
2236     }
2237 
2238     logger.debug("Try to get the services in WARNING or ERROR state triggered by {} failed", job);
2239 
2240     try {
2241       return db.exec(namedQuery.findAll(
2242           "ServiceRegistration.relatedservices.warning_error",
2243           ServiceRegistrationJpaImpl.class,
2244           Pair.of("serviceType", job.getJobType())
2245       )).stream()
2246           // TODO: modify the query to avoid to go through the list here
2247           .filter(rs ->
2248               (rs.getServiceState() == WARNING && rs.getWarningStateTrigger() == job.toJob().getSignature())
2249               || (rs.getServiceState() == ERROR && rs.getErrorStateTrigger() == job.toJob().getSignature())
2250           ).collect(Collectors.toList());
2251     } catch (Exception e) {
2252       throw new ServiceRegistryException(e);
2253     }
2254   }
2255 
2256   /**
2257    * Returns a filtered list of service registrations, containing only those that are online, not in maintenance mode,
2258    * and with a specific service type that are running on a host which is not already maxed out.
2259    *
2260    * @param serviceRegistrations
2261    *          the complete list of service registrations
2262    * @param hostRegistrations
2263    *          the complete list of available host registrations
2264    * @param systemLoad
2265    *          the map of hosts to the number of running jobs
2266    * @param jobType
2267    *          the job type for which the services registrations are filtered
2268    */
2269   protected List<ServiceRegistration> getServiceRegistrationsWithCapacity(String jobType,
2270           List<ServiceRegistration> serviceRegistrations, List<HostRegistration> hostRegistrations,
2271           final SystemLoad systemLoad) {
2272     final List<String> hostBaseUrls = hostRegistrations.stream()
2273                                                        .map(HostRegistration::getBaseUrl)
2274                                                        .collect(Collectors.toUnmodifiableList());
2275     final List<ServiceRegistration> filteredList = new ArrayList<>();
2276 
2277     for (ServiceRegistration service : serviceRegistrations) {
2278       // Skip service if host not available
2279       if (!hostBaseUrls.contains(service.getHost())) {
2280         logger.trace("Not considering {} because it's host {} is not available for dispatching", service,
2281                 service.getHost());
2282         continue;
2283       }
2284 
2285       // Skip services that are not of the requested type
2286       if (!jobType.equals(service.getServiceType())) {
2287         logger.trace("Not considering {} because it is of the wrong job type", service);
2288         continue;
2289       }
2290 
2291       // Skip services that are in error state
2292       if (service.getServiceState() == ERROR) {
2293         logger.trace("Not considering {} because it is in error state", service);
2294         continue;
2295       }
2296 
2297       // Skip services that are in maintenance mode
2298       if (service.isInMaintenanceMode()) {
2299         logger.trace("Not considering {} because it is in maintenance mode", service);
2300         continue;
2301       }
2302 
2303       // Skip services that are marked as offline
2304       if (!service.isOnline()) {
2305         logger.trace("Not considering {} because it is currently offline", service);
2306         continue;
2307       }
2308 
2309       // Determine the maximum load for this host
2310       Float hostLoadMax = null;
2311       for (HostRegistration host : hostRegistrations) {
2312         if (host.getBaseUrl().equals(service.getHost())) {
2313           hostLoadMax = host.getMaxLoad();
2314           break;
2315         }
2316       }
2317       if (hostLoadMax == null) {
2318         logger.warn("Unable to determine max load for host {}", service.getHost());
2319       }
2320 
2321       // Determine the current load for this host
2322       Float hostLoad = systemLoad.get(service.getHost()).getLoadFactor();
2323       if (hostLoad == null) {
2324         logger.warn("Unable to determine current load for host {}", service.getHost());
2325       }
2326 
2327       // Is this host suited for processing?
2328       if (hostLoad == null || hostLoadMax == null || hostLoad < hostLoadMax) {
2329         logger.debug("Adding candidate service {} for processing of jobs of type '{}' (host load is {} of max {})",
2330            service, jobType, hostLoad, hostLoadMax);
2331         filteredList.add(service);
2332       }
2333     }
2334 
2335     // Sort the list by capacity
2336     filteredList.sort(new LoadComparator(systemLoad));
2337 
2338     return filteredList;
2339   }
2340 
2341   /**
2342    * Returns a filtered list of service registrations, containing only those that are online, not in maintenance mode,
2343    * and with a specific service type, ordered by load.
2344    *
2345    * @param jobType
2346    *          the job type for which the services registrations are filtered
2347    * @param serviceRegistrations
2348    *          the complete list of service registrations
2349    * @param hostRegistrations
2350    *          the complete list of available host registrations
2351    * @param systemLoad
2352    *
2353    */
2354   protected List<ServiceRegistration> getServiceRegistrationsByLoad(String jobType,
2355           List<ServiceRegistration> serviceRegistrations, List<HostRegistration> hostRegistrations,
2356           final SystemLoad systemLoad) {
2357     final List<String> hostBaseUrls = hostRegistrations.stream()
2358                                                        .map(HostRegistration::getBaseUrl)
2359                                                        .collect(Collectors.toUnmodifiableList());
2360     final List<ServiceRegistration> filteredList = new ArrayList<>();
2361 
2362     logger.debug("Finding services to dispatch job of type {}", jobType);
2363 
2364     for (ServiceRegistration service : serviceRegistrations) {
2365       // Skip service if host not available
2366       if (!hostBaseUrls.contains(service.getHost())) {
2367         logger.trace("Not considering {} because it's host {} is not available for dispatching", service,
2368                 service.getHost());
2369         continue;
2370       }
2371 
2372       // Skip services that are not of the requested type
2373       if (!jobType.equals(service.getServiceType())) {
2374         logger.trace("Not considering {} because it is of the wrong job type", service);
2375         continue;
2376       }
2377 
2378       // Skip services that are in error state
2379       if (service.getServiceState() == ERROR) {
2380         logger.trace("Not considering {} because it is in error state", service);
2381         continue;
2382       }
2383 
2384       // Skip services that are in maintenance mode
2385       if (service.isInMaintenanceMode()) {
2386         logger.trace("Not considering {} because it is in maintenance mode", service);
2387         continue;
2388       }
2389 
2390       // Skip services that are marked as offline
2391       if (!service.isOnline()) {
2392         logger.trace("Not considering {} because it is currently offline", service);
2393         continue;
2394       }
2395 
2396       // We found a candidate service
2397       logger.debug("Adding candidate service {} for processing of job of type '{}'", service, jobType);
2398       filteredList.add(service);
2399     }
2400 
2401     // Sort the list by capacity and distinguish between composer jobs and other jobs
2402     if ("org.opencastproject.composer".equals(jobType)) {
2403       Collections.sort(filteredList, new LoadComparatorEncoding(systemLoad));
2404     } else {
2405       Collections.sort(filteredList, new LoadComparator(systemLoad));
2406     }
2407 
2408     return filteredList;
2409   }
2410 
2411   /**
2412    * {@inheritDoc}
2413    *
2414    * @see org.opencastproject.serviceregistry.api.ServiceRegistry#getMaxLoads()
2415    */
2416   @Override
2417   public SystemLoad getMaxLoads() throws ServiceRegistryException {
2418     final SystemLoad loads = new SystemLoad();
2419     getHostRegistrations().stream()
2420         .map(host -> new NodeLoad(host.getBaseUrl(), 0.0f, host.getMaxLoad()))
2421         .forEach(loads::addNodeLoad);
2422     return loads;
2423   }
2424 
2425   /**
2426    * {@inheritDoc}
2427    *
2428    * @see org.opencastproject.serviceregistry.api.ServiceRegistry#getMaxLoadOnNode(java.lang.String)
2429    */
2430   @Override
2431   public NodeLoad getMaxLoadOnNode(String host) throws ServiceRegistryException, NotFoundException {
2432     try {
2433       float maxLoad = db.exec(namedQuery.find(
2434           "HostRegistration.getMaxLoadByHostName",
2435           Number.class,
2436           Pair.of("host", host)
2437       )).floatValue();
2438       return new NodeLoad(host, 0.0f, maxLoad);
2439     } catch (NoResultException e) {
2440       throw new NotFoundException(e);
2441     } catch (Exception e) {
2442       throw new ServiceRegistryException(e);
2443     }
2444   }
2445 
2446   /** A periodic check on each service registration to ensure that it is still alive. */
2447   class JobProducerHeartbeat implements Runnable {
2448 
2449     /** List of service registrations that have been found unresponsive last time we checked */
2450     private final List<ServiceRegistration> unresponsive = new ArrayList<>();
2451 
2452     /**
2453      * {@inheritDoc}
2454      *
2455      * @see java.lang.Runnable#run()
2456      */
2457     @Override
2458     public void run() {
2459       logger.debug("Checking for unresponsive services");
2460 
2461       try {
2462         List<ServiceRegistration> serviceRegistrations = getOnlineServiceRegistrations();
2463 
2464         for (ServiceRegistration service : serviceRegistrations) {
2465           if (!service.isJobProducer()) {
2466             continue;
2467           }
2468           if (service.isInMaintenanceMode()) {
2469             continue;
2470           }
2471 
2472           // We think this service is online and available. Prove it.
2473           String serviceUrl = UrlSupport.concat(service.getHost(), service.getPath(), "dispatch");
2474 
2475           HttpHead options = new HttpHead(serviceUrl);
2476           HttpResponse response = null;
2477           try {
2478             try {
2479               response = client.execute(options);
2480               if (response != null) {
2481                 switch (response.getStatusLine().getStatusCode()) {
2482                   case HttpStatus.SC_OK:
2483                     // this service is reachable, continue checking other services
2484                     logger.trace("Service " + service + " is responsive: " + response.getStatusLine());
2485                     if (unresponsive.remove(service)) {
2486                       logger.info("Service {} is still online", service);
2487                     } else if (!service.isOnline()) {
2488                       try {
2489                         setOnlineStatus(service.getServiceType(), service.getHost(), service.getPath(), true, true);
2490                         logger.info("Service {} is back online", service);
2491                       } catch (ServiceRegistryException e) {
2492                         logger.warn("Error setting online status for {}", service);
2493                       }
2494                     }
2495                     continue;
2496                   default:
2497                     if (!service.isOnline()) {
2498                       continue;
2499                     }
2500                     logger.warn("Service {} is not working as expected: {}", service, response.getStatusLine());
2501                 }
2502               } else {
2503                 logger.warn("Service {} does not respond", service);
2504               }
2505             } catch (TrustedHttpClientException e) {
2506               if (!service.isOnline()) {
2507                 continue;
2508               }
2509               logger.warn("Unable to reach {}", service, e);
2510             }
2511 
2512             // If we get here, the service did not respond as expected
2513             try {
2514               if (unresponsive.contains(service)) {
2515                 unRegisterService(service.getServiceType(), service.getHost());
2516                 unresponsive.remove(service);
2517                 logger.warn("Marking {} as offline", service);
2518               } else {
2519                 unresponsive.add(service);
2520                 logger.warn("Added {} to the watch list", service);
2521               }
2522             } catch (ServiceRegistryException e) {
2523               logger.warn("Unable to unregister unreachable service: {}", service, e);
2524             }
2525           } finally {
2526             client.close(response);
2527           }
2528         }
2529       } catch (Throwable t) {
2530         logger.warn("Error while checking for unresponsive services", t);
2531       }
2532 
2533       logger.debug("Finished checking for unresponsive services");
2534     }
2535   }
2536 
2537   /**
2538    * Comparator that will sort service registrations depending on their capacity, wich is defined by the number of jobs
2539    * the service's host is already running divided by the MaxLoad of the Server. The lower that number, the bigger
2540    * the capacity.
2541    */
2542   private class LoadComparator implements Comparator<ServiceRegistration> {
2543 
2544     protected SystemLoad loadByHost = null;
2545 
2546     /**
2547      * Creates a new comparator which is using the given map of host names and loads.
2548      *
2549      * @param loadByHost
2550      *          the current work load by host
2551      */
2552     LoadComparator(SystemLoad loadByHost) {
2553       this.loadByHost = loadByHost;
2554     }
2555 
2556     @Override
2557     public int compare(ServiceRegistration serviceA, ServiceRegistration serviceB) {
2558       String hostA = serviceA.getHost();
2559       String hostB = serviceB.getHost();
2560       NodeLoad nodeA = loadByHost.get(hostA);
2561       NodeLoad nodeB = loadByHost.get(hostB);
2562       // If the load factors are about the same, sort based on maximum load
2563       if (Math.abs(nodeA.getLoadFactor() - nodeB.getLoadFactor()) <= 0.01) {
2564         // NOTE: The sort order below is *reversed* from what you'd expect
2565         // When we're comparing the load factors we want the node with the lowest factor to be first
2566         // When we're comparing the maximum load value, we want the node with the highest max to be first
2567         return Float.compare(nodeB.getMaxLoad(), nodeA.getMaxLoad());
2568       }
2569       return Float.compare(nodeA.getLoadFactor(), nodeB.getLoadFactor());
2570     }
2571   }
2572 
2573   /**
2574    * Comparator that will sort service registrations depending on their capacity, which is defined by the number of jobs
2575    * the service's host is already running divided by the MaxLoad of the Server. The lower that number, the bigger
2576    * the capacity.
2577    * This Comparator will prefer encoding workers, if none are defined in the configuration file it will act like
2578    * the LoadComparator.
2579    */
2580   private class LoadComparatorEncoding extends LoadComparator implements Comparator<ServiceRegistration> {
2581 
2582     /**
2583      * Creates a new comparator which is using the given map of host names and loads.
2584      *
2585      * @param loadByHost
2586      */
2587     LoadComparatorEncoding(SystemLoad loadByHost) {
2588       super(loadByHost);
2589     }
2590 
2591     @Override
2592     public int compare(ServiceRegistration serviceA, ServiceRegistration serviceB) {
2593       String hostA = serviceA.getHost();
2594       String hostB = serviceB.getHost();
2595       NodeLoad nodeA = loadByHost.get(hostA);
2596       NodeLoad nodeB = loadByHost.get(hostB);
2597 
2598       if (encodingWorkers != null) {
2599         if (encodingWorkers.contains(hostA) && !encodingWorkers.contains(hostB)) {
2600           if (nodeA.getLoadFactor() <= encodingThreshold) {
2601             return -1;
2602           }
2603           return Float.compare(nodeA.getLoadFactor(), nodeB.getLoadFactor());
2604         }
2605         if (encodingWorkers.contains(hostB) && !encodingWorkers.contains(hostA)) {
2606           if (nodeB.getLoadFactor() <= encodingThreshold) {
2607             return 1;
2608           }
2609           return Float.compare(nodeA.getLoadFactor(), nodeB.getLoadFactor());
2610         }
2611       }
2612         return super.compare(serviceA, serviceB);
2613     }
2614   }
2615 }