View Javadoc
1   /*
2    * Licensed to The Apereo Foundation under one or more contributor license
3    * agreements. See the NOTICE file distributed with this work for additional
4    * information regarding copyright ownership.
5    *
6    *
7    * The Apereo Foundation licenses this file to you under the Educational
8    * Community License, Version 2.0 (the "License"); you may not use this file
9    * except in compliance with the License. You may obtain a copy of the License
10   * at:
11   *
12   *   http://opensource.org/licenses/ecl2.txt
13   *
14   * Unless required by applicable law or agreed to in writing, software
15   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
16   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the
17   * License for the specific language governing permissions and limitations under
18   * the License.
19   *
20   */
21  
22  package org.opencastproject.serviceregistry.impl;
23  
24  import static java.lang.String.format;
25  import static org.apache.commons.lang3.StringUtils.isBlank;
26  import static org.opencastproject.db.Queries.namedQuery;
27  import static org.opencastproject.job.api.AbstractJobProducer.ACCEPT_JOB_LOADS_EXCEEDING_PROPERTY;
28  import static org.opencastproject.job.api.AbstractJobProducer.DEFAULT_ACCEPT_JOB_LOADS_EXCEEDING;
29  import static org.opencastproject.job.api.Job.FailureReason.DATA;
30  import static org.opencastproject.job.api.Job.Status.FAILED;
31  import static org.opencastproject.serviceregistry.api.ServiceState.ERROR;
32  import static org.opencastproject.serviceregistry.api.ServiceState.NORMAL;
33  import static org.opencastproject.serviceregistry.api.ServiceState.WARNING;
34  import static org.opencastproject.util.OsgiUtil.getOptContextProperty;
35  
36  import org.opencastproject.db.DBSession;
37  import org.opencastproject.db.DBSessionFactory;
38  import org.opencastproject.job.api.Job;
39  import org.opencastproject.job.api.Job.Status;
40  import org.opencastproject.job.jpa.JpaJob;
41  import org.opencastproject.security.api.Organization;
42  import org.opencastproject.security.api.SecurityService;
43  import org.opencastproject.security.api.TrustedHttpClient;
44  import org.opencastproject.security.api.TrustedHttpClientException;
45  import org.opencastproject.security.api.User;
46  import org.opencastproject.serviceregistry.api.HostRegistration;
47  import org.opencastproject.serviceregistry.api.HostStatistics;
48  import org.opencastproject.serviceregistry.api.IncidentService;
49  import org.opencastproject.serviceregistry.api.Incidents;
50  import org.opencastproject.serviceregistry.api.JaxbServiceStatistics;
51  import org.opencastproject.serviceregistry.api.ServiceRegistration;
52  import org.opencastproject.serviceregistry.api.ServiceRegistry;
53  import org.opencastproject.serviceregistry.api.ServiceRegistryException;
54  import org.opencastproject.serviceregistry.api.ServiceStatistics;
55  import org.opencastproject.serviceregistry.api.SystemLoad;
56  import org.opencastproject.serviceregistry.api.SystemLoad.NodeLoad;
57  import org.opencastproject.serviceregistry.impl.jmx.HostsStatistics;
58  import org.opencastproject.serviceregistry.impl.jmx.JobsStatistics;
59  import org.opencastproject.serviceregistry.impl.jmx.ServicesStatistics;
60  import org.opencastproject.serviceregistry.impl.jpa.HostRegistrationJpaImpl;
61  import org.opencastproject.serviceregistry.impl.jpa.ServiceRegistrationJpaImpl;
62  import org.opencastproject.systems.OpencastConstants;
63  import org.opencastproject.util.NotFoundException;
64  import org.opencastproject.util.UrlSupport;
65  import org.opencastproject.util.data.functions.Strings;
66  import org.opencastproject.util.function.ThrowingConsumer;
67  import org.opencastproject.util.jmx.JmxUtil;
68  
69  import org.apache.commons.lang3.StringUtils;
70  import org.apache.commons.lang3.time.DateUtils;
71  import org.apache.commons.lang3.tuple.Pair;
72  import org.apache.http.HttpResponse;
73  import org.apache.http.HttpStatus;
74  import org.apache.http.client.methods.HttpHead;
75  import org.osgi.service.cm.ConfigurationException;
76  import org.osgi.service.cm.ManagedService;
77  import org.osgi.service.component.ComponentContext;
78  import org.osgi.service.component.annotations.Activate;
79  import org.osgi.service.component.annotations.Component;
80  import org.osgi.service.component.annotations.Deactivate;
81  import org.osgi.service.component.annotations.Modified;
82  import org.osgi.service.component.annotations.Reference;
83  import org.osgi.service.component.annotations.ReferenceCardinality;
84  import org.osgi.service.component.annotations.ReferencePolicy;
85  import org.slf4j.Logger;
86  import org.slf4j.LoggerFactory;
87  
88  import java.net.InetAddress;
89  import java.net.URI;
90  import java.net.URISyntaxException;
91  import java.util.ArrayList;
92  import java.util.Arrays;
93  import java.util.Collections;
94  import java.util.Comparator;
95  import java.util.Date;
96  import java.util.Dictionary;
97  import java.util.HashMap;
98  import java.util.List;
99  import java.util.Map;
100 import java.util.Objects;
101 import java.util.Optional;
102 import java.util.concurrent.Executors;
103 import java.util.concurrent.ScheduledExecutorService;
104 import java.util.concurrent.TimeUnit;
105 import java.util.concurrent.atomic.AtomicReference;
106 import java.util.function.Consumer;
107 import java.util.function.Function;
108 import java.util.stream.Collectors;
109 
110 import javax.management.ObjectInstance;
111 import javax.persistence.EntityManager;
112 import javax.persistence.EntityManagerFactory;
113 import javax.persistence.LockModeType;
114 import javax.persistence.NoResultException;
115 import javax.persistence.TypedQuery;
116 
117 /** JPA implementation of the {@link ServiceRegistry} */
118 @Component(
119   property = {
120     "service.description=Service registry"
121   },
122   immediate = true,
123   service = { ManagedService.class, ServiceRegistry.class, ServiceRegistryJpaImpl.class }
124 )
125 public class ServiceRegistryJpaImpl implements ServiceRegistry, ManagedService {
126 
127   /** JPA persistence unit name */
128   public static final String PERSISTENCE_UNIT = "org.opencastproject.common";
129 
  /** Id of the workflow's start operation; needs to match the corresponding enum value in WorkflowServiceImpl */
131   public static final String START_OPERATION = "START_OPERATION";
132 
  /** Id of the workflow's start workflow operation; needs to match the corresponding enum value in WorkflowServiceImpl */
134   public static final String START_WORKFLOW = "START_WORKFLOW";
135 
  /** Id of the workflow's resume operation; needs to match the corresponding enum value in WorkflowServiceImpl */
137   public static final String RESUME = "RESUME";
138 
139   /** Identifier for the workflow service */
140   public static final String TYPE_WORKFLOW = "org.opencastproject.workflow";
141 
142   static final Logger logger = LoggerFactory.getLogger(ServiceRegistryJpaImpl.class);
143 
144   /** The list of registered JMX beans */
145   protected List<ObjectInstance> jmxBeans = new ArrayList<>();
146 
147   /** Hosts statistics JMX type */
148   private static final String JMX_HOSTS_STATISTICS_TYPE = "HostsStatistics";
149 
150   /** Services statistics JMX type */
151   private static final String JMX_SERVICES_STATISTICS_TYPE = "ServicesStatistics";
152 
153   /** Jobs statistics JMX type */
154   private static final String JMX_JOBS_STATISTICS_TYPE = "JobsStatistics";
155 
156   /** The JMX business object for hosts statistics */
157   private HostsStatistics hostsStatistics;
158 
159   /** The JMX business object for services statistics */
160   private ServicesStatistics servicesStatistics;
161 
162   /** The JMX business object for jobs statistics */
163   private JobsStatistics jobsStatistics;
164 
165   /** Current job used to process job in the service registry */
166   private static final ThreadLocal<Job> currentJob = new ThreadLocal<>();
167 
168   /** Configuration key for the maximum load */
169   protected static final String OPT_MAXLOAD = "org.opencastproject.server.maxload";
170 
171   /** Configuration key for the interval to check whether the hosts in the service registry are still alive, in seconds */
172   protected static final String OPT_HEARTBEATINTERVAL = "heartbeat.interval";
173 
174   /** Configuration key for the collection of job statistics */
175   protected static final String OPT_JOBSTATISTICS = "jobstats.collect";
176 
177   /** Configuration key for the retrieval of service statistics: Do not consider jobs older than max_job_age (in days) */
178   protected static final String OPT_SERVICE_STATISTICS_MAX_JOB_AGE = "org.opencastproject.statistics.services.max_job_age";
179 
180   /** Configuration key for the encoding preferred worker nodes */
181   protected static final String OPT_ENCODING_WORKERS = "org.opencastproject.encoding.workers";
182 
183   /** Configuration key for the encoding workers load threshold */
184   protected static final String OPT_ENCODING_THRESHOLD = "org.opencastproject.encoding.workers.threshold";
185 
186   /** The http client to use when connecting to remote servers */
187   protected TrustedHttpClient client = null;
188 
189   /** Default jobs limit during dispatching
190    * (larger value will fetch more entries from the database at the same time and increase RAM usage) */
191   static final int DEFAULT_DISPATCH_JOBS_LIMIT = 100;
192 
193   /** Default setting on job statistics collection */
194   static final boolean DEFAULT_JOB_STATISTICS = false;
195 
196   /** Default setting on service statistics retrieval */
197   static final int DEFAULT_SERVICE_STATISTICS_MAX_JOB_AGE = 14;
198 
199   static final List<String>  DEFAULT_ENCODING_WORKERS = new ArrayList<String>();
200 
201   static final double DEFAULT_ENCODING_THRESHOLD = 0.0;
202 
203   /** The configuration key for setting {@link #maxAttemptsBeforeErrorState} */
204   static final String MAX_ATTEMPTS_CONFIG_KEY = "max.attempts";
205 
206   /** The configuration key for setting {@link #noErrorStateServiceTypes} */
207   static final String NO_ERROR_STATE_SERVICE_TYPES_CONFIG_KEY = "no.error.state.service.types";
208 
209   /** Default value for {@link #maxAttemptsBeforeErrorState} */
210   private static final int DEFAULT_MAX_ATTEMPTS_BEFORE_ERROR_STATE = 10;
211 
212   /** Default value for {@link #errorStatesEnabled} */
213   private static final boolean DEFAULT_ERROR_STATES_ENABLED = true;
214 
  /** Number of failed jobs on a service before it is set to error state. A negative value disables error states completely. */
216   protected int maxAttemptsBeforeErrorState = DEFAULT_MAX_ATTEMPTS_BEFORE_ERROR_STATE;
217   private boolean errorStatesEnabled = DEFAULT_ERROR_STATES_ENABLED;
218 
219   /** Services for which error state is disabled */
220   private List<String> noErrorStateServiceTypes = new ArrayList<>();
221 
222   /** Default delay between checking if hosts are still alive in seconds * */
223   static final long DEFAULT_HEART_BEAT = 60;
224 
225   /** Default job load when not passed by service creating the job * */
226   static final float DEFAULT_JOB_LOAD = 0.1f;
227 
228   /** This host's base URL */
229   protected String hostName;
230 
231   /** This host's descriptive node name eg admin, worker01 */
232   protected String nodeName;
233 
234   /** The base URL for job URLs */
235   protected String jobHost;
236 
  /** URLs of the workers specialised in encoding (configured as a comma-separated list) */
238   protected static List<String> encodingWorkers = DEFAULT_ENCODING_WORKERS;
239 
240   /** Threshold value under which defined workers get preferred when dispatching encoding jobs */
241   protected static double encodingThreshold = DEFAULT_ENCODING_THRESHOLD;
242 
243   /** The factory used to generate the entity manager */
244   protected EntityManagerFactory emf = null;
245 
246   protected DBSessionFactory dbSessionFactory;
247 
248   protected DBSession db;
249 
250   /** The thread pool to use for dispatching queued jobs and checking on phantom services. */
251   protected ScheduledExecutorService scheduledExecutor = null;
252 
253   /** The security service */
254   protected SecurityService securityService = null;
255 
256   protected IncidentService incidentService = null;
257 
258   protected Incidents incidents;
259 
260   /** Whether to collect detailed job statistics */
261   protected boolean collectJobstats = DEFAULT_JOB_STATISTICS;
262 
  /** Maximum age (in days) of jobs considered for service statistics */
264   protected int maxJobAge = DEFAULT_SERVICE_STATISTICS_MAX_JOB_AGE;
265 
266   /** A static list of statuses that influence how load balancing is calculated */
267   protected static final List<Status> JOB_STATUSES_INFLUENCING_LOAD_BALANCING;
268 
269   private static final Status[] activeJobStatus =
270       Arrays.stream(Status.values()).filter(Status::isActive).collect(Collectors.toList()).toArray(new Status[0]);
271 
272   protected static HashMap<Long, Float> jobCache = new HashMap<>();
273 
274   static {
275     JOB_STATUSES_INFLUENCING_LOAD_BALANCING = new ArrayList<>();
276     JOB_STATUSES_INFLUENCING_LOAD_BALANCING.add(Status.RUNNING);
277   }
278 
279   /** Whether to accept a job whose load exceeds the host’s max load */
280   protected Boolean acceptJobLoadsExeedingMaxLoad = true;
281 
282   // Current system load
283   protected float localSystemLoad = 0.0f;
284 
285   /** OSGi DI */
286   @Reference(target = "(osgi.unit.name=org.opencastproject.common)")
287   void setEntityManagerFactory(EntityManagerFactory emf) {
288     this.emf = emf;
289   }
290 
291   @Reference
292   public void setDBSessionFactory(DBSessionFactory dbSessionFactory) {
293     this.dbSessionFactory = dbSessionFactory;
294   }
295 
296   @Activate
297   public void activate(ComponentContext cc) {
298     logger.info("Activate service registry");
299 
300     db = dbSessionFactory.createSession(emf);
301 
302     // Find this host's url
303     if (cc == null || StringUtils.isBlank(cc.getBundleContext().getProperty(OpencastConstants.SERVER_URL_PROPERTY))) {
304       hostName = UrlSupport.DEFAULT_BASE_URL;
305     } else {
306       hostName = cc.getBundleContext().getProperty(OpencastConstants.SERVER_URL_PROPERTY);
307     }
308 
309     // Check hostname for sanity. It should be the hosts URL with protocol but without any part of the service paths.
310     if (hostName.endsWith("/")) {
311       logger.warn("The configured value of {} ends with '/'. This is very likely a configuration error which could "
312               + "lead to services not working properly. Note that this configuration should not contain any part of "
313               + "the service paths.", OpencastConstants.SERVER_URL_PROPERTY);
314     }
315 
316     // Clean all undispatchable jobs that were orphaned when this host was last deactivated
317     cleanUndispatchableJobs(hostName);
318 
319     // Register JMX beans with statistics
320     try {
321       List<ServiceStatistics> serviceStatistics = getServiceStatistics();
322       hostsStatistics = new HostsStatistics(serviceStatistics);
323       servicesStatistics = new ServicesStatistics(hostName, serviceStatistics);
324       jobsStatistics = new JobsStatistics(hostName);
325       jmxBeans.add(JmxUtil.registerMXBean(hostsStatistics, JMX_HOSTS_STATISTICS_TYPE));
326       jmxBeans.add(JmxUtil.registerMXBean(servicesStatistics, JMX_SERVICES_STATISTICS_TYPE));
327       jmxBeans.add(JmxUtil.registerMXBean(jobsStatistics, JMX_JOBS_STATISTICS_TYPE));
328     } catch (ServiceRegistryException e) {
329       logger.error("Error registering JMX statistic beans", e);
330     }
331 
332     // Find the jobs URL
333     if (cc == null || StringUtils.isBlank(cc.getBundleContext().getProperty("org.opencastproject.jobs.url"))) {
334       jobHost = hostName;
335     } else {
336       jobHost = cc.getBundleContext().getProperty("org.opencastproject.jobs.url");
337     }
338 
339     // Register this host
340     try {
341       if (cc == null || StringUtils.isBlank(cc.getBundleContext().getProperty(OpencastConstants.NODE_NAME_PROPERTY))) {
342         nodeName = null;
343       } else {
344         nodeName = cc.getBundleContext().getProperty(OpencastConstants.NODE_NAME_PROPERTY);
345       }
346 
347       float maxLoad = Runtime.getRuntime().availableProcessors();
348       if (cc != null && StringUtils.isNotBlank(cc.getBundleContext().getProperty(OPT_MAXLOAD))) {
349         try {
350           maxLoad = Float.parseFloat(cc.getBundleContext().getProperty(OPT_MAXLOAD));
351           logger.info("Max load has been set manually to {}", maxLoad);
352         } catch (NumberFormatException e) {
353           logger.warn("Configuration key '{}' is not an integer. Falling back to the number of cores ({})",
354                   OPT_MAXLOAD, maxLoad);
355         }
356       }
357 
358       logger.info("Node maximum load set to {}", maxLoad);
359 
360       String address = InetAddress.getByName(URI.create(hostName).getHost()).getHostAddress();
361       long maxMemory = Runtime.getRuntime().maxMemory();
362       int cores = Runtime.getRuntime().availableProcessors();
363 
364       registerHost(hostName, address, nodeName, maxMemory, cores, maxLoad);
365     } catch (Exception e) {
366       throw new IllegalStateException("Unable to register host " + hostName + " in the service registry", e);
367     }
368 
369     // Whether a service accepts a job whose load exceeds the host’s max load
370     if (cc != null) {
371       acceptJobLoadsExeedingMaxLoad = getOptContextProperty(cc, ACCEPT_JOB_LOADS_EXCEEDING_PROPERTY).map(Strings.toBool)
372               .getOrElse(DEFAULT_ACCEPT_JOB_LOADS_EXCEEDING);
373     }
374 
375     localSystemLoad = 0;
376     logger.info("Activated");
377   }
378 
379   @Override
380   public float getOwnLoad() {
381     return localSystemLoad;
382   }
383 
384   @Override
385   public String getRegistryHostname() {
386     return hostName;
387   }
388 
389   @Deactivate
390   public void deactivate() {
391     logger.info("deactivate service registry");
392 
393     // Wait for job dispatcher to stop before unregistering hosts and requeuing jobs
394     if (scheduledExecutor != null) {
395       try {
396         scheduledExecutor.shutdownNow();
397         if (!scheduledExecutor.isShutdown()) {
398           logger.info("Waiting for Dispatcher to terminate");
399           scheduledExecutor.awaitTermination(10, TimeUnit.SECONDS);
400         }
401       } catch (InterruptedException e) {
402         logger.error("Error shutting down the Dispatcher", e);
403       }
404     }
405 
406     for (ObjectInstance mbean : jmxBeans) {
407       JmxUtil.unregisterMXBean(mbean);
408     }
409 
410     try {
411       unregisterHost(hostName);
412     } catch (ServiceRegistryException e) {
413       throw new IllegalStateException("Unable to unregister host " + hostName + " from the service registry", e);
414     }
415   }
416 
417   /**
418    * {@inheritDoc}
419    *
420    * @see org.opencastproject.serviceregistry.api.ServiceRegistry#createJob(java.lang.String, java.lang.String)
421    */
422   @Override
423   public Job createJob(String type, String operation) throws ServiceRegistryException {
424     return createJob(this.hostName, type, operation, null, null, true, getCurrentJob(), DEFAULT_JOB_LOAD);
425   }
426 
427   /**
428    * {@inheritDoc}
429    *
430    * @see org.opencastproject.serviceregistry.api.ServiceRegistry#createJob(java.lang.String, java.lang.String,
431    *      java.util.List)
432    */
433   @Override
434   public Job createJob(String type, String operation, List<String> arguments) throws ServiceRegistryException {
435     return createJob(this.hostName, type, operation, arguments, null, true, getCurrentJob(), DEFAULT_JOB_LOAD);
436   }
437 
438   /**
439    * {@inheritDoc}
440    *
441    * @see org.opencastproject.serviceregistry.api.ServiceRegistry#createJob(java.lang.String, java.lang.String,
442    *      java.util.List, Float)
443    */
444   @Override
445   public Job createJob(String type, String operation, List<String> arguments, Float jobLoad)
446           throws ServiceRegistryException {
447     return createJob(this.hostName, type, operation, arguments, null, true, getCurrentJob(), jobLoad);
448   }
449 
450   /**
451    * {@inheritDoc}
452    *
453    * @see org.opencastproject.serviceregistry.api.ServiceRegistry#createJob(java.lang.String, java.lang.String,
454    *      java.util.List, String, boolean)
455    */
456   @Override
457   public Job createJob(String type, String operation, List<String> arguments, String payload, boolean dispatchable)
458           throws ServiceRegistryException {
459     return createJob(this.hostName, type, operation, arguments, payload, dispatchable, getCurrentJob(),
460             DEFAULT_JOB_LOAD);
461   }
462 
463   /**
464    * {@inheritDoc}
465    *
466    * @see org.opencastproject.serviceregistry.api.ServiceRegistry#createJob(java.lang.String, java.lang.String,
467    *      java.util.List, java.lang.String, boolean, Float)
468    */
469   @Override
470   public Job createJob(String type, String operation, List<String> arguments, String payload, boolean dispatchable,
471           Float jobLoad) throws ServiceRegistryException {
472     return createJob(this.hostName, type, operation, arguments, payload, dispatchable, getCurrentJob(), jobLoad);
473   }
474 
475   public Job createJob(String type, String operation, List<String> arguments, String payload, boolean dispatchable,
476           Job parentJob) throws ServiceRegistryException {
477     return createJob(this.hostName, type, operation, arguments, payload, dispatchable, parentJob, DEFAULT_JOB_LOAD);
478   }
479 
480   /**
481    * {@inheritDoc}
482    *
483    * @see org.opencastproject.serviceregistry.api.ServiceRegistry#createJob(java.lang.String, java.lang.String,
484    *      java.util.List, java.lang.String, boolean, org.opencastproject.job.api.Job, Float)
485    */
486   @Override
487   public Job createJob(String type, String operation, List<String> arguments, String payload, boolean dispatchable,
488           Job parentJob, Float jobLoad) throws ServiceRegistryException {
489     return createJob(this.hostName, type, operation, arguments, payload, dispatchable, parentJob, jobLoad);
490   }
491 
492   /**
493    * Creates a job on a remote host with a jobLoad of 1.0.
494    */
495   public Job createJob(String host, String serviceType, String operation, List<String> arguments, String payload,
496           boolean dispatchable, Job parentJob) throws ServiceRegistryException {
497     return createJob(host, serviceType, operation, arguments, payload, dispatchable, parentJob, DEFAULT_JOB_LOAD);
498   }
499 
500   /**
501    * Creates a job on a remote host.
502    */
503   public Job createJob(String host, String serviceType, String operation, List<String> arguments, String payload,
504           boolean dispatchable, Job parentJob, float jobLoad) throws ServiceRegistryException {
505     if (StringUtils.isBlank(host)) {
506       throw new IllegalArgumentException("Host can't be null");
507     }
508     if (StringUtils.isBlank(serviceType)) {
509       throw new IllegalArgumentException("Service type can't be null");
510     }
511     if (StringUtils.isBlank(operation)) {
512       throw new IllegalArgumentException("Operation can't be null");
513     }
514 
515     JpaJob jpaJob = db.execTxChecked(em -> {
516       ServiceRegistrationJpaImpl creatingService = getServiceRegistrationQuery(serviceType, host).apply(em)
517           .orElseThrow(() -> new ServiceRegistryException("No service registration exists for type '" + serviceType
518               + "' on host '" + host + "'"));
519 
520       if (creatingService.getHostRegistration().isMaintenanceMode()) {
521         logger.warn("Creating a job from {}, which is currently in maintenance mode.", creatingService.getHost());
522       } else if (!creatingService.getHostRegistration().isActive()) {
523         logger.warn("Creating a job from {}, which is currently inactive.", creatingService.getHost());
524       }
525 
526       User currentUser = securityService.getUser();
527       Organization currentOrganization = securityService.getOrganization();
528 
529       JpaJob job = new JpaJob(currentUser, currentOrganization, creatingService, operation, arguments, payload,
530               dispatchable, jobLoad);
531 
532       // Bind the given parent job to the new job
533       if (parentJob != null) {
534         // Get the JPA instance of the parent job
535         JpaJob jpaParentJob = getJpaJobQuery(parentJob.getId()).apply(em).orElseThrow(() -> {
536           logger.error("job with id {} not found in the persistence context", parentJob);
537           // We don't want to leave the deleted job in the cache if there
538           removeFromLoadCache(parentJob.getId());
539           return new ServiceRegistryException(new NotFoundException());
540         });
541         job.setParentJob(jpaParentJob);
542 
543         // Get the JPA instance of the root job
544         JpaJob jpaRootJob = jpaParentJob;
545         if (parentJob.getRootJobId() != null) {
546           jpaRootJob = getJpaJobQuery(parentJob.getRootJobId()).apply(em).orElseThrow(() -> {
547             logger.error("job with id {} not found in the persistence context", parentJob.getRootJobId());
548             // We don't want to leave the deleted job in the cache if there
549             removeFromLoadCache(parentJob.getRootJobId());
550             return new ServiceRegistryException(new NotFoundException());
551           });
552         }
553         job.setRootJob(jpaRootJob);
554       }
555 
556       // if this job is not dispatchable, it must be handled by the host that has created it
557       if (dispatchable) {
558         logger.trace("Queuing dispatchable '{}'", job);
559         job.setStatus(Status.QUEUED);
560       } else {
561         logger.trace("Giving new non-dispatchable '{}' its creating service as processor '{}'", job, creatingService);
562         job.setProcessorServiceRegistration(creatingService);
563       }
564 
565       em.persist(job);
566       return job;
567     });
568 
569     setJobUri(jpaJob);
570     return jpaJob.toJob();
571   }
572 
573   @Override
574   public void removeJobs(List<Long> jobIds) throws NotFoundException, ServiceRegistryException {
575     for (long jobId: jobIds) {
576       if (jobId < 1) {
577         throw new NotFoundException("Job ID must be greater than zero (0)");
578       }
579     }
580 
581     logger.debug("Start deleting jobs with IDs '{}'", jobIds);
582     try {
583       db.execTxChecked(em -> {
584         for (long jobId : jobIds) {
585           JpaJob job = em.find(JpaJob.class, jobId);
586           if (job == null) {
587             logger.error("Job with Id {} cannot be deleted: Not found.", jobId);
588             removeFromLoadCache(jobId);
589             throw new NotFoundException("Job with ID '" + jobId + "' not found");
590           }
591           deleteChildJobsQuery(jobId).accept(em);
592           em.remove(job);
593           removeFromLoadCache(jobId);
594         }
595       });
596     } catch (NotFoundException | ServiceRegistryException e) {
597       throw e;
598     } catch (Exception e) {
599       throw new ServiceRegistryException(e);
600     }
601 
602     logger.info("Jobs with IDs '{}' deleted", jobIds);
603   }
604 
605   private ThrowingConsumer<EntityManager, Exception> deleteChildJobsQuery(long jobId) {
606     return em -> {
607       List<Job> childJobs = getChildJobs(jobId);
608       if (childJobs.isEmpty()) {
609         logger.trace("No child jobs of job '{}' found to delete.", jobId);
610         return;
611       }
612 
613       logger.debug("Start deleting child jobs of job '{}'", jobId);
614 
615       try {
616         for (int i = childJobs.size() - 1; i >= 0; i--) {
617           Job job = childJobs.get(i);
618           JpaJob jobToDelete = em.find(JpaJob.class, job.getId());
619           em.remove(jobToDelete);
620           removeFromLoadCache(job.getId());
621           logger.debug("{} deleted", job);
622         }
623         logger.debug("Deleted all child jobs of job '{}'", jobId);
624       } catch (Exception e) {
625         throw new ServiceRegistryException("Unable to remove child jobs from " + jobId, e);
626       }
627     };
628   }
629 
630   @Override
631   public void removeParentlessJobs(int lifetime) throws ServiceRegistryException {
632     int count = db.execTxChecked(em -> {
633       int c = 0;
634 
635       List<Job> jobs = namedQuery.findAll("Job.withoutParent", JpaJob.class).apply(em).stream()
636           .map(JpaJob::toJob)
637           .filter(j -> j.getDateCreated().before(DateUtils.addDays(new Date(), -lifetime)))
638           // DO NOT DELETE workflow instances and operations!
639           .filter(j -> !START_OPERATION.equals(j.getOperation())
640               && !START_WORKFLOW.equals(j.getOperation())
641               && !RESUME.equals(j.getOperation()))
642           .filter(j -> j.getStatus().isTerminated())
643           .collect(Collectors.toList());
644 
645       for (Job job : jobs) {
646         try {
647           removeJobs(Collections.singletonList(job.getId()));
648           logger.debug("Parentless '{}' removed", job);
649           c++;
650         } catch (NotFoundException e) {
651           logger.debug("Parentless '{} ' not found in database", job, e);
652         }
653       }
654 
655       return c;
656     });
657 
658 
659     if (count > 0) {
660       logger.info("Successfully removed {} parentless jobs", count);
661     } else {
662       logger.trace("No parentless jobs found to remove");
663     }
664   }
665 
666   /**
667    * {@inheritDoc}
668    *
669    * @see org.osgi.service.cm.ManagedService#updated(java.util.Dictionary)
670    */
671   @Override
672   public void updated(Dictionary properties) throws ConfigurationException {
673     logger.info("Updating service registry properties");
674 
675     maxAttemptsBeforeErrorState = DEFAULT_MAX_ATTEMPTS_BEFORE_ERROR_STATE;
676     errorStatesEnabled = DEFAULT_ERROR_STATES_ENABLED;
677     String maxAttempts = StringUtils.trimToNull((String) properties.get(MAX_ATTEMPTS_CONFIG_KEY));
678     if (maxAttempts != null) {
679       try {
680         maxAttemptsBeforeErrorState = Integer.parseInt(maxAttempts);
681         if (maxAttemptsBeforeErrorState < 0) {
682           errorStatesEnabled = false;
683           logger.info("Error states of services disabled");
684         } else {
685           errorStatesEnabled = true;
686           logger.info("Set max attempts before error state to {}", maxAttempts);
687         }
688       } catch (NumberFormatException e) {
689         logger.warn("Can not set max attempts before error state to {}. {} must be an integer", maxAttempts,
690                 MAX_ATTEMPTS_CONFIG_KEY);
691       }
692     }
693 
694     noErrorStateServiceTypes = new ArrayList<>();
695     String noErrorStateServiceTypesStr = StringUtils.trimToNull((String) properties.get(
696             NO_ERROR_STATE_SERVICE_TYPES_CONFIG_KEY));
697     if (noErrorStateServiceTypesStr != null) {
698       noErrorStateServiceTypes = Arrays.asList(noErrorStateServiceTypesStr.split("\\s*,\\s*"));
699       if (!noErrorStateServiceTypes.isEmpty()) {
700         logger.info("Set service types without error state to {}", String.join(", ", noErrorStateServiceTypes));
701       }
702     }
703 
704     long heartbeatInterval = DEFAULT_HEART_BEAT;
705     String heartbeatIntervalString = StringUtils.trimToNull((String) properties.get(OPT_HEARTBEATINTERVAL));
706     if (StringUtils.isNotBlank(heartbeatIntervalString)) {
707       try {
708         heartbeatInterval = Long.parseLong(heartbeatIntervalString);
709       } catch (Exception e) {
710         logger.warn("Heartbeat interval '{}' is malformed, setting to {}", heartbeatIntervalString, DEFAULT_HEART_BEAT);
711         heartbeatInterval = DEFAULT_HEART_BEAT;
712       }
713       if (heartbeatInterval == 0) {
714         logger.info("Heartbeat disabled");
715       } else if (heartbeatInterval < 0) {
716         logger.warn("Heartbeat interval {} seconds too low, adjusting to {}", heartbeatInterval, DEFAULT_HEART_BEAT);
717         heartbeatInterval = DEFAULT_HEART_BEAT;
718       } else {
719         logger.info("Heartbeat interval set to {} seconds", heartbeatInterval);
720       }
721     }
722 
723     String jobStatsString = StringUtils.trimToNull((String) properties.get(OPT_JOBSTATISTICS));
724     if (StringUtils.isNotBlank(jobStatsString)) {
725       try {
726         collectJobstats = Boolean.parseBoolean(jobStatsString);
727       } catch (Exception e) {
728         logger.warn("Job statistics collection flag '{}' is malformed, setting to {}", jobStatsString,
729                 DEFAULT_JOB_STATISTICS);
730         collectJobstats = DEFAULT_JOB_STATISTICS;
731       }
732     }
733 
734     // get the encoding worker nodes defined in the configuration file and parse the comma-separated list
735     String encodingWorkersString = (String) properties.get(OPT_ENCODING_WORKERS);
736     if (StringUtils.isNotBlank(encodingWorkersString)) {
737       encodingWorkers = Arrays.asList(encodingWorkersString.split("\\s*,\\s*"));
738     } else
739       encodingWorkers = DEFAULT_ENCODING_WORKERS;
740 
741     // get the encoding worker load threshold defined in the configuration file and parse the double
742     String encodingThersholdString = StringUtils.trimToNull((String) properties.get(OPT_ENCODING_THRESHOLD));
743     if (StringUtils.isNotBlank(encodingThersholdString) && encodingThersholdString != null) {
744         try {
745           double encodingThresholdTmp = Double.parseDouble(encodingThersholdString);
746           if (encodingThresholdTmp >= 0 && encodingThresholdTmp <= 1)
747             encodingThreshold = encodingThresholdTmp;
748           else {
749             encodingThreshold = DEFAULT_ENCODING_THRESHOLD;
750             logger.warn("org.opencastproject.encoding.workers.threshold is not between 0 and 1");
751           }
752         } catch (NumberFormatException e) {
753           logger.warn("Can not set encoding threshold to {}. {} must be an parsable double", encodingThersholdString,
754               OPT_ENCODING_THRESHOLD);
755         }
756     } else
757       encodingThreshold = DEFAULT_ENCODING_THRESHOLD;
758 
759 
760     String maxJobAgeString = StringUtils.trimToNull((String) properties.get(OPT_SERVICE_STATISTICS_MAX_JOB_AGE));
761     if (maxJobAgeString != null) {
762       try {
763         maxJobAge = Integer.parseInt(maxJobAgeString);
764         logger.info("Set service statistics max job age to {}", maxJobAgeString);
765       } catch (NumberFormatException e) {
766         logger.warn("Can not set service statistics max job age to {}. {} must be an integer", maxJobAgeString,
767                 OPT_SERVICE_STATISTICS_MAX_JOB_AGE);
768       }
769     }
770 
771     scheduledExecutor = Executors.newScheduledThreadPool(1);
772 
773     // Schedule the service heartbeat if the interval is > 0
774     if (heartbeatInterval > 0) {
775       logger.debug("Starting service heartbeat at a custom interval of {}s", heartbeatInterval);
776       scheduledExecutor.scheduleWithFixedDelay(new JobProducerHeartbeat(), heartbeatInterval, heartbeatInterval,
777               TimeUnit.SECONDS);
778     }
779   }
780 
781   /**
782    * OSGI callback when the configuration is updated. This method is only here to prevent the
783    * configuration admin service from calling the service deactivate and activate methods
784    * for a config update. It does not have to do anything as the updates are handled by updated().
785    */
786   @Modified
787   public void modified(Map<String, Object> config) throws ConfigurationException {
788     logger.debug("Modified serviceregistry");
789   }
790 
791   private Function<EntityManager, Optional<JpaJob>> getJpaJobQuery(long id) {
792     return em -> namedQuery.findByIdOpt(JpaJob.class, id)
793         .apply(em)
794         .map(jpaJob -> {
795           // JPA's caches can be out of date if external changes (e.g. another node in the cluster) have been made to
796           // this row in the database
797           em.refresh(jpaJob);
798           setJobUri(jpaJob);
799           return jpaJob;
800         });
801   }
802 
803   @Override
804   public Job getJob(long id) throws NotFoundException, ServiceRegistryException {
805     try {
806       return db.exec(getJpaJobQuery(id))
807           .map(JpaJob::toJob)
808           .orElseThrow(NotFoundException::new);
809     } catch (NotFoundException e) {
810       throw e;
811     } catch (Exception e) {
812       throw new ServiceRegistryException(e);
813     }
814   }
815 
816   /**
817    * {@inheritDoc}
818    *
819    * @see org.opencastproject.serviceregistry.api.ServiceRegistry#getCurrentJob()
820    */
821   @Override
822   public Job getCurrentJob() {
823     return currentJob.get();
824   }
825 
826   /**
827    * {@inheritDoc}
828    *
829    * @see org.opencastproject.serviceregistry.api.ServiceRegistry#setCurrentJob(Job)
830    */
831   @Override
832   public void setCurrentJob(Job job) {
833     currentJob.set(job);
834   }
835 
836   JpaJob updateJob(JpaJob job) throws ServiceRegistryException {
837     try {
838       // tx context is opened in
839       //   updateInternal
840       //   updateServiceForFailover
841       return db.execChecked(em -> {
842         Job oldJob = getJob(job.getId());
843         JpaJob jpaJob = updateInternal(job);
844         if (!TYPE_WORKFLOW.equals(job.getJobType()) && job.getJobLoad() > 0.0f
845             && job.getProcessorServiceRegistration() != null
846             && job.getProcessorServiceRegistration().getHost().equals(getRegistryHostname())) {
847           processCachedLoadChange(job);
848         }
849 
850         // All WorkflowService Jobs will be ignored
851         if (oldJob.getStatus() != job.getStatus() && !TYPE_WORKFLOW.equals(job.getJobType())) {
852           updateServiceForFailover(job);
853         }
854 
855         return jpaJob;
856       });
857     } catch (ServiceRegistryException e) {
858       throw e;
859     } catch (NotFoundException e) {
860       // Just in case, remove from cache if there
861       removeFromLoadCache(job.getId());
862       throw new ServiceRegistryException(e);
863     } catch (Exception e) {
864       throw new ServiceRegistryException(e);
865     }
866   }
867 
868   @Override
869   public Job updateJob(Job job) throws ServiceRegistryException {
870     JpaJob jpaJob = JpaJob.from(job);
871     jpaJob.setProcessorServiceRegistration(
872             (ServiceRegistrationJpaImpl) getServiceRegistration(job.getJobType(), job.getProcessingHost()));
873     return updateJob(jpaJob).toJob();
874   }
875 
876   /**
877    * Processes the job load changes for the *local* load cache
878    *
879    * @param job
880    *   The job to apply to the load cache
881    */
882   private synchronized void processCachedLoadChange(JpaJob job) {
883     if (JOB_STATUSES_INFLUENCING_LOAD_BALANCING.contains(job.getStatus()) && jobCache.get(job.getId()) == null) {
884       logger.debug("Adding to load cache: {}, type {}, load {}, status {}",
885               job, job.getJobType(), job.getJobLoad(), job.getStatus());
886       localSystemLoad += job.getJobLoad();
887       jobCache.put(job.getId(), job.getJobLoad());
888     } else if (jobCache.get(job.getId()) != null && Status.FINISHED.equals(job.getStatus())
889             || Status.FAILED.equals(job.getStatus()) || Status.WAITING.equals(job.getStatus())) {
890       logger.debug("Removing from load cache: {}, type {}, load {}, status {}",
891               job, job.getJobType(), job.getJobLoad(), job.getStatus());
892       localSystemLoad -= job.getJobLoad();
893       jobCache.remove(job.getId());
894     } else {
895       logger.debug("Ignoring for load cache: {}, type {}, status {}",
896               job, job.getJobType(), job.getStatus());
897     }
898     logger.debug("Current host load: {}, job load cache size: {}", format("%.1f", localSystemLoad), jobCache.size());
899 
900     if (jobCache.isEmpty()) {
901       if (Math.abs(localSystemLoad) > 0.0000001F) {
902         logger.warn("No jobs in the job load cache, but load is {}: setting job load to 0", localSystemLoad);
903       }
904       localSystemLoad = 0.0F;
905     }
906   }
907 
908   private synchronized void removeFromLoadCache(Long jobId) {
909     if (jobCache.get(jobId) != null) {
910       float jobLoad = jobCache.get(jobId);
911       logger.debug("Removing deleted job from load cache: Job {}, load {}", jobId, jobLoad);
912       localSystemLoad -= jobLoad;
913       jobCache.remove(jobId);
914     }
915   }
916 
917   protected JpaJob setJobUri(JpaJob job) {
918     try {
919       job.setUri(new URI(jobHost + "/services/job/" + job.getId() + ".xml"));
920     } catch (URISyntaxException e) {
921       logger.warn("Can not set the job URI", e);
922     }
923     return job;
924   }
925 
926   /**
927    * Internal method to update a job, throwing unwrapped JPA exceptions.
928    *
929    * @param job
930    *          the job to update
931    * @return the updated job
932    */
933   protected JpaJob updateInternal(JpaJob job) throws NotFoundException {
934     JpaJob fromDb = db.execTxChecked(em -> {
935       JpaJob j = em.find(JpaJob.class, job.getId());
936       if (j == null) {
937         throw new NotFoundException();
938       }
939 
940       update(j, job);
941       em.merge(j);
942       return j;
943     });
944 
945     job.setVersion(fromDb.toJob().getVersion());
946     setJobUri(job);
947     return job;
948   }
949 
950   public void updateStatisticsJobData() {
951     jobsStatistics.updateAvg(db.exec(getAvgOperationsQuery()));
952     jobsStatistics.updateJobCount(db.exec(getCountPerHostServiceQuery()));
953   }
954 
955   /**
956    * Internal method to update the service registration state, throwing unwrapped JPA exceptions.
957    *
958    * @param registration
959    *          the service registration to update
960    * @return the updated service registration
961    */
962   private ServiceRegistration updateServiceState(ServiceRegistrationJpaImpl registration) throws NotFoundException {
963     db.execTxChecked(em -> {
964       ServiceRegistrationJpaImpl fromDb = em.find(ServiceRegistrationJpaImpl.class, registration.getId());
965       if (fromDb == null) {
966         throw new NotFoundException();
967       }
968       fromDb.setServiceState(registration.getServiceState());
969       fromDb.setStateChanged(registration.getStateChanged());
970       fromDb.setWarningStateTrigger(registration.getWarningStateTrigger());
971       fromDb.setErrorStateTrigger(registration.getErrorStateTrigger());
972     });
973 
974     servicesStatistics.updateService(registration);
975     return registration;
976   }
977 
978   /**
979    * Sets the queue and runtimes and other elements of a persistent job based on a job that's been modified in memory.
980    * Times on both the objects must be modified, since the in-memory job must not be stale.
981    *
982    * @param fromDb
983    *          The job from the database
984    * @param jpaJob
985    *          The in-memory job
986    */
987   private void update(JpaJob fromDb, JpaJob jpaJob) {
988     final Job job = jpaJob.toJob();
989     final Date now = new Date();
990     final Status status = job.getStatus();
991     final Status fromDbStatus = fromDb.getStatus();
992 
993     fromDb.setPayload(job.getPayload());
994     fromDb.setStatus(job.getStatus());
995     fromDb.setDispatchable(job.isDispatchable());
996     fromDb.setVersion(job.getVersion());
997     fromDb.setOperation(job.getOperation());
998     fromDb.setArguments(job.getArguments());
999 
1000     if (job.getDateCreated() == null) {
1001       jpaJob.setDateCreated(now);
1002       fromDb.setDateCreated(now);
1003       job.setDateCreated(now);
1004     }
1005     if (job.getProcessingHost() != null) {
1006       ServiceRegistrationJpaImpl processingService = (ServiceRegistrationJpaImpl) getServiceRegistration(
1007               job.getJobType(), job.getProcessingHost());
1008       logger.debug("{} has host '{}': setting processor service to '{}'", job, job.getProcessingHost(), processingService);
1009       fromDb.setProcessorServiceRegistration(processingService);
1010     } else {
1011       logger.debug("Unsetting previous processor service registration for {}", job);
1012       fromDb.setProcessorServiceRegistration(null);
1013     }
1014     if (Status.RUNNING.equals(status) && !Status.WAITING.equals(fromDbStatus)) {
1015       if (job.getDateStarted() == null) {
1016         jpaJob.setDateStarted(now);
1017         jpaJob.setQueueTime(now.getTime() - job.getDateCreated().getTime());
1018         fromDb.setDateStarted(now);
1019         fromDb.setQueueTime(now.getTime() - job.getDateCreated().getTime());
1020         job.setDateStarted(now);
1021         job.setQueueTime(now.getTime() - job.getDateCreated().getTime());
1022       }
1023     } else if (Status.FAILED.equals(status)) {
1024       // failed jobs may not have even started properly
1025       if (job.getDateCompleted() == null) {
1026         fromDb.setDateCompleted(now);
1027         jpaJob.setDateCompleted(now);
1028         job.setDateCompleted(now);
1029         if (job.getDateStarted() != null) {
1030           jpaJob.setRunTime(now.getTime() - job.getDateStarted().getTime());
1031           fromDb.setRunTime(now.getTime() - job.getDateStarted().getTime());
1032           job.setRunTime(now.getTime() - job.getDateStarted().getTime());
1033         }
1034       }
1035     } else if (Status.FINISHED.equals(status)) {
1036       if (job.getDateStarted() == null) {
1037         // Some services (e.g. ingest) don't use job dispatching, since they start immediately and handle their own
1038         // lifecycle. In these cases, if the start date isn't set, use the date created as the start date
1039         jpaJob.setDateStarted(job.getDateCreated());
1040         job.setDateStarted(job.getDateCreated());
1041       }
1042       if (job.getDateCompleted() == null) {
1043         jpaJob.setDateCompleted(now);
1044         jpaJob.setRunTime(now.getTime() - job.getDateStarted().getTime());
1045         fromDb.setDateCompleted(now);
1046         fromDb.setRunTime(now.getTime() - job.getDateStarted().getTime());
1047         job.setDateCompleted(now);
1048         job.setRunTime(now.getTime() - job.getDateStarted().getTime());
1049       }
1050     }
1051   }
1052 
1053   /**
1054    * Fetches a host registration from persistence.
1055    *
1056    * @param host
1057    *          the host name
1058    * @return the host registration, or null if none exists
1059    */
1060   protected Function<EntityManager, Optional<HostRegistrationJpaImpl>> fetchHostRegistrationQuery(String host) {
1061     return namedQuery.findOpt(
1062         "HostRegistration.byHostName",
1063         HostRegistrationJpaImpl.class,
1064         Pair.of("host", host)
1065     );
1066   }
1067 
1068   /**
1069    * {@inheritDoc}
1070    *
1071    * @see org.opencastproject.serviceregistry.api.ServiceRegistry#registerHost(String, String, String, long, int, float)
1072    */
1073   @Override
1074   public void registerHost(String host, String address, String nodeName, long memory, int cores, float maxLoad)
1075           throws ServiceRegistryException {
1076     try {
1077       HostRegistrationJpaImpl hostRegistration = db.execTxChecked(em -> {
1078         // Find the existing registrations for this host and if it exists, update it
1079         Optional<HostRegistrationJpaImpl> hostRegistrationOpt = fetchHostRegistrationQuery(host).apply(em);
1080         HostRegistrationJpaImpl hr;
1081 
1082         if (hostRegistrationOpt.isEmpty()) {
1083           hr = new HostRegistrationJpaImpl(host, address, nodeName, memory, cores, maxLoad, true, false);
1084           em.persist(hr);
1085         } else {
1086           hr = hostRegistrationOpt.get();
1087           hr.setIpAddress(address);
1088           hr.setNodeName(nodeName);
1089           hr.setMemory(memory);
1090           hr.setCores(cores);
1091           hr.setMaxLoad(maxLoad);
1092           hr.setOnline(true);
1093           em.merge(hr);
1094         }
1095         logger.info("Registering {} with a maximum load of {}", host, maxLoad);
1096         return hr;
1097       });
1098 
1099       hostsStatistics.updateHost(hostRegistration);
1100     } catch (Exception e) {
1101       throw new ServiceRegistryException(e);
1102     }
1103   }
1104 
1105   /**
1106    * {@inheritDoc}
1107    *
1108    * @see org.opencastproject.serviceregistry.api.ServiceRegistry#unregisterHost(java.lang.String)
1109    */
1110   @Override
1111   public void unregisterHost(String host) throws ServiceRegistryException {
1112     try {
1113       HostRegistrationJpaImpl existingHostRegistration = db.execTxChecked(em -> {
1114         HostRegistrationJpaImpl hr = fetchHostRegistrationQuery(host).apply(em).orElseThrow(
1115             () -> new IllegalArgumentException("Host '" + host + "' is not registered, so it can not be unregistered"));
1116 
1117         hr.setOnline(false);
1118         for (ServiceRegistration serviceRegistration : getServiceRegistrationsByHost(host)) {
1119           unRegisterService(serviceRegistration.getServiceType(), serviceRegistration.getHost());
1120         }
1121         em.merge(hr);
1122 
1123         logger.info("Unregistering {}", host);
1124         return hr;
1125       });
1126 
1127       logger.info("Host {} unregistered", host);
1128       hostsStatistics.updateHost(existingHostRegistration);
1129     } catch (Exception e) {
1130       throw new ServiceRegistryException(e);
1131     }
1132   }
1133 
1134   /**
1135    * {@inheritDoc}
1136    *
1137    * @see org.opencastproject.serviceregistry.api.ServiceRegistry#enableHost(String)
1138    */
1139   @Override
1140   public void enableHost(String host) throws ServiceRegistryException, NotFoundException {
1141     try {
1142       HostRegistrationJpaImpl hostRegistration = db.execTxChecked(em -> {
1143         // Find the existing registrations for this host and if it exists, update it
1144         HostRegistrationJpaImpl hr = fetchHostRegistrationQuery(host).apply(em).orElseThrow(
1145             () -> new NotFoundException("Host '" + host + "' is currently not registered, so it can not be enabled"));
1146         hr.setActive(true);
1147         em.merge(hr);
1148         logger.info("Enabling {}", host);
1149         return hr;
1150       });
1151 
1152       db.execTxChecked(em -> {
1153         for (ServiceRegistration serviceRegistration : getServiceRegistrationsByHost(host)) {
1154           ((ServiceRegistrationJpaImpl) serviceRegistration).setActive(true);
1155           em.merge(serviceRegistration);
1156           servicesStatistics.updateService(serviceRegistration);
1157         }
1158       });
1159 
1160       hostsStatistics.updateHost(hostRegistration);
1161     } catch (NotFoundException e) {
1162       throw e;
1163     } catch (Exception e) {
1164       throw new ServiceRegistryException(e);
1165     }
1166   }
1167 
1168   /**
1169    * {@inheritDoc}
1170    *
1171    * @see org.opencastproject.serviceregistry.api.ServiceRegistry#disableHost(String)
1172    */
1173   @Override
1174   public void disableHost(String host) throws ServiceRegistryException, NotFoundException {
1175     try {
1176       HostRegistrationJpaImpl hostRegistration = db.execTxChecked(em -> {
1177         HostRegistrationJpaImpl hr = fetchHostRegistrationQuery(host).apply(em).orElseThrow(
1178             () -> new NotFoundException("Host '" + host + "' is not currently registered, so it can not be disabled"));
1179 
1180         hr.setActive(false);
1181         for (ServiceRegistration serviceRegistration : getServiceRegistrationsByHost(host)) {
1182           ((ServiceRegistrationJpaImpl) serviceRegistration).setActive(false);
1183           em.merge(serviceRegistration);
1184           servicesStatistics.updateService(serviceRegistration);
1185         }
1186         em.merge(hr);
1187 
1188         logger.info("Disabling {}", host);
1189         return hr;
1190       });
1191 
1192       hostsStatistics.updateHost(hostRegistration);
1193     } catch (NotFoundException e) {
1194       throw e;
1195     } catch (Exception e) {
1196       throw new ServiceRegistryException(e);
1197     }
1198   }
1199 
1200   /**
1201    * {@inheritDoc}
1202    *
1203    * @see org.opencastproject.serviceregistry.api.ServiceRegistry#registerService(java.lang.String, java.lang.String,
1204    *      java.lang.String)
1205    */
1206   @Override
1207   public ServiceRegistration registerService(String serviceType, String baseUrl, String path)
1208           throws ServiceRegistryException {
1209     return registerService(serviceType, baseUrl, path, false);
1210   }
1211 
1212   /**
1213    * {@inheritDoc}
1214    *
1215    * @see org.opencastproject.serviceregistry.api.ServiceRegistry#registerService(java.lang.String, java.lang.String,
1216    *      java.lang.String, boolean)
1217    */
1218   @Override
1219   public ServiceRegistration registerService(String serviceType, String baseUrl, String path, boolean jobProducer)
1220           throws ServiceRegistryException {
1221     cleanRunningJobs(serviceType, baseUrl);
1222     return setOnlineStatus(serviceType, baseUrl, path, true, jobProducer);
1223   }
1224 
1225   protected Function<EntityManager, Optional<ServiceRegistrationJpaImpl>> getServiceRegistrationQuery(
1226       String serviceType, String host) {
1227     return namedQuery.findOpt(
1228         "ServiceRegistration.getRegistration",
1229         ServiceRegistrationJpaImpl.class,
1230         Pair.of("serviceType", serviceType),
1231         Pair.of("host", host)
1232     );
1233   }
1234 
1235   /**
1236    * Sets the online status of a service registration.
1237    *
1238    * @param serviceType
1239    *          The job type
1240    * @param baseUrl
1241    *          the host URL
1242    * @param online
1243    *          whether the service is online or off
1244    * @param jobProducer
1245    *          whether this service produces jobs for long running operations
1246    * @return the service registration
1247    */
1248   protected ServiceRegistration setOnlineStatus(String serviceType, String baseUrl, String path, boolean online,
1249           Boolean jobProducer) throws ServiceRegistryException {
1250     if (isBlank(serviceType) || isBlank(baseUrl)) {
1251       logger.info("Uninformed baseUrl '{}' or service '{}' (path '{}')", baseUrl, serviceType, path);
1252       throw new IllegalArgumentException("serviceType and baseUrl must not be blank");
1253     }
1254 
1255     try {
1256       AtomicReference<HostRegistrationJpaImpl> hostRegistration = new AtomicReference<>();
1257       AtomicReference<ServiceRegistrationJpaImpl> registration = new AtomicReference<>();
1258 
1259       db.execTxChecked(em -> {
1260         HostRegistrationJpaImpl hr = fetchHostRegistrationQuery(baseUrl).apply(em).orElseThrow(() -> {
1261           logger.info("No associated host registration for '{}' or service '{}' (path '{}')", baseUrl, serviceType,path);
1262           return new IllegalStateException(
1263               "A service registration can not be updated when it has no associated host registration");
1264         });
1265         hostRegistration.set(hr);
1266 
1267         ServiceRegistrationJpaImpl sr;
1268         Optional<ServiceRegistrationJpaImpl> srOpt = getServiceRegistrationQuery(serviceType, baseUrl).apply(em);
1269         if (srOpt.isEmpty()) {
1270           if (isBlank(path)) {
1271             // we can not create a new registration without a path
1272             throw new IllegalArgumentException("path must not be blank when registering new services");
1273           }
1274 
1275           // if we are not provided a value, consider it to be false
1276           sr = new ServiceRegistrationJpaImpl(hr, serviceType, path, Objects.requireNonNullElse(jobProducer, false));
1277           em.persist(sr);
1278         } else {
1279           sr = srOpt.get();
1280           if (StringUtils.isNotBlank(path)) {
1281             sr.setPath(path);
1282           }
1283           sr.setOnline(online);
1284           if (jobProducer != null) { // if we are not provided a value, don't update the persistent value
1285             sr.setJobProducer(jobProducer);
1286           }
1287           em.merge(sr);
1288         }
1289         registration.set(sr);
1290       });
1291 
1292       hostsStatistics.updateHost(hostRegistration.get());
1293       servicesStatistics.updateService(registration.get());
1294       return registration.get();
1295     } catch (Exception e) {
1296       throw new ServiceRegistryException(e);
1297     }
1298   }
1299 
1300   /**
1301    * {@inheritDoc}
1302    *
1303    * @see org.opencastproject.serviceregistry.api.ServiceRegistry#unRegisterService(java.lang.String, java.lang.String)
1304    */
1305   @Override
1306   public void unRegisterService(String serviceType, String baseUrl) throws ServiceRegistryException {
1307     logger.info("Unregistering Service {}@{} and cleaning its running jobs", serviceType, baseUrl);
1308     // TODO: create methods that accept an entity manager, so we can execute multiple queries using the same em and tx
1309     //       (em and tx are reused if using nested db.execTx)
1310     setOnlineStatus(serviceType, baseUrl, null, false, null);
1311     cleanRunningJobs(serviceType, baseUrl);
1312   }
1313 
1314   /**
1315    * Find all undispatchable jobs that were orphaned when this host was last deactivated and set them to CANCELLED.
1316    */
1317   private void cleanUndispatchableJobs(String hostName) {
1318     logger.debug("Starting check for undispatchable jobs for host {}", hostName);
1319 
1320     try {
1321       db.execTxChecked(em -> {
1322         List<JpaJob> undispatchableJobs = namedQuery.findAll(
1323             "Job.undispatchable.status",
1324             JpaJob.class,
1325             Pair.of("statuses", List.of(
1326                 Status.INSTANTIATED.ordinal(),
1327                 Status.RUNNING.ordinal()
1328             ))
1329         ).apply(em);
1330 
1331         if (undispatchableJobs.size() > 0) {
1332           logger.info("Found {} undispatchable jobs on host {}", undispatchableJobs.size(), hostName);
1333         }
1334 
1335         for (JpaJob job : undispatchableJobs) {
1336           // Make sure the job was processed on this host
1337           String jobHost = "";
1338           if (job.getProcessorServiceRegistration() != null) {
1339             jobHost = job.getProcessorServiceRegistration().getHost();
1340           }
1341 
1342           if (!jobHost.equals(hostName)) {
1343             logger.debug("Will not cancel undispatchable job {} for host {}, it is running on a different host ({})",
1344                 job, hostName, jobHost);
1345             continue;
1346           }
1347 
1348           logger.info("Cancelling the running undispatchable job {}, it was orphaned on this host ({})", job, hostName);
1349           job.setStatus(Status.CANCELLED);
1350           em.merge(job);
1351         }
1352       });
1353     } catch (Exception e) {
1354       logger.error("Unable to clean undispatchable jobs for host {}! {}", hostName, e.getMessage());
1355     }
1356   }
1357 
1358   /**
1359    * Find all running jobs on this service and set them to RESET or CANCELLED.
1360    *
1361    * @param serviceType
1362    *          the service type
1363    * @param baseUrl
1364    *          the base url
1365    * @throws ServiceRegistryException
1366    *           if there is a problem communicating with the jobs database
1367    */
1368   private void cleanRunningJobs(String serviceType, String baseUrl) throws ServiceRegistryException {
1369     try {
1370       db.execTxChecked(em -> {
1371         TypedQuery<JpaJob> query = em.createNamedQuery("Job.processinghost.status", JpaJob.class)
1372             .setLockMode(LockModeType.PESSIMISTIC_WRITE)
1373             .setParameter("statuses", List.of(
1374                 Status.RUNNING.ordinal(),
1375                 Status.DISPATCHING.ordinal(),
1376                 Status.WAITING.ordinal()
1377             ))
1378             .setParameter("host", baseUrl)
1379             .setParameter("serviceType", serviceType);
1380 
1381         List<JpaJob> unregisteredJobs = query.getResultList();
1382         if (unregisteredJobs.size() > 0) {
1383           logger.info("Found {} jobs to clean for {}@{}", unregisteredJobs.size(), serviceType, baseUrl);
1384         }
1385 
1386         for (JpaJob job : unregisteredJobs) {
1387           if (job.isDispatchable()) {
1388             em.refresh(job);
1389             // If this job has already been treated
1390             if (Status.CANCELLED.equals(job.getStatus()) || Status.RESTART.equals(job.getStatus())) {
1391               continue;
1392             }
1393 
1394             if (job.getRootJob() != null && Status.PAUSED.equals(job.getRootJob().getStatus())) {
1395               JpaJob rootJob = job.getRootJob();
1396               cancelAllChildrenQuery(rootJob).accept(em);
1397               rootJob.setStatus(Status.RESTART);
1398               rootJob.setOperation(START_OPERATION);
1399               em.merge(rootJob);
1400               continue;
1401             }
1402 
1403             logger.info("Marking child jobs from {} as canceled", job);
1404             cancelAllChildrenQuery(job).accept(em);
1405 
1406             logger.info("Rescheduling lost {}", job);
1407             job.setStatus(Status.RESTART);
1408             job.setProcessorServiceRegistration(null);
1409           } else {
1410             logger.info("Marking lost {} as failed", job);
1411             job.setStatus(Status.FAILED);
1412           }
1413 
1414           em.merge(job);
1415         }
1416       });
1417     } catch (Exception e) {
1418       throw new ServiceRegistryException(e);
1419     }
1420   }
1421 
1422   /**
1423    * Go through all the children recursively to set them in {@link Status#CANCELLED} status
1424    *
1425    * @param job
1426    *          the parent job
1427    */
1428   private Consumer<EntityManager> cancelAllChildrenQuery(JpaJob job) {
1429     return em -> job.getChildJobs().stream()
1430         .peek(em::refresh)
1431         .filter(child -> Status.CANCELLED.equals(child.getStatus()))
1432         .forEach(child -> {
1433           cancelAllChildrenQuery(child).accept(em);
1434           child.setStatus(Status.CANCELLED);
1435           em.merge(child);
1436         });
1437   }
1438 
1439   /**
1440    * {@inheritDoc}
1441    *
1442    * @see org.opencastproject.serviceregistry.api.ServiceRegistry#setMaintenanceStatus(java.lang.String, boolean)
1443    */
1444   @Override
1445   public void setMaintenanceStatus(String baseUrl, boolean maintenance) throws NotFoundException {
1446     logger.info("Setting maintenance mode on host '{}'", baseUrl);
1447     HostRegistrationJpaImpl reg = db.execTxChecked(em -> {
1448       HostRegistrationJpaImpl hr = fetchHostRegistrationQuery(baseUrl).apply(em).orElseThrow(() -> {
1449             logger.warn("Can not set maintenance mode because host '{}' was not registered", baseUrl);
1450         return new NotFoundException("Can not set maintenance mode on a host that has not been registered");
1451       });
1452       hr.setMaintenanceMode(maintenance);
1453       em.merge(hr);
1454       return hr;
1455     });
1456 
1457     hostsStatistics.updateHost(reg);
1458     logger.info("Finished setting maintenance mode on host '{}'", baseUrl);
1459   }
1460 
1461   /**
1462    * {@inheritDoc}
1463    *
1464    * @see org.opencastproject.serviceregistry.api.ServiceRegistry#getServiceRegistrations()
1465    */
1466   @Override
1467   public List<ServiceRegistration> getServiceRegistrations() {
1468     return db.exec(getServiceRegistrationsQuery());
1469   }
1470 
1471   @Override
1472   public Incidents incident() {
1473     return incidents;
1474   }
1475 
1476   private List<ServiceRegistration> getOnlineServiceRegistrations() {
1477     return db.exec(namedQuery.findAll("ServiceRegistration.getAllOnline", ServiceRegistration.class));
1478   }
1479 
1480   /**
1481    * Gets all service registrations.
1482    *
1483    * @return the list of service registrations
1484    */
1485   protected Function<EntityManager, List<ServiceRegistration>> getServiceRegistrationsQuery() {
1486     return namedQuery.findAll("ServiceRegistration.getAll", ServiceRegistration.class);
1487   }
1488 
1489   /**
1490    * Gets all host registrations
1491    *
1492    * @return the list of host registrations
1493    */
1494   @Override
1495   public List<HostRegistration> getHostRegistrations() {
1496     return db.exec(getHostRegistrationsQuery());
1497   }
1498 
1499   @Override
1500   public HostStatistics getHostStatistics() {
1501     HostStatistics statistics = new HostStatistics();
1502 
1503     db.exec(namedQuery.findAll(
1504         "HostRegistration.jobStatistics",
1505         Object[].class,
1506         Pair.of("status", List.of(Status.QUEUED.ordinal(), Status.RUNNING.ordinal()))
1507     )).forEach(row -> {
1508       final long host = ((Number) row[0]).longValue();
1509       final int status = ((Number) row[1]).intValue();
1510       final long count = ((Number) row[2]).longValue();
1511 
1512       if (status == Status.RUNNING.ordinal()) {
1513         statistics.addRunning(host, count);
1514       } else {
1515         statistics.addQueued(host, count);
1516       }
1517     });
1518 
1519     return statistics;
1520   }
1521 
1522   /**
1523    * Gets all host registrations
1524    *
1525    * @return the list of host registrations
1526    */
1527   protected Function<EntityManager, List<HostRegistration>> getHostRegistrationsQuery() {
1528     return namedQuery.findAll("HostRegistration.getAll", HostRegistration.class);
1529   }
1530 
1531   @Override
1532   public HostRegistration getHostRegistration(String hostname) throws ServiceRegistryException {
1533     return db.exec(getHostRegistrationQuery(hostname));
1534   }
1535 
1536   protected Function<EntityManager, HostRegistration> getHostRegistrationQuery(String hostname) {
1537     return namedQuery.find(
1538         "HostRegistration.byHostName",
1539         HostRegistration.class,
1540         Pair.of("host", hostname)
1541     );
1542   }
1543 
1544   /**
1545    * {@inheritDoc}
1546    *
1547    * @see org.opencastproject.serviceregistry.api.ServiceRegistry#getChildJobs(long)
1548    */
1549   @Override
1550   public List<Job> getChildJobs(long id) throws ServiceRegistryException {
1551     try {
1552       List<JpaJob> jobs = db.exec(namedQuery.findAll(
1553           "Job.root.children",
1554           JpaJob.class,
1555           Pair.of("id", id)
1556       ));
1557 
1558       if (jobs.size() == 0) {
1559         jobs = db.exec(getChildrenQuery(id));
1560       }
1561 
1562       return jobs.stream()
1563           .map(this::setJobUri)
1564           .map(JpaJob::toJob)
1565           .collect(Collectors.toList());
1566     } catch (Exception e) {
1567       throw new ServiceRegistryException(e);
1568     }
1569   }
1570 
1571   private Function<EntityManager, List<JpaJob>> getChildrenQuery(long id) {
1572     return em -> {
1573       TypedQuery<JpaJob> query = em
1574           .createNamedQuery("Job.children", JpaJob.class)
1575           .setParameter("id", id);
1576 
1577       List<JpaJob> childJobs = query.getResultList();
1578 
1579       List<JpaJob> result = new ArrayList<>(childJobs);
1580       childJobs.stream()
1581           .map(j -> getChildrenQuery(j.getId()).apply(em))
1582           .forEach(result::addAll);
1583 
1584       return result;
1585     };
1586   }
1587 
1588   /**
1589    * {@inheritDoc}
1590    *
1591    * @see org.opencastproject.serviceregistry.api.ServiceRegistry#getJobs(java.lang.String, Status)
1592    */
1593   @Override
1594   public List<Job> getJobs(String type, Status status) throws ServiceRegistryException {
1595     logger.trace("Getting jobs '{}' and '{}'", type, status);
1596 
1597     Function<EntityManager, List<JpaJob>> jobsQuery;
1598     if (type == null && status == null) {
1599       jobsQuery = namedQuery.findAll("Job.all", JpaJob.class);
1600     } else if (type == null) {
1601       jobsQuery = namedQuery.findAll(
1602           "Job.status",
1603           JpaJob.class,
1604           Pair.of("status", status.ordinal())
1605       );
1606     } else if (status == null) {
1607       jobsQuery = namedQuery.findAll(
1608           "Job.type",
1609           JpaJob.class,
1610           Pair.of("serviceType", type)
1611       );
1612     } else {
1613       jobsQuery = namedQuery.findAll(
1614           "Job",
1615           JpaJob.class,
1616           Pair.of("status", status.ordinal()),
1617           Pair.of("serviceType", type)
1618       );
1619     }
1620 
1621     try {
1622       return db.exec(jobsQuery).stream()
1623           .peek(this::setJobUri)
1624           .map(JpaJob::toJob)
1625           .collect(Collectors.toList());
1626     } catch (Exception e) {
1627       throw new ServiceRegistryException(e);
1628     }
1629   }
1630 
1631   @Override
1632   public List<String> getJobPayloads(String operation) throws ServiceRegistryException {
1633     try {
1634       return db.exec(namedQuery.findAll(
1635           "Job.payload",
1636           String.class,
1637           Pair.of("operation", operation)
1638       ));
1639     } catch (Exception e) {
1640       throw new ServiceRegistryException(e);
1641     }
1642   }
1643 
1644   @Override
1645   public List<String> getJobPayloads(String operation, int limit, int offset) throws ServiceRegistryException {
1646     try {
1647       return db.exec(em -> {
1648         return em.createNamedQuery("Job.payload", String.class)
1649             .setParameter("operation", operation)
1650             .setMaxResults(limit)
1651             .setFirstResult(offset)
1652             .getResultList();
1653       });
1654     } catch (Exception e) {
1655       throw new ServiceRegistryException(e);
1656     }
1657   }
1658 
1659   @Override
1660   public int getJobCount(final String operation) throws ServiceRegistryException {
1661     try {
1662       return db.exec(namedQuery.find(
1663           "Job.countByOperationOnly",
1664           Number.class,
1665           Pair.of("operation", operation)
1666       )).intValue();
1667     } catch (Exception e) {
1668       throw new ServiceRegistryException(e);
1669     }
1670   }
1671 
1672   /**
1673    * {@inheritDoc}
1674    *
1675    * @see org.opencastproject.serviceregistry.api.ServiceRegistry#getActiveJobs()
1676    */
1677   @Override
1678   public List<Job> getActiveJobs() throws ServiceRegistryException {
1679     try {
1680       return db.exec(getJobsByStatusQuery(activeJobStatus)).stream()
1681           .map(JpaJob::toJob)
1682           .collect(Collectors.toList());
1683     } catch (Exception e) {
1684       throw new ServiceRegistryException(e);
1685     }
1686   }
1687 
1688   /**
1689    * Get the list of jobs with status from the given statuses.
1690    *
1691    * @param statuses
1692    *          variable sized array of status values to test on jobs
1693    * @return list of jobs with status from statuses
1694    */
1695   public Function<EntityManager, List<JpaJob>> getJobsByStatusQuery(Status... statuses) {
1696     if (statuses == null || statuses.length < 1) {
1697       throw new IllegalArgumentException("At least one job status must be given.");
1698     }
1699 
1700     return namedQuery.findAll(
1701         "Job.statuses",
1702         JpaJob.class,
1703         Pair.of("statuses", Arrays.stream(statuses).map(Enum::ordinal).collect(Collectors.toList()))
1704     ).andThen(jobs -> jobs.stream()
1705         .peek(this::setJobUri)
1706         .collect(Collectors.toList()));
1707   }
1708 
1709   @Override
1710   public Map<String, Map<String, Long>> countActiveByOrganizationAndHost() {
1711     var rows = db.exec(namedQuery.findAll(
1712         "Job.countByOrganizationAndHost",
1713         Object[].class,
1714         Pair.of("statuses", Arrays.stream(activeJobStatus).map(Enum::ordinal).collect(Collectors.toList()))
1715     )).stream().collect(Collectors.toList());
1716     var orgMap = new HashMap<String, Map<String, Long>>();
1717     for (Object[] row: rows) {
1718       var org = (String) row[0];
1719       var host = (String) row[1];
1720       var count = (Long) row[2];
1721       orgMap.computeIfAbsent(org, o -> new HashMap<>()).put(host, count);
1722     }
1723     return orgMap;
1724   }
1725 
1726   @Override
1727   public Map<String, Long> countActiveTypeByOrganization(final String operation) {
1728     return db.exec(namedQuery.findAll(
1729         "Job.countTypeByOrganization",
1730         Object[].class,
1731         Pair.of("statuses", Arrays.stream(activeJobStatus).map(Enum::ordinal).collect(Collectors.toList())),
1732         Pair.of("operation", operation)
1733     )).stream().collect(Collectors.toMap(
1734         row -> (String) row[0],
1735         row -> (Long) row[1]
1736     ));
1737   }
1738 
1739   /**
1740    * Gets jobs of all types that are in the given state.
1741    *
1742    * @param offset apply offset to the db query if offset &gt; 0
1743    * @param limit apply limit to the db query if limit &gt; 0
1744    * @param statuses the job status should be one from the given statuses
1745    * @return the list of jobs waiting for dispatch
1746    */
1747   protected Function<EntityManager, List<JpaJob>> getDispatchableJobsWithStatusQuery(int offset, int limit,
1748       Status... statuses) {
1749     return em -> {
1750       if (statuses == null) {
1751         return Collections.emptyList();
1752       }
1753 
1754       TypedQuery<JpaJob> query = em
1755           .createNamedQuery("Job.dispatchable.status", JpaJob.class)
1756           .setParameter("statuses", Arrays.stream(statuses).map(Enum::ordinal).collect(Collectors.toList()));
1757       if (offset > 0) {
1758         query.setFirstResult(offset);
1759       }
1760       if (limit > 0) {
1761         query.setMaxResults(limit);
1762       }
1763       return query.getResultList();
1764     };
1765   }
1766 
1767   Function<EntityManager, List<Object[]>> getAvgOperationsQuery() {
1768     return namedQuery.findAll("Job.avgOperation", Object[].class);
1769   }
1770 
1771   Function<EntityManager, List<Object[]>> getCountPerHostServiceQuery() {
1772     return namedQuery.findAll("Job.countPerHostService", Object[].class);
1773   }
1774 
1775   /**
1776    * {@inheritDoc}
1777    *
1778    * @see org.opencastproject.serviceregistry.api.ServiceRegistry#count(java.lang.String, Status)
1779    */
1780   @Override
1781   public long count(String serviceType, Status status) throws ServiceRegistryException {
1782     Function<EntityManager, Number> countQuery;
1783     if (serviceType == null && status == null) {
1784       countQuery = namedQuery.find(
1785           "Job.count.all",
1786           Number.class
1787       );
1788     } else if (serviceType == null) {
1789       countQuery = namedQuery.find(
1790           "Job.count.nullType",
1791           Number.class,
1792           Pair.of("status", status.ordinal())
1793       );
1794     } else if (status == null) {
1795       countQuery = namedQuery.find(
1796           "Job.count.nullStatus",
1797           Number.class,
1798           Pair.of("serviceType", serviceType)
1799       );
1800     } else {
1801       countQuery = namedQuery.find(
1802           "Job.count",
1803           Number.class,
1804           Pair.of("status", status.ordinal()),
1805           Pair.of("serviceType", serviceType)
1806       );
1807     }
1808 
1809     try {
1810       return db.exec(countQuery).longValue();
1811     } catch (Exception e) {
1812       throw new ServiceRegistryException(e);
1813     }
1814   }
1815 
1816   /**
1817    * {@inheritDoc}
1818    *
1819    * @see org.opencastproject.serviceregistry.api.ServiceRegistry#countByHost(java.lang.String, java.lang.String,
1820    *      Status)
1821    */
1822   @Override
1823   public long countByHost(String serviceType, String host, Status status) throws ServiceRegistryException {
1824     Function<EntityManager, Number> countQuery;
1825     if (serviceType != null && !serviceType.isEmpty()) {
1826       countQuery = namedQuery.find(
1827           "Job.countByHost",
1828           Number.class,
1829           Pair.of("serviceType", serviceType),
1830           Pair.of("status", status.ordinal()),
1831           Pair.of("host", host)
1832       );
1833     } else {
1834       countQuery = namedQuery.find(
1835           "Job.countByHost.nullType",
1836           Number.class,
1837           Pair.of("status", status.ordinal()),
1838           Pair.of("host", host)
1839       );
1840     }
1841 
1842     try {
1843       return db.exec(countQuery).longValue();
1844     } catch (Exception e) {
1845       throw new ServiceRegistryException(e);
1846     }
1847   }
1848 
1849   /**
1850    * {@inheritDoc}
1851    *
1852    * @see org.opencastproject.serviceregistry.api.ServiceRegistry#countByOperation(java.lang.String, java.lang.String,
1853    *      Status)
1854    */
1855 
1856   @Override
1857   public long countByOperation(String serviceType, String operation, Status status) throws ServiceRegistryException {
1858     try {
1859       return db.exec(namedQuery.find(
1860           "Job.countByOperation",
1861           Number.class,
1862           Pair.of("status", status.ordinal()),
1863           Pair.of("serviceType", serviceType),
1864           Pair.of("operation", operation)
1865       )).longValue();
1866     } catch (Exception e) {
1867       throw new ServiceRegistryException(e);
1868     }
1869   }
1870 
1871   /**
1872    * {@inheritDoc}
1873    *
1874    * @see org.opencastproject.serviceregistry.api.ServiceRegistry#count(java.lang.String, java.lang.String,
1875    *      java.lang.String, Status)
1876    */
1877   @Override
1878   public long count(String serviceType, String host, String operation, Status status) throws ServiceRegistryException {
1879     if (StringUtils.isBlank(serviceType) || StringUtils.isBlank(host) || StringUtils.isBlank(operation)
1880             || status == null) {
1881       throw new IllegalArgumentException("service type, host, operation, and status must be provided");
1882     }
1883 
1884     try {
1885       return db.exec(namedQuery.find(
1886           "Job.fullMonty",
1887           Number.class,
1888           Pair.of("status", status.ordinal()),
1889           Pair.of("serviceType", serviceType),
1890           Pair.of("operation", operation)
1891       )).longValue();
1892     } catch (Exception e) {
1893       throw new ServiceRegistryException(e);
1894     }
1895   }
1896 
1897   /**
1898    * {@inheritDoc}
1899    *
1900    * @see org.opencastproject.serviceregistry.api.ServiceRegistry#getServiceStatistics()
1901    */
1902   @Override
1903   public List<ServiceStatistics> getServiceStatistics() throws ServiceRegistryException {
1904     Date now = new Date();
1905     try {
1906       return db.exec(getServiceStatisticsQuery(
1907           DateUtils.addDays(now, -maxJobAge),
1908           DateUtils.addDays(now, 1) // Avoid glitches around 'now' by setting the endDate to 'tomorrow'
1909       ));
1910     } catch (Exception e) {
1911       throw new ServiceRegistryException(e);
1912     }
1913   }
1914 
1915   /**
1916    * Gets performance and runtime statistics for each known service registration.
1917    * For the statistics, only jobs created within the time interval [startDate, endDate] are being considered
1918    *
1919    * @param startDate
1920    *          Only jobs created after this data are considered for statistics
1921    * @param endDate
1922    *          Only jobs created before this data are considered for statistics
1923    * @return the service statistics
1924    */
1925   private Function<EntityManager, List<ServiceStatistics>> getServiceStatisticsQuery(Date startDate, Date endDate) {
1926     return em -> {
1927       Map<Long, JaxbServiceStatistics> statsMap = new HashMap<>();
1928 
1929       // Make sure we also include the services that have no processing history so far
1930       namedQuery.findAll("ServiceRegistration.getAll", ServiceRegistrationJpaImpl.class).apply(em).forEach(s ->
1931         statsMap.put(s.getId(), new JaxbServiceStatistics(s))
1932       );
1933 
1934       if (collectJobstats) {
1935         // Build stats map
1936         namedQuery.findAll(
1937             "ServiceRegistration.statistics",
1938             Object[].class,
1939             Pair.of("minDateCreated", startDate),
1940             Pair.of("maxDateCreated", endDate)
1941         ).apply(em).forEach(row -> {
1942           Number serviceRegistrationId = (Number) row[0];
1943           if (serviceRegistrationId == null || serviceRegistrationId.longValue() == 0) {
1944             return;
1945           }
1946           Status status = Status.values()[((Number) row[1]).intValue()];
1947           Number count = (Number) row[2];
1948           Number meanQueueTime = (Number) row[3];
1949           Number meanRunTime = (Number) row[4];
1950 
1951           // The statistics query returns a cartesian product, so we need to iterate over them to build up the objects
1952           JaxbServiceStatistics stats = statsMap.get(serviceRegistrationId.longValue());
1953           if (stats == null) {
1954             return;
1955           }
1956 
1957           // the status will be null if there are no jobs at all associated with this service registration
1958           if (status != null) {
1959             switch (status) {
1960               case RUNNING:
1961                 stats.setRunningJobs(count.intValue());
1962                 break;
1963               case QUEUED:
1964               case DISPATCHING:
1965                 stats.setQueuedJobs(count.intValue());
1966                 break;
1967               case FINISHED:
1968                 stats.setMeanRunTime(meanRunTime.longValue());
1969                 stats.setMeanQueueTime(meanQueueTime.longValue());
1970                 stats.setFinishedJobs(count.intValue());
1971                 break;
1972               default:
1973                 break;
1974             }
1975           }
1976         });
1977       }
1978 
1979       List<ServiceStatistics> stats = new ArrayList<>(statsMap.values());
1980       stats.sort((o1, o2) -> {
1981         ServiceRegistration reg1 = o1.getServiceRegistration();
1982         ServiceRegistration reg2 = o2.getServiceRegistration();
1983         int typeComparison = reg1.getServiceType().compareTo(reg2.getServiceType());
1984         return typeComparison == 0
1985             ? reg1.getHost().compareTo(reg2.getHost())
1986             : typeComparison;
1987       });
1988 
1989       return stats;
1990     };
1991   }
1992 
1993   /**
1994    * Do not look at this, it will burn your eyes! This is due to JPA's inability to do a left outer join with join
1995    * conditions.
1996    *
1997    * {@inheritDoc}
1998    *
1999    * @see org.opencastproject.serviceregistry.api.ServiceRegistry#getServiceRegistrationsByLoad(java.lang.String)
2000    */
2001   @Override
2002   public List<ServiceRegistration> getServiceRegistrationsByLoad(String serviceType) throws ServiceRegistryException {
2003     SystemLoad loadByHost = getCurrentHostLoads();
2004     List<HostRegistration> hostRegistrations = getHostRegistrations();
2005     List<ServiceRegistration> serviceRegistrations = getServiceRegistrationsByType(serviceType);
2006     return getServiceRegistrationsByLoad(serviceType, serviceRegistrations, hostRegistrations, loadByHost);
2007   }
2008 
2009   /**
2010    * {@inheritDoc}
2011    *
2012    * @see org.opencastproject.serviceregistry.api.ServiceRegistry#getCurrentHostLoads()
2013    */
2014   @Override
2015   public SystemLoad getCurrentHostLoads() {
2016     return db.exec(getHostLoadsQuery());
2017   }
2018 
2019   /**
2020    * Gets a map of hosts to the number of jobs currently loading that host
2021    *
2022    * @return the map of hosts to job counts
2023    */
2024   Function<EntityManager, SystemLoad> getHostLoadsQuery() {
2025     return em -> {
2026       final SystemLoad systemLoad = new SystemLoad();
2027 
2028       // Find all jobs that are currently running on any given host, or get all of them
2029       List<Integer> statuses = JOB_STATUSES_INFLUENCING_LOAD_BALANCING.stream()
2030           .map(Enum::ordinal)
2031           .collect(Collectors.toList());
2032       List<Object[]> rows = namedQuery.findAll(
2033           "ServiceRegistration.hostloads",
2034           Object[].class,
2035           Pair.of("statuses", statuses),
2036           // Note: This is used in the query to filter out workflow jobs.
2037           // These jobs are load balanced by the workflow service directly.
2038           Pair.of("workflow_type", TYPE_WORKFLOW)
2039       ).apply(em);
2040 
2041       // Accumulate the numbers for relevant job statuses per host
2042       for (Object[] row : rows) {
2043         String host = String.valueOf(row[0]);
2044         Status status = Status.values()[(int) row[1]];
2045         float currentLoad = ((Number) row[2]).floatValue();
2046         float maxLoad = ((Number) row[3]).floatValue();
2047 
2048         // Only queued, and running jobs are adding to the load, so every other status is discarded
2049         if (status == null || !JOB_STATUSES_INFLUENCING_LOAD_BALANCING.contains(status)) {
2050           currentLoad = 0.0f;
2051         }
2052         // Add the service registration
2053         NodeLoad serviceLoad = new NodeLoad(host, currentLoad, maxLoad);
2054         systemLoad.addNodeLoad(serviceLoad);
2055       }
2056 
2057       // This is important, otherwise services which have no current load are not listed in the output!
2058       getHostRegistrationsQuery().apply(em).stream()
2059           .filter(h -> !systemLoad.containsHost(h.getBaseUrl()))
2060           .forEach(h -> systemLoad.addNodeLoad(new NodeLoad(h.getBaseUrl(), 0.0f, h.getMaxLoad())));
2061       return systemLoad;
2062     };
2063   }
2064 
2065   /**
2066    * {@inheritDoc}
2067    *
2068    * @see org.opencastproject.serviceregistry.api.ServiceRegistry#getServiceRegistrationsByType(java.lang.String)
2069    */
2070   @Override
2071   public List<ServiceRegistration> getServiceRegistrationsByType(String serviceType) throws ServiceRegistryException {
2072     return db.exec(namedQuery.findAll(
2073         "ServiceRegistration.getByType",
2074         ServiceRegistration.class,
2075         Pair.of("serviceType", serviceType)
2076     ));
2077   }
2078 
2079   /**
2080    * {@inheritDoc}
2081    *
2082    * @see org.opencastproject.serviceregistry.api.ServiceRegistry#getServiceRegistrationsByHost(java.lang.String)
2083    */
2084   @Override
2085   public List<ServiceRegistration> getServiceRegistrationsByHost(String host) throws ServiceRegistryException {
2086     return db.exec(getServiceRegistrationsByHostQuery(host));
2087   }
2088 
2089   private Function<EntityManager, List<ServiceRegistration>> getServiceRegistrationsByHostQuery(String host) {
2090     return namedQuery.findAll(
2091         "ServiceRegistration.getByHost",
2092         ServiceRegistration.class,
2093         Pair.of("host", host)
2094     );
2095   }
2096 
2097   /**
2098    * {@inheritDoc}
2099    *
2100    * @see org.opencastproject.serviceregistry.api.ServiceRegistry#getServiceRegistration(java.lang.String,
2101    *      java.lang.String)
2102    */
2103   @Override
2104   public ServiceRegistration getServiceRegistration(String serviceType, String host) {
2105     return db.exec(getServiceRegistrationQuery(serviceType, host))
2106         .orElse(null);
2107   }
2108 
2109   /**
2110    * Sets the trusted http client.
2111    *
2112    * @param client
2113    *          the trusted http client
2114    */
2115   @Reference
2116   void setTrustedHttpClient(TrustedHttpClient client) {
2117     this.client = client;
2118   }
2119 
2120   /**
2121    * Callback for setting the security service.
2122    *
2123    * @param securityService
2124    *          the securityService to set
2125    */
2126   @Reference
2127   public void setSecurityService(SecurityService securityService) {
2128     this.securityService = securityService;
2129   }
2130 
2131   /** OSGi DI. */
2132   @Reference(cardinality = ReferenceCardinality.OPTIONAL, policy =  ReferencePolicy.DYNAMIC, unbind = "unsetIncidentService")
2133   public void setIncidentService(IncidentService incidentService) {
2134     this.incidentService = incidentService;
2135     // Manually resolve the cyclic dependency between the incident service and the service registry
2136     ((OsgiIncidentService) incidentService).setServiceRegistry(this);
2137     this.incidents = new Incidents(this, incidentService);
2138   }
2139 
2140   public void unsetIncidentService(IncidentService incidentService) {
2141     if (this.incidentService == incidentService) {
2142       this.incidentService = null;
2143       this.incidents = null;
2144     }
2145   }
2146 
2147   /**
2148    * Update the jobs failure history and the service status with the given information. All these data are then use for
2149    * the jobs failover strategy. Only the terminated job (with FAILED or FINISHED status) are taken into account.
2150    *
2151    * @param job
2152    *          the current job that failed/succeeded
2153    * @throws ServiceRegistryException
2154    * @throws NotFoundException
2155    */
2156   private void updateServiceForFailover(JpaJob job) throws ServiceRegistryException, NotFoundException {
2157     if (job.getStatus() != Status.FAILED && job.getStatus() != Status.FINISHED) {
2158       return;
2159     }
2160 
2161     job.setStatus(job.getStatus(), job.getFailureReason());
2162 
2163     // At this point, the only possible states for the current service are NORMAL and WARNING,
2164     // the services in ERROR state will not be chosen by the dispatcher
2165     ServiceRegistrationJpaImpl currentService = job.getProcessorServiceRegistration();
2166     if (currentService == null) {
2167       return;
2168     }
2169 
2170     // Job is finished with a failure
2171     if (job.getStatus() == FAILED && !DATA.equals(job.getFailureReason())) {
2172 
2173       // Services in WARNING or ERROR state triggered by current job
2174       List<ServiceRegistrationJpaImpl> relatedWarningOrErrorServices = getRelatedWarningErrorServices(job);
2175 
2176       // Before this job failed there was at least one job failed with this job signature on any service
2177       if (relatedWarningOrErrorServices.size() > 0) {
2178         for (ServiceRegistrationJpaImpl relatedService : relatedWarningOrErrorServices) {
2179           // Skip current service from the list
2180           if (currentService.equals(relatedService)) {
2181             continue;
2182           }
2183 
2184           // De-escalate the state of related services as the issue is most likely with the job not the service
2185           // Reset the WARNING job to NORMAL
2186           if (relatedService.getServiceState() == WARNING) {
2187             logger.info("State reset to NORMAL for related service {} on host {}", relatedService.getServiceType(),
2188                     relatedService.getHost());
2189             relatedService.setServiceState(NORMAL, job.toJob().getSignature());
2190           }
2191 
2192           // Reset the ERROR job to WARNING
2193           else if (relatedService.getServiceState() == ERROR) {
2194             logger.info("State reset to WARNING for related service {} on host {}", relatedService.getServiceType(),
2195                     relatedService.getHost());
2196             relatedService.setServiceState(WARNING, relatedService.getWarningStateTrigger());
2197           }
2198 
2199           updateServiceState(relatedService);
2200         }
2201       }
2202 
2203       // This is the first job with this signature failing on any service
2204       else {
2205         // Set the current service to WARNING state
2206         if (currentService.getServiceState() == NORMAL) {
2207           logger.info("State set to WARNING for current service {} on host {}", currentService.getServiceType(),
2208                   currentService.getHost());
2209           currentService.setServiceState(WARNING, job.toJob().getSignature());
2210           updateServiceState(currentService);
2211         }
2212 
2213         // The current service already is in WARNING state and max attempts is reached
2214         else if (errorStatesEnabled && !noErrorStateServiceTypes.contains(currentService.getServiceType())
2215                 && getHistorySize(currentService) >= maxAttemptsBeforeErrorState) {
2216           logger.info("State set to ERROR for current service {} on host {}", currentService.getServiceType(),
2217                   currentService.getHost());
2218           currentService.setServiceState(ERROR, job.toJob().getSignature());
2219           updateServiceState(currentService);
2220         }
2221       }
2222     }
2223 
2224     // Job is finished without failure
2225     else if (job.getStatus() == Status.FINISHED) {
2226       // If the service was in warning state reset to normal state
2227       if (currentService.getServiceState() == WARNING) {
2228         logger.info("State reset to NORMAL for current service {} on host {}", currentService.getServiceType(),
2229                 currentService.getHost());
2230         currentService.setServiceState(NORMAL);
2231         updateServiceState(currentService);
2232       }
2233     }
2234   }
2235 
2236   /**
2237    * {@inheritDoc}
2238    *
2239    * @see org.opencastproject.serviceregistry.api.ServiceRegistry#sanitize(java.lang.String, java.lang.String)
2240    */
2241   @Override
2242   public void sanitize(String serviceType, String host) throws NotFoundException {
2243     db.execChecked(em -> {
2244       ServiceRegistrationJpaImpl service = getServiceRegistrationQuery(serviceType, host).apply(em)
2245           .orElseThrow(NotFoundException::new);
2246 
2247       logger.info("State reset to NORMAL for service {} on host {} through sanitize method", service.getServiceType(),
2248           service.getHost());
2249       service.setServiceState(NORMAL);
2250       updateServiceState(service);
2251     });
2252   }
2253 
2254   /**
2255    * Gets the failed jobs history for the given service registration
2256    *
2257    * @param serviceRegistration
2258    * @return the failed jobs history size
2259    * @throws IllegalArgumentException
2260    *           if parameter is null
2261    * @throws ServiceRegistryException
2262    */
2263   private int getHistorySize(ServiceRegistration serviceRegistration) throws ServiceRegistryException {
2264     if (serviceRegistration == null) {
2265       throw new IllegalArgumentException("serviceRegistration must not be null!");
2266     }
2267 
2268     logger.debug("Calculating count of jobs who failed due to service {}", serviceRegistration);
2269 
2270     try {
2271       return db.exec(namedQuery.find(
2272           "Job.count.history.failed",
2273           Number.class,
2274           Pair.of("serviceType", serviceRegistration.getServiceType()),
2275           Pair.of("host", serviceRegistration.getHost())
2276       )).intValue();
2277     } catch (Exception e) {
2278       throw new ServiceRegistryException(e);
2279     }
2280   }
2281 
2282   /**
2283    * Gets the services in WARNING or ERROR state triggered by this job
2284    *
2285    * @param job
2286    *          the given job to get the related services
2287    * @return a list of services triggered by the job
2288    * @throws IllegalArgumentException
2289    *           if the given job was null
2290    * @throws ServiceRegistryException
2291    *           if the there was a problem with the query
2292    */
2293   private List<ServiceRegistrationJpaImpl> getRelatedWarningErrorServices(JpaJob job) throws ServiceRegistryException {
2294     if (job == null) {
2295       throw new IllegalArgumentException("job must not be null!");
2296     }
2297 
2298     logger.debug("Try to get the services in WARNING or ERROR state triggered by {} failed", job);
2299 
2300     try {
2301       return db.exec(namedQuery.findAll(
2302           "ServiceRegistration.relatedservices.warning_error",
2303           ServiceRegistrationJpaImpl.class,
2304           Pair.of("serviceType", job.getJobType())
2305       )).stream()
2306           // TODO: modify the query to avoid to go through the list here
2307           .filter(rs ->
2308               (rs.getServiceState() == WARNING && rs.getWarningStateTrigger() == job.toJob().getSignature())
2309               || (rs.getServiceState() == ERROR && rs.getErrorStateTrigger() == job.toJob().getSignature())
2310           ).collect(Collectors.toList());
2311     } catch (Exception e) {
2312       throw new ServiceRegistryException(e);
2313     }
2314   }
2315 
2316   /**
2317    * Returns a filtered list of service registrations, containing only those that are online, not in maintenance mode,
2318    * and with a specific service type that are running on a host which is not already maxed out.
2319    *
2320    * @param serviceRegistrations
2321    *          the complete list of service registrations
2322    * @param hostRegistrations
2323    *          the complete list of available host registrations
2324    * @param systemLoad
2325    *          the map of hosts to the number of running jobs
2326    * @param jobType
2327    *          the job type for which the services registrations are filtered
2328    */
2329   protected List<ServiceRegistration> getServiceRegistrationsWithCapacity(String jobType,
2330           List<ServiceRegistration> serviceRegistrations, List<HostRegistration> hostRegistrations,
2331           final SystemLoad systemLoad) {
2332     final List<String> hostBaseUrls = hostRegistrations.stream()
2333                                                        .map(HostRegistration::getBaseUrl)
2334                                                        .collect(Collectors.toUnmodifiableList());
2335     final List<ServiceRegistration> filteredList = new ArrayList<>();
2336 
2337     for (ServiceRegistration service : serviceRegistrations) {
2338       // Skip service if host not available
2339       if (!hostBaseUrls.contains(service.getHost())) {
2340         logger.trace("Not considering {} because it's host {} is not available for dispatching", service,
2341                 service.getHost());
2342         continue;
2343       }
2344 
2345       // Skip services that are not of the requested type
2346       if (!jobType.equals(service.getServiceType())) {
2347         logger.trace("Not considering {} because it is of the wrong job type", service);
2348         continue;
2349       }
2350 
2351       // Skip services that are in error state
2352       if (service.getServiceState() == ERROR) {
2353         logger.trace("Not considering {} because it is in error state", service);
2354         continue;
2355       }
2356 
2357       // Skip services that are in maintenance mode
2358       if (service.isInMaintenanceMode()) {
2359         logger.trace("Not considering {} because it is in maintenance mode", service);
2360         continue;
2361       }
2362 
2363       // Skip services that are marked as offline
2364       if (!service.isOnline()) {
2365         logger.trace("Not considering {} because it is currently offline", service);
2366         continue;
2367       }
2368 
2369       // Determine the maximum load for this host
2370       Float hostLoadMax = null;
2371       for (HostRegistration host : hostRegistrations) {
2372         if (host.getBaseUrl().equals(service.getHost())) {
2373           hostLoadMax = host.getMaxLoad();
2374           break;
2375         }
2376       }
2377       if (hostLoadMax == null) {
2378         logger.warn("Unable to determine max load for host {}", service.getHost());
2379       }
2380 
2381       // Determine the current load for this host
2382       Float hostLoad = systemLoad.get(service.getHost()).getLoadFactor();
2383       if (hostLoad == null) {
2384         logger.warn("Unable to determine current load for host {}", service.getHost());
2385       }
2386 
2387       // Is this host suited for processing?
2388       if (hostLoad == null || hostLoadMax == null || hostLoad < hostLoadMax) {
2389         logger.debug("Adding candidate service {} for processing of jobs of type '{}' (host load is {} of max {})",
2390            service, jobType, hostLoad, hostLoadMax);
2391         filteredList.add(service);
2392       }
2393     }
2394 
2395     // Sort the list by capacity
2396     filteredList.sort(new LoadComparator(systemLoad));
2397 
2398     return filteredList;
2399   }
2400 
2401   /**
2402    * Returns a filtered list of service registrations, containing only those that are online, not in maintenance mode,
2403    * and with a specific service type, ordered by load.
2404    *
2405    * @param jobType
2406    *          the job type for which the services registrations are filtered
2407    * @param serviceRegistrations
2408    *          the complete list of service registrations
2409    * @param hostRegistrations
2410    *          the complete list of available host registrations
2411    * @param systemLoad
2412    *
2413    */
2414   protected List<ServiceRegistration> getServiceRegistrationsByLoad(String jobType,
2415           List<ServiceRegistration> serviceRegistrations, List<HostRegistration> hostRegistrations,
2416           final SystemLoad systemLoad) {
2417     final List<String> hostBaseUrls = hostRegistrations.stream()
2418                                                        .map(HostRegistration::getBaseUrl)
2419                                                        .collect(Collectors.toUnmodifiableList());
2420     final List<ServiceRegistration> filteredList = new ArrayList<>();
2421 
2422     logger.debug("Finding services to dispatch job of type {}", jobType);
2423 
2424     for (ServiceRegistration service : serviceRegistrations) {
2425       // Skip service if host not available
2426       if (!hostBaseUrls.contains(service.getHost())) {
2427         logger.trace("Not considering {} because it's host {} is not available for dispatching", service,
2428                 service.getHost());
2429         continue;
2430       }
2431 
2432       // Skip services that are not of the requested type
2433       if (!jobType.equals(service.getServiceType())) {
2434         logger.trace("Not considering {} because it is of the wrong job type", service);
2435         continue;
2436       }
2437 
2438       // Skip services that are in error state
2439       if (service.getServiceState() == ERROR) {
2440         logger.trace("Not considering {} because it is in error state", service);
2441         continue;
2442       }
2443 
2444       // Skip services that are in maintenance mode
2445       if (service.isInMaintenanceMode()) {
2446         logger.trace("Not considering {} because it is in maintenance mode", service);
2447         continue;
2448       }
2449 
2450       // Skip services that are marked as offline
2451       if (!service.isOnline()) {
2452         logger.trace("Not considering {} because it is currently offline", service);
2453         continue;
2454       }
2455 
2456       // We found a candidate service
2457       logger.debug("Adding candidate service {} for processing of job of type '{}'", service, jobType);
2458       filteredList.add(service);
2459     }
2460 
2461     // Sort the list by capacity and distinguish between composer jobs and other jobs
2462     if ("org.opencastproject.composer".equals(jobType))
2463       Collections.sort(filteredList, new LoadComparatorEncoding(systemLoad));
2464     else
2465       Collections.sort(filteredList, new LoadComparator(systemLoad));
2466 
2467     return filteredList;
2468   }
2469 
2470   /**
2471    * {@inheritDoc}
2472    *
2473    * @see org.opencastproject.serviceregistry.api.ServiceRegistry#getMaxLoads()
2474    */
2475   @Override
2476   public SystemLoad getMaxLoads() throws ServiceRegistryException {
2477     final SystemLoad loads = new SystemLoad();
2478     getHostRegistrations().stream()
2479         .map(host -> new NodeLoad(host.getBaseUrl(), 0.0f, host.getMaxLoad()))
2480         .forEach(loads::addNodeLoad);
2481     return loads;
2482   }
2483 
2484   /**
2485    * {@inheritDoc}
2486    *
2487    * @see org.opencastproject.serviceregistry.api.ServiceRegistry#getMaxLoadOnNode(java.lang.String)
2488    */
2489   @Override
2490   public NodeLoad getMaxLoadOnNode(String host) throws ServiceRegistryException, NotFoundException {
2491     try {
2492       float maxLoad = db.exec(namedQuery.find(
2493           "HostRegistration.getMaxLoadByHostName",
2494           Number.class,
2495           Pair.of("host", host)
2496       )).floatValue();
2497       return new NodeLoad(host, 0.0f, maxLoad);
2498     } catch (NoResultException e) {
2499       throw new NotFoundException(e);
2500     } catch (Exception e) {
2501       throw new ServiceRegistryException(e);
2502     }
2503   }
2504 
2505   /** A periodic check on each service registration to ensure that it is still alive. */
2506   class JobProducerHeartbeat implements Runnable {
2507 
2508     /** List of service registrations that have been found unresponsive last time we checked */
2509     private final List<ServiceRegistration> unresponsive = new ArrayList<>();
2510 
2511     /**
2512      * {@inheritDoc}
2513      *
2514      * @see java.lang.Runnable#run()
2515      */
2516     @Override
2517     public void run() {
2518       logger.debug("Checking for unresponsive services");
2519 
2520       try {
2521         List<ServiceRegistration> serviceRegistrations = getOnlineServiceRegistrations();
2522 
2523         for (ServiceRegistration service : serviceRegistrations) {
2524           hostsStatistics.updateHost(((ServiceRegistrationJpaImpl) service).getHostRegistration());
2525           servicesStatistics.updateService(service);
2526           if (!service.isJobProducer()) {
2527             continue;
2528           }
2529           if (service.isInMaintenanceMode()) {
2530             continue;
2531           }
2532 
2533           // We think this service is online and available. Prove it.
2534           String serviceUrl = UrlSupport.concat(service.getHost(), service.getPath(), "dispatch");
2535 
2536           HttpHead options = new HttpHead(serviceUrl);
2537           HttpResponse response = null;
2538           try {
2539             try {
2540               response = client.execute(options);
2541               if (response != null) {
2542                 switch (response.getStatusLine().getStatusCode()) {
2543                   case HttpStatus.SC_OK:
2544                     // this service is reachable, continue checking other services
2545                     logger.trace("Service " + service + " is responsive: " + response.getStatusLine());
2546                     if (unresponsive.remove(service)) {
2547                       logger.info("Service {} is still online", service);
2548                     } else if (!service.isOnline()) {
2549                       try {
2550                         setOnlineStatus(service.getServiceType(), service.getHost(), service.getPath(), true, true);
2551                         logger.info("Service {} is back online", service);
2552                       } catch (ServiceRegistryException e) {
2553                         logger.warn("Error setting online status for {}", service);
2554                       }
2555                     }
2556                     continue;
2557                   default:
2558                     if (!service.isOnline()) {
2559                       continue;
2560                     }
2561                     logger.warn("Service {} is not working as expected: {}", service, response.getStatusLine());
2562                 }
2563               } else {
2564                 logger.warn("Service {} does not respond", service);
2565               }
2566             } catch (TrustedHttpClientException e) {
2567               if (!service.isOnline()) {
2568                 continue;
2569               }
2570               logger.warn("Unable to reach {}", service, e);
2571             }
2572 
2573             // If we get here, the service did not respond as expected
2574             try {
2575               if (unresponsive.contains(service)) {
2576                 unRegisterService(service.getServiceType(), service.getHost());
2577                 unresponsive.remove(service);
2578                 logger.warn("Marking {} as offline", service);
2579               } else {
2580                 unresponsive.add(service);
2581                 logger.warn("Added {} to the watch list", service);
2582               }
2583             } catch (ServiceRegistryException e) {
2584               logger.warn("Unable to unregister unreachable service: {}", service, e);
2585             }
2586           } finally {
2587             client.close(response);
2588           }
2589         }
2590       } catch (Throwable t) {
2591         logger.warn("Error while checking for unresponsive services", t);
2592       }
2593 
2594       logger.debug("Finished checking for unresponsive services");
2595     }
2596   }
2597 
2598   /**
2599    * Comparator that will sort service registrations depending on their capacity, wich is defined by the number of jobs
2600    * the service's host is already running divided by the MaxLoad of the Server. The lower that number, the bigger the capacity.
2601    */
2602   private class LoadComparator implements Comparator<ServiceRegistration> {
2603 
2604     protected SystemLoad loadByHost = null;
2605 
2606     /**
2607      * Creates a new comparator which is using the given map of host names and loads.
2608      *
2609      * @param loadByHost
2610      *          the current work load by host
2611      */
2612     LoadComparator(SystemLoad loadByHost) {
2613       this.loadByHost = loadByHost;
2614     }
2615 
2616     @Override
2617     public int compare(ServiceRegistration serviceA, ServiceRegistration serviceB) {
2618       String hostA = serviceA.getHost();
2619       String hostB = serviceB.getHost();
2620       NodeLoad nodeA = loadByHost.get(hostA);
2621       NodeLoad nodeB = loadByHost.get(hostB);
2622       // If the load factors are about the same, sort based on maximum load
2623       if (Math.abs(nodeA.getLoadFactor() - nodeB.getLoadFactor()) <= 0.01) {
2624         // NOTE: The sort order below is *reversed* from what you'd expect
2625         // When we're comparing the load factors we want the node with the lowest factor to be first
2626         // When we're comparing the maximum load value, we want the node with the highest max to be first
2627         return Float.compare(nodeB.getMaxLoad(), nodeA.getMaxLoad());
2628       }
2629       return Float.compare(nodeA.getLoadFactor(), nodeB.getLoadFactor());
2630     }
2631   }
2632 
2633   /**
2634    * Comparator that will sort service registrations depending on their capacity, which is defined by the number of jobs
2635    * the service's host is already running divided by the MaxLoad of the Server. The lower that number, the bigger the capacity.
2636    * This Comparator will preferre encoding workers, if none are defined in the configuration file it will act like the LoadComparator.
2637    */
2638   private class LoadComparatorEncoding extends LoadComparator implements Comparator<ServiceRegistration> {
2639 
2640     /**
2641      * Creates a new comparator which is using the given map of host names and loads.
2642      *
2643      * @param loadByHost
2644      */
2645     LoadComparatorEncoding(SystemLoad loadByHost) {
2646       super(loadByHost);
2647     }
2648 
2649     @Override
2650     public int compare(ServiceRegistration serviceA, ServiceRegistration serviceB) {
2651       String hostA = serviceA.getHost();
2652       String hostB = serviceB.getHost();
2653       NodeLoad nodeA = loadByHost.get(hostA);
2654       NodeLoad nodeB = loadByHost.get(hostB);
2655 
2656       if (encodingWorkers != null) {
2657         if (encodingWorkers.contains(hostA) && !encodingWorkers.contains(hostB)) {
2658           if (nodeA.getLoadFactor() <= encodingThreshold) {
2659             return -1;
2660           }
2661           return Float.compare(nodeA.getLoadFactor(), nodeB.getLoadFactor());
2662         }
2663         if (encodingWorkers.contains(hostB) && !encodingWorkers.contains(hostA)) {
2664           if (nodeB.getLoadFactor() <= encodingThreshold) {
2665             return 1;
2666           }
2667           return Float.compare(nodeA.getLoadFactor(), nodeB.getLoadFactor());
2668         }
2669       }
2670         return super.compare(serviceA, serviceB);
2671     }
2672   }
2673 }