View Javadoc
1   /*
2    * Licensed to The Apereo Foundation under one or more contributor license
3    * agreements. See the NOTICE file distributed with this work for additional
4    * information regarding copyright ownership.
5    *
6    *
7    * The Apereo Foundation licenses this file to you under the Educational
8    * Community License, Version 2.0 (the "License"); you may not use this file
9    * except in compliance with the License. You may obtain a copy of the License
10   * at:
11   *
12   *   http://opensource.org/licenses/ecl2.txt
13   *
14   * Unless required by applicable law or agreed to in writing, software
15   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
16   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the
17   * License for the specific language governing permissions and limitations under
18   * the License.
19   *
20   */
21  
22  package org.opencastproject.serviceregistry.impl;
23  
24  import static java.lang.String.format;
25  import static org.apache.commons.lang3.StringUtils.isBlank;
26  import static org.opencastproject.db.Queries.namedQuery;
27  import static org.opencastproject.job.api.AbstractJobProducer.ACCEPT_JOB_LOADS_EXCEEDING_PROPERTY;
28  import static org.opencastproject.job.api.AbstractJobProducer.DEFAULT_ACCEPT_JOB_LOADS_EXCEEDING;
29  import static org.opencastproject.job.api.Job.FailureReason.DATA;
30  import static org.opencastproject.job.api.Job.Status.FAILED;
31  import static org.opencastproject.serviceregistry.api.ServiceState.ERROR;
32  import static org.opencastproject.serviceregistry.api.ServiceState.NORMAL;
33  import static org.opencastproject.serviceregistry.api.ServiceState.WARNING;
34  import static org.opencastproject.util.OsgiUtil.getOptContextProperty;
35  
36  import org.opencastproject.db.DBSession;
37  import org.opencastproject.db.DBSessionFactory;
38  import org.opencastproject.job.api.Job;
39  import org.opencastproject.job.api.Job.Status;
40  import org.opencastproject.job.jpa.JpaJob;
41  import org.opencastproject.security.api.Organization;
42  import org.opencastproject.security.api.SecurityService;
43  import org.opencastproject.security.api.TrustedHttpClient;
44  import org.opencastproject.security.api.TrustedHttpClientException;
45  import org.opencastproject.security.api.User;
46  import org.opencastproject.serviceregistry.api.HostRegistration;
47  import org.opencastproject.serviceregistry.api.HostStatistics;
48  import org.opencastproject.serviceregistry.api.IncidentService;
49  import org.opencastproject.serviceregistry.api.Incidents;
50  import org.opencastproject.serviceregistry.api.JaxbServiceStatistics;
51  import org.opencastproject.serviceregistry.api.ServiceRegistration;
52  import org.opencastproject.serviceregistry.api.ServiceRegistry;
53  import org.opencastproject.serviceregistry.api.ServiceRegistryException;
54  import org.opencastproject.serviceregistry.api.ServiceStatistics;
55  import org.opencastproject.serviceregistry.api.SystemLoad;
56  import org.opencastproject.serviceregistry.api.SystemLoad.NodeLoad;
57  import org.opencastproject.serviceregistry.impl.jmx.HostsStatistics;
58  import org.opencastproject.serviceregistry.impl.jmx.JobsStatistics;
59  import org.opencastproject.serviceregistry.impl.jmx.ServicesStatistics;
60  import org.opencastproject.serviceregistry.impl.jpa.HostRegistrationJpaImpl;
61  import org.opencastproject.serviceregistry.impl.jpa.ServiceRegistrationJpaImpl;
62  import org.opencastproject.systems.OpencastConstants;
63  import org.opencastproject.util.NotFoundException;
64  import org.opencastproject.util.UrlSupport;
65  import org.opencastproject.util.function.ThrowingConsumer;
66  import org.opencastproject.util.jmx.JmxUtil;
67  
68  import org.apache.commons.lang3.StringUtils;
69  import org.apache.commons.lang3.time.DateUtils;
70  import org.apache.commons.lang3.tuple.Pair;
71  import org.apache.http.HttpResponse;
72  import org.apache.http.HttpStatus;
73  import org.apache.http.client.methods.HttpHead;
74  import org.osgi.service.cm.ConfigurationException;
75  import org.osgi.service.cm.ManagedService;
76  import org.osgi.service.component.ComponentContext;
77  import org.osgi.service.component.annotations.Activate;
78  import org.osgi.service.component.annotations.Component;
79  import org.osgi.service.component.annotations.Deactivate;
80  import org.osgi.service.component.annotations.Modified;
81  import org.osgi.service.component.annotations.Reference;
82  import org.osgi.service.component.annotations.ReferenceCardinality;
83  import org.osgi.service.component.annotations.ReferencePolicy;
84  import org.slf4j.Logger;
85  import org.slf4j.LoggerFactory;
86  
87  import java.net.InetAddress;
88  import java.net.URI;
89  import java.net.URISyntaxException;
90  import java.util.ArrayList;
91  import java.util.Arrays;
92  import java.util.Collections;
93  import java.util.Comparator;
94  import java.util.Date;
95  import java.util.Dictionary;
96  import java.util.HashMap;
97  import java.util.List;
98  import java.util.Map;
99  import java.util.Objects;
100 import java.util.Optional;
101 import java.util.concurrent.Executors;
102 import java.util.concurrent.ScheduledExecutorService;
103 import java.util.concurrent.TimeUnit;
104 import java.util.concurrent.atomic.AtomicReference;
105 import java.util.function.Consumer;
106 import java.util.function.Function;
107 import java.util.stream.Collectors;
108 
109 import javax.management.ObjectInstance;
110 import javax.persistence.EntityManager;
111 import javax.persistence.EntityManagerFactory;
112 import javax.persistence.LockModeType;
113 import javax.persistence.NoResultException;
114 import javax.persistence.TypedQuery;
115 
116 /** JPA implementation of the {@link ServiceRegistry} */
117 @Component(
118   property = {
119     "service.description=Service registry"
120   },
121   immediate = true,
122   service = { ManagedService.class, ServiceRegistry.class, ServiceRegistryJpaImpl.class }
123 )
124 public class ServiceRegistryJpaImpl implements ServiceRegistry, ManagedService {
125 
126   /** JPA persistence unit name */
127   public static final String PERSISTENCE_UNIT = "org.opencastproject.common";
128 
129   /** Id of the workflow's start operation, needs to match the corresponding enum value in
130    * WorkflowServiceImpl */
131   public static final String START_OPERATION = "START_OPERATION";
132 
133   /** Id of the workflow's start workflow operation, needs to match the corresponding enum value in
134    * WorkflowServiceImpl */
135   public static final String START_WORKFLOW = "START_WORKFLOW";
136 
137   /** Id of the workflow's resume operation, needs to match the corresponding enum value in WorkflowServiceImpl */
138   public static final String RESUME = "RESUME";
139 
140   /** Identifier for the workflow service */
141   public static final String TYPE_WORKFLOW = "org.opencastproject.workflow";
142 
143   static final Logger logger = LoggerFactory.getLogger(ServiceRegistryJpaImpl.class);
144 
145   /** The list of registered JMX beans */
146   protected List<ObjectInstance> jmxBeans = new ArrayList<>();
147 
148   /** Hosts statistics JMX type */
149   private static final String JMX_HOSTS_STATISTICS_TYPE = "HostsStatistics";
150 
151   /** Services statistics JMX type */
152   private static final String JMX_SERVICES_STATISTICS_TYPE = "ServicesStatistics";
153 
154   /** Jobs statistics JMX type */
155   private static final String JMX_JOBS_STATISTICS_TYPE = "JobsStatistics";
156 
157   /** The JMX business object for hosts statistics */
158   private HostsStatistics hostsStatistics;
159 
160   /** The JMX business object for services statistics */
161   private ServicesStatistics servicesStatistics;
162 
163   /** The JMX business object for jobs statistics */
164   private JobsStatistics jobsStatistics;
165 
166   /** Current job used to process job in the service registry */
167   private static final ThreadLocal<Job> currentJob = new ThreadLocal<>();
168 
169   /** Configuration key for the maximum load */
170   protected static final String OPT_MAXLOAD = "org.opencastproject.server.maxload";
171 
172   /** Configuration key for the interval to check whether the hosts in the service registry are still alive,
173    * in seconds */
174   protected static final String OPT_HEARTBEATINTERVAL = "heartbeat.interval";
175 
176   /** Configuration key for the collection of job statistics */
177   protected static final String OPT_JOBSTATISTICS = "jobstats.collect";
178 
179   /** Configuration key for the retrieval of service statistics:
180    * Do not consider jobs older than max_job_age (in days) */
181   protected static final String OPT_SERVICE_STATISTICS_MAX_JOB_AGE =
182       "org.opencastproject.statistics.services.max_job_age";
183 
184   /** Configuration key for the encoding preferred worker nodes */
185   protected static final String OPT_ENCODING_WORKERS = "org.opencastproject.encoding.workers";
186 
187   /** Configuration key for the encoding workers load threshold */
188   protected static final String OPT_ENCODING_THRESHOLD = "org.opencastproject.encoding.workers.threshold";
189 
190   /** The http client to use when connecting to remote servers */
191   protected TrustedHttpClient client = null;
192 
193   /** Default jobs limit during dispatching
194    * (larger value will fetch more entries from the database at the same time and increase RAM usage) */
195   static final int DEFAULT_DISPATCH_JOBS_LIMIT = 100;
196 
197   /** Default setting on job statistics collection */
198   static final boolean DEFAULT_JOB_STATISTICS = false;
199 
200   /** Default setting on service statistics retrieval */
201   static final int DEFAULT_SERVICE_STATISTICS_MAX_JOB_AGE = 14;
202 
203   static final List<String>  DEFAULT_ENCODING_WORKERS = new ArrayList<String>();
204 
205   static final double DEFAULT_ENCODING_THRESHOLD = 0.0;
206 
207   /** The configuration key for setting {@link #maxAttemptsBeforeErrorState} */
208   static final String MAX_ATTEMPTS_CONFIG_KEY = "max.attempts";
209 
210   /** The configuration key for setting {@link #noErrorStateServiceTypes} */
211   static final String NO_ERROR_STATE_SERVICE_TYPES_CONFIG_KEY = "no.error.state.service.types";
212 
213   /** Default value for {@link #maxAttemptsBeforeErrorState} */
214   private static final int DEFAULT_MAX_ATTEMPTS_BEFORE_ERROR_STATE = 10;
215 
216   /** Default value for {@link #errorStatesEnabled} */
217   private static final boolean DEFAULT_ERROR_STATES_ENABLED = true;
218 
219   /** Number of failed jobs on a service before it is set to error state. -1 will disable error states completely. */
220   protected int maxAttemptsBeforeErrorState = DEFAULT_MAX_ATTEMPTS_BEFORE_ERROR_STATE;
221   private boolean errorStatesEnabled = DEFAULT_ERROR_STATES_ENABLED;
222 
223   /** Services for which error state is disabled */
224   private List<String> noErrorStateServiceTypes = new ArrayList<>();
225 
226   /** Default delay between checking if hosts are still alive in seconds * */
227   static final long DEFAULT_HEART_BEAT = 60;
228 
229   /** Default job load when not passed by service creating the job * */
230   static final float DEFAULT_JOB_LOAD = 0.1f;
231 
232   /** This host's base URL */
233   protected String hostName;
234 
235   /** This host's descriptive node name eg admin, worker01 */
236   protected String nodeName;
237 
238   /** The base URL for job URLs */
239   protected String jobHost;
240 
241   /** List with URLs of workers specialised in encoding (configured as a comma-separated list) */
242   protected static List<String> encodingWorkers = DEFAULT_ENCODING_WORKERS;
243 
244   /** Threshold value under which defined workers get preferred when dispatching encoding jobs */
245   protected static double encodingThreshold = DEFAULT_ENCODING_THRESHOLD;
246 
247   /** The factory used to generate the entity manager */
248   protected EntityManagerFactory emf = null;
249 
250   protected DBSessionFactory dbSessionFactory;
251 
252   protected DBSession db;
253 
254   /** The thread pool to use for dispatching queued jobs and checking on phantom services. */
255   protected ScheduledExecutorService scheduledExecutor = null;
256 
257   /** The security service */
258   protected SecurityService securityService = null;
259 
260   protected IncidentService incidentService = null;
261 
262   protected Incidents incidents;
263 
264   /** Whether to collect detailed job statistics */
265   protected boolean collectJobstats = DEFAULT_JOB_STATISTICS;
266 
267   /** Maximum age of jobs being considered for service statistics */
268   protected int maxJobAge = DEFAULT_SERVICE_STATISTICS_MAX_JOB_AGE;
269 
270   /** A static list of statuses that influence how load balancing is calculated */
271   protected static final List<Status> JOB_STATUSES_INFLUENCING_LOAD_BALANCING;
272 
273   private static final Status[] activeJobStatus =
274       Arrays.stream(Status.values()).filter(Status::isActive).collect(Collectors.toList()).toArray(new Status[0]);
275 
276   protected static HashMap<Long, Float> jobCache = new HashMap<>();
277 
278   static {
279     JOB_STATUSES_INFLUENCING_LOAD_BALANCING = new ArrayList<>();
280     JOB_STATUSES_INFLUENCING_LOAD_BALANCING.add(Status.RUNNING);
281   }
282 
283   /** Whether to accept a job whose load exceeds the host’s max load */
284   protected Boolean acceptJobLoadsExeedingMaxLoad = true;
285 
286   // Current system load
287   protected float localSystemLoad = 0.0f;
288 
289   /** OSGi DI */
290   @Reference(target = "(osgi.unit.name=org.opencastproject.common)")
291   void setEntityManagerFactory(EntityManagerFactory emf) {
292     this.emf = emf;
293   }
294 
295   @Reference
296   public void setDBSessionFactory(DBSessionFactory dbSessionFactory) {
297     this.dbSessionFactory = dbSessionFactory;
298   }
299 
300   @Activate
301   public void activate(ComponentContext cc) {
302     logger.info("Activate service registry");
303 
304     db = dbSessionFactory.createSession(emf);
305 
306     // Find this host's url
307     if (cc == null || StringUtils.isBlank(cc.getBundleContext().getProperty(OpencastConstants.SERVER_URL_PROPERTY))) {
308       hostName = UrlSupport.DEFAULT_BASE_URL;
309     } else {
310       hostName = cc.getBundleContext().getProperty(OpencastConstants.SERVER_URL_PROPERTY);
311     }
312 
313     // Check hostname for sanity. It should be the hosts URL with protocol but without any part of the service paths.
314     if (hostName.endsWith("/")) {
315       logger.warn("The configured value of {} ends with '/'. This is very likely a configuration error which could "
316               + "lead to services not working properly. Note that this configuration should not contain any part of "
317               + "the service paths.", OpencastConstants.SERVER_URL_PROPERTY);
318     }
319 
320     // Clean all undispatchable jobs that were orphaned when this host was last deactivated
321     cleanUndispatchableJobs(hostName);
322 
323     // Register JMX beans with statistics
324     try {
325       List<ServiceStatistics> serviceStatistics = getServiceStatistics();
326       hostsStatistics = new HostsStatistics(serviceStatistics);
327       servicesStatistics = new ServicesStatistics(hostName, serviceStatistics);
328       jobsStatistics = new JobsStatistics(hostName);
329       jmxBeans.add(JmxUtil.registerMXBean(hostsStatistics, JMX_HOSTS_STATISTICS_TYPE));
330       jmxBeans.add(JmxUtil.registerMXBean(servicesStatistics, JMX_SERVICES_STATISTICS_TYPE));
331       jmxBeans.add(JmxUtil.registerMXBean(jobsStatistics, JMX_JOBS_STATISTICS_TYPE));
332     } catch (ServiceRegistryException e) {
333       logger.error("Error registering JMX statistic beans", e);
334     }
335 
336     // Find the jobs URL
337     if (cc == null || StringUtils.isBlank(cc.getBundleContext().getProperty("org.opencastproject.jobs.url"))) {
338       jobHost = hostName;
339     } else {
340       jobHost = cc.getBundleContext().getProperty("org.opencastproject.jobs.url");
341     }
342 
343     // Register this host
344     try {
345       if (cc == null || StringUtils.isBlank(cc.getBundleContext().getProperty(OpencastConstants.NODE_NAME_PROPERTY))) {
346         nodeName = null;
347       } else {
348         nodeName = cc.getBundleContext().getProperty(OpencastConstants.NODE_NAME_PROPERTY);
349       }
350 
351       float maxLoad = Runtime.getRuntime().availableProcessors();
352       if (cc != null && StringUtils.isNotBlank(cc.getBundleContext().getProperty(OPT_MAXLOAD))) {
353         try {
354           maxLoad = Float.parseFloat(cc.getBundleContext().getProperty(OPT_MAXLOAD));
355           logger.info("Max load has been set manually to {}", maxLoad);
356         } catch (NumberFormatException e) {
357           logger.warn("Configuration key '{}' is not an integer. Falling back to the number of cores ({})",
358                   OPT_MAXLOAD, maxLoad);
359         }
360       }
361 
362       logger.info("Node maximum load set to {}", maxLoad);
363 
364       String address = InetAddress.getByName(URI.create(hostName).getHost()).getHostAddress();
365       long maxMemory = Runtime.getRuntime().maxMemory();
366       int cores = Runtime.getRuntime().availableProcessors();
367 
368       registerHost(hostName, address, nodeName, maxMemory, cores, maxLoad);
369     } catch (Exception e) {
370       throw new IllegalStateException("Unable to register host " + hostName + " in the service registry", e);
371     }
372 
373     // Whether a service accepts a job whose load exceeds the host’s max load
374     if (cc != null) {
375       acceptJobLoadsExeedingMaxLoad = getOptContextProperty(cc, ACCEPT_JOB_LOADS_EXCEEDING_PROPERTY)
376               .map(Boolean::valueOf)
377               .orElse(DEFAULT_ACCEPT_JOB_LOADS_EXCEEDING);
378     }
379 
380     localSystemLoad = 0;
381     logger.info("Activated");
382   }
383 
384   @Override
385   public float getOwnLoad() {
386     return localSystemLoad;
387   }
388 
389   @Override
390   public String getRegistryHostname() {
391     return hostName;
392   }
393 
394   @Deactivate
395   public void deactivate() {
396     logger.info("deactivate service registry");
397 
398     // Wait for job dispatcher to stop before unregistering hosts and requeuing jobs
399     if (scheduledExecutor != null) {
400       try {
401         scheduledExecutor.shutdownNow();
402         if (!scheduledExecutor.isShutdown()) {
403           logger.info("Waiting for Dispatcher to terminate");
404           scheduledExecutor.awaitTermination(10, TimeUnit.SECONDS);
405         }
406       } catch (InterruptedException e) {
407         logger.error("Error shutting down the Dispatcher", e);
408       }
409     }
410 
411     for (ObjectInstance mbean : jmxBeans) {
412       JmxUtil.unregisterMXBean(mbean);
413     }
414 
415     try {
416       unregisterHost(hostName);
417     } catch (ServiceRegistryException e) {
418       throw new IllegalStateException("Unable to unregister host " + hostName + " from the service registry", e);
419     }
420   }
421 
422   /**
423    * {@inheritDoc}
424    *
425    * @see org.opencastproject.serviceregistry.api.ServiceRegistry#createJob(java.lang.String, java.lang.String)
426    */
427   @Override
428   public Job createJob(String type, String operation) throws ServiceRegistryException {
429     return createJob(this.hostName, type, operation, null, null, true, getCurrentJob(), DEFAULT_JOB_LOAD);
430   }
431 
432   /**
433    * {@inheritDoc}
434    *
435    * @see org.opencastproject.serviceregistry.api.ServiceRegistry#createJob(java.lang.String, java.lang.String,
436    *      java.util.List)
437    */
438   @Override
439   public Job createJob(String type, String operation, List<String> arguments) throws ServiceRegistryException {
440     return createJob(this.hostName, type, operation, arguments, null, true, getCurrentJob(), DEFAULT_JOB_LOAD);
441   }
442 
443   /**
444    * {@inheritDoc}
445    *
446    * @see org.opencastproject.serviceregistry.api.ServiceRegistry#createJob(java.lang.String, java.lang.String,
447    *      java.util.List, Float)
448    */
449   @Override
450   public Job createJob(String type, String operation, List<String> arguments, Float jobLoad)
451           throws ServiceRegistryException {
452     return createJob(this.hostName, type, operation, arguments, null, true, getCurrentJob(), jobLoad);
453   }
454 
455   /**
456    * {@inheritDoc}
457    *
458    * @see org.opencastproject.serviceregistry.api.ServiceRegistry#createJob(java.lang.String, java.lang.String,
459    *      java.util.List, String, boolean)
460    */
461   @Override
462   public Job createJob(String type, String operation, List<String> arguments, String payload, boolean dispatchable)
463           throws ServiceRegistryException {
464     return createJob(this.hostName, type, operation, arguments, payload, dispatchable, getCurrentJob(),
465             DEFAULT_JOB_LOAD);
466   }
467 
468   /**
469    * {@inheritDoc}
470    *
471    * @see org.opencastproject.serviceregistry.api.ServiceRegistry#createJob(java.lang.String, java.lang.String,
472    *      java.util.List, java.lang.String, boolean, Float)
473    */
474   @Override
475   public Job createJob(String type, String operation, List<String> arguments, String payload, boolean dispatchable,
476           Float jobLoad) throws ServiceRegistryException {
477     return createJob(this.hostName, type, operation, arguments, payload, dispatchable, getCurrentJob(), jobLoad);
478   }
479 
480   public Job createJob(String type, String operation, List<String> arguments, String payload, boolean dispatchable,
481           Job parentJob) throws ServiceRegistryException {
482     return createJob(this.hostName, type, operation, arguments, payload, dispatchable, parentJob, DEFAULT_JOB_LOAD);
483   }
484 
485   /**
486    * {@inheritDoc}
487    *
488    * @see org.opencastproject.serviceregistry.api.ServiceRegistry#createJob(java.lang.String, java.lang.String,
489    *      java.util.List, java.lang.String, boolean, org.opencastproject.job.api.Job, Float)
490    */
491   @Override
492   public Job createJob(String type, String operation, List<String> arguments, String payload, boolean dispatchable,
493           Job parentJob, Float jobLoad) throws ServiceRegistryException {
494     return createJob(this.hostName, type, operation, arguments, payload, dispatchable, parentJob, jobLoad);
495   }
496 
497   /**
498    * Creates a job on a remote host with a jobLoad of 1.0.
499    */
500   public Job createJob(String host, String serviceType, String operation, List<String> arguments, String payload,
501           boolean dispatchable, Job parentJob) throws ServiceRegistryException {
502     return createJob(host, serviceType, operation, arguments, payload, dispatchable, parentJob, DEFAULT_JOB_LOAD);
503   }
504 
505   /**
506    * Creates a job on a remote host.
507    */
508   public Job createJob(String host, String serviceType, String operation, List<String> arguments, String payload,
509           boolean dispatchable, Job parentJob, float jobLoad) throws ServiceRegistryException {
510     if (StringUtils.isBlank(host)) {
511       throw new IllegalArgumentException("Host can't be null");
512     }
513     if (StringUtils.isBlank(serviceType)) {
514       throw new IllegalArgumentException("Service type can't be null");
515     }
516     if (StringUtils.isBlank(operation)) {
517       throw new IllegalArgumentException("Operation can't be null");
518     }
519 
520     JpaJob jpaJob = db.execTxChecked(em -> {
521       ServiceRegistrationJpaImpl creatingService = getServiceRegistrationQuery(serviceType, host).apply(em)
522           .orElseThrow(() -> new ServiceRegistryException("No service registration exists for type '" + serviceType
523               + "' on host '" + host + "'"));
524 
525       if (creatingService.getHostRegistration().isMaintenanceMode()) {
526         logger.warn("Creating a job from {}, which is currently in maintenance mode.", creatingService.getHost());
527       } else if (!creatingService.getHostRegistration().isActive()) {
528         logger.warn("Creating a job from {}, which is currently inactive.", creatingService.getHost());
529       }
530 
531       User currentUser = securityService.getUser();
532       Organization currentOrganization = securityService.getOrganization();
533 
534       JpaJob job = new JpaJob(currentUser, currentOrganization, creatingService, operation, arguments, payload,
535               dispatchable, jobLoad);
536 
537       // Bind the given parent job to the new job
538       if (parentJob != null) {
539         // Get the JPA instance of the parent job
540         JpaJob jpaParentJob = getJpaJobQuery(parentJob.getId()).apply(em).orElseThrow(() -> {
541           logger.error("job with id {} not found in the persistence context", parentJob);
542           // We don't want to leave the deleted job in the cache if there
543           removeFromLoadCache(parentJob.getId());
544           return new ServiceRegistryException(new NotFoundException());
545         });
546         job.setParentJob(jpaParentJob);
547 
548         // Get the JPA instance of the root job
549         JpaJob jpaRootJob = jpaParentJob;
550         if (parentJob.getRootJobId() != null) {
551           jpaRootJob = getJpaJobQuery(parentJob.getRootJobId()).apply(em).orElseThrow(() -> {
552             logger.error("job with id {} not found in the persistence context", parentJob.getRootJobId());
553             // We don't want to leave the deleted job in the cache if there
554             removeFromLoadCache(parentJob.getRootJobId());
555             return new ServiceRegistryException(new NotFoundException());
556           });
557         }
558         job.setRootJob(jpaRootJob);
559       }
560 
561       // if this job is not dispatchable, it must be handled by the host that has created it
562       if (dispatchable) {
563         logger.trace("Queuing dispatchable '{}'", job);
564         job.setStatus(Status.QUEUED);
565       } else {
566         logger.trace("Giving new non-dispatchable '{}' its creating service as processor '{}'", job, creatingService);
567         job.setProcessorServiceRegistration(creatingService);
568       }
569 
570       em.persist(job);
571       return job;
572     });
573 
574     setJobUri(jpaJob);
575     return jpaJob.toJob();
576   }
577 
578   @Override
579   public void removeJobs(List<Long> jobIds) throws NotFoundException, ServiceRegistryException {
580     for (long jobId: jobIds) {
581       if (jobId < 1) {
582         throw new NotFoundException("Job ID must be greater than zero (0)");
583       }
584     }
585 
586     logger.debug("Start deleting jobs with IDs '{}'", jobIds);
587     try {
588       db.execTxChecked(em -> {
589         for (long jobId : jobIds) {
590           JpaJob job = em.find(JpaJob.class, jobId);
591           if (job == null) {
592             logger.error("Job with Id {} cannot be deleted: Not found.", jobId);
593             removeFromLoadCache(jobId);
594             throw new NotFoundException("Job with ID '" + jobId + "' not found");
595           }
596           deleteChildJobsQuery(jobId).accept(em);
597           em.remove(job);
598           removeFromLoadCache(jobId);
599         }
600       });
601     } catch (NotFoundException | ServiceRegistryException e) {
602       throw e;
603     } catch (Exception e) {
604       throw new ServiceRegistryException(e);
605     }
606 
607     logger.info("Jobs with IDs '{}' deleted", jobIds);
608   }
609 
610   private ThrowingConsumer<EntityManager, Exception> deleteChildJobsQuery(long jobId) {
611     return em -> {
612       List<Job> childJobs = getChildJobs(jobId);
613       if (childJobs.isEmpty()) {
614         logger.trace("No child jobs of job '{}' found to delete.", jobId);
615         return;
616       }
617 
618       logger.debug("Start deleting child jobs of job '{}'", jobId);
619 
620       try {
621         for (int i = childJobs.size() - 1; i >= 0; i--) {
622           Job job = childJobs.get(i);
623           JpaJob jobToDelete = em.find(JpaJob.class, job.getId());
624           em.remove(jobToDelete);
625           removeFromLoadCache(job.getId());
626           logger.debug("{} deleted", job);
627         }
628         logger.debug("Deleted all child jobs of job '{}'", jobId);
629       } catch (Exception e) {
630         throw new ServiceRegistryException("Unable to remove child jobs from " + jobId, e);
631       }
632     };
633   }
634 
635   @Override
636   public void removeParentlessJobs(int lifetime) throws ServiceRegistryException {
637     int count = db.execTxChecked(em -> {
638       int c = 0;
639 
640       List<Job> jobs = namedQuery.findAll("Job.withoutParent", JpaJob.class).apply(em).stream()
641           .map(JpaJob::toJob)
642           .filter(j -> j.getDateCreated().before(DateUtils.addDays(new Date(), -lifetime)))
643           // DO NOT DELETE workflow instances and operations!
644           .filter(j -> !START_OPERATION.equals(j.getOperation())
645               && !START_WORKFLOW.equals(j.getOperation())
646               && !RESUME.equals(j.getOperation()))
647           .filter(j -> j.getStatus().isTerminated())
648           .collect(Collectors.toList());
649 
650       for (Job job : jobs) {
651         try {
652           removeJobs(Collections.singletonList(job.getId()));
653           logger.debug("Parentless '{}' removed", job);
654           c++;
655         } catch (NotFoundException e) {
656           logger.debug("Parentless '{} ' not found in database", job, e);
657         }
658       }
659 
660       return c;
661     });
662 
663 
664     if (count > 0) {
665       logger.info("Successfully removed {} parentless jobs", count);
666     } else {
667       logger.trace("No parentless jobs found to remove");
668     }
669   }
670 
671   /**
672    * {@inheritDoc}
673    *
674    * @see org.osgi.service.cm.ManagedService#updated(java.util.Dictionary)
675    */
676   @Override
677   public void updated(Dictionary properties) throws ConfigurationException {
678     logger.info("Updating service registry properties");
679 
680     maxAttemptsBeforeErrorState = DEFAULT_MAX_ATTEMPTS_BEFORE_ERROR_STATE;
681     errorStatesEnabled = DEFAULT_ERROR_STATES_ENABLED;
682     String maxAttempts = StringUtils.trimToNull((String) properties.get(MAX_ATTEMPTS_CONFIG_KEY));
683     if (maxAttempts != null) {
684       try {
685         maxAttemptsBeforeErrorState = Integer.parseInt(maxAttempts);
686         if (maxAttemptsBeforeErrorState < 0) {
687           errorStatesEnabled = false;
688           logger.info("Error states of services disabled");
689         } else {
690           errorStatesEnabled = true;
691           logger.info("Set max attempts before error state to {}", maxAttempts);
692         }
693       } catch (NumberFormatException e) {
694         logger.warn("Can not set max attempts before error state to {}. {} must be an integer", maxAttempts,
695                 MAX_ATTEMPTS_CONFIG_KEY);
696       }
697     }
698 
699     noErrorStateServiceTypes = new ArrayList<>();
700     String noErrorStateServiceTypesStr = StringUtils.trimToNull((String) properties.get(
701             NO_ERROR_STATE_SERVICE_TYPES_CONFIG_KEY));
702     if (noErrorStateServiceTypesStr != null) {
703       noErrorStateServiceTypes = Arrays.asList(noErrorStateServiceTypesStr.split("\\s*,\\s*"));
704       if (!noErrorStateServiceTypes.isEmpty()) {
705         logger.info("Set service types without error state to {}", String.join(", ", noErrorStateServiceTypes));
706       }
707     }
708 
709     long heartbeatInterval = DEFAULT_HEART_BEAT;
710     String heartbeatIntervalString = StringUtils.trimToNull((String) properties.get(OPT_HEARTBEATINTERVAL));
711     if (StringUtils.isNotBlank(heartbeatIntervalString)) {
712       try {
713         heartbeatInterval = Long.parseLong(heartbeatIntervalString);
714       } catch (Exception e) {
715         logger.warn("Heartbeat interval '{}' is malformed, setting to {}", heartbeatIntervalString, DEFAULT_HEART_BEAT);
716         heartbeatInterval = DEFAULT_HEART_BEAT;
717       }
718       if (heartbeatInterval == 0) {
719         logger.info("Heartbeat disabled");
720       } else if (heartbeatInterval < 0) {
721         logger.warn("Heartbeat interval {} seconds too low, adjusting to {}", heartbeatInterval, DEFAULT_HEART_BEAT);
722         heartbeatInterval = DEFAULT_HEART_BEAT;
723       } else {
724         logger.info("Heartbeat interval set to {} seconds", heartbeatInterval);
725       }
726     }
727 
728     String jobStatsString = StringUtils.trimToNull((String) properties.get(OPT_JOBSTATISTICS));
729     if (StringUtils.isNotBlank(jobStatsString)) {
730       try {
731         collectJobstats = Boolean.parseBoolean(jobStatsString);
732       } catch (Exception e) {
733         logger.warn("Job statistics collection flag '{}' is malformed, setting to {}", jobStatsString,
734                 DEFAULT_JOB_STATISTICS);
735         collectJobstats = DEFAULT_JOB_STATISTICS;
736       }
737     }
738 
739     // get the encoding worker nodes defined in the configuration file and parse the comma-separated list
740     String encodingWorkersString = (String) properties.get(OPT_ENCODING_WORKERS);
741     if (StringUtils.isNotBlank(encodingWorkersString)) {
742       encodingWorkers = Arrays.asList(encodingWorkersString.split("\\s*,\\s*"));
743     } else {
744       encodingWorkers = DEFAULT_ENCODING_WORKERS;
745     }
746 
747     // get the encoding worker load threshold defined in the configuration file and parse the double
748     String encodingThersholdString = StringUtils.trimToNull((String) properties.get(OPT_ENCODING_THRESHOLD));
749     if (StringUtils.isNotBlank(encodingThersholdString) && encodingThersholdString != null) {
750         try {
751           double encodingThresholdTmp = Double.parseDouble(encodingThersholdString);
752           if (encodingThresholdTmp >= 0 && encodingThresholdTmp <= 1) {
753             encodingThreshold = encodingThresholdTmp;
754           } else {
755             encodingThreshold = DEFAULT_ENCODING_THRESHOLD;
756             logger.warn("org.opencastproject.encoding.workers.threshold is not between 0 and 1");
757           }
758         } catch (NumberFormatException e) {
759           logger.warn("Can not set encoding threshold to {}. {} must be an parsable double", encodingThersholdString,
760               OPT_ENCODING_THRESHOLD);
761         }
762     } else {
763       encodingThreshold = DEFAULT_ENCODING_THRESHOLD;
764     }
765 
766     String maxJobAgeString = StringUtils.trimToNull((String) properties.get(OPT_SERVICE_STATISTICS_MAX_JOB_AGE));
767     if (maxJobAgeString != null) {
768       try {
769         maxJobAge = Integer.parseInt(maxJobAgeString);
770         logger.info("Set service statistics max job age to {}", maxJobAgeString);
771       } catch (NumberFormatException e) {
772         logger.warn("Can not set service statistics max job age to {}. {} must be an integer", maxJobAgeString,
773                 OPT_SERVICE_STATISTICS_MAX_JOB_AGE);
774       }
775     }
776 
777     scheduledExecutor = Executors.newScheduledThreadPool(1);
778 
779     // Schedule the service heartbeat if the interval is > 0
780     if (heartbeatInterval > 0) {
781       logger.debug("Starting service heartbeat at a custom interval of {}s", heartbeatInterval);
782       scheduledExecutor.scheduleWithFixedDelay(new JobProducerHeartbeat(), heartbeatInterval, heartbeatInterval,
783               TimeUnit.SECONDS);
784     }
785   }
786 
787   /**
788    * OSGI callback when the configuration is updated. This method is only here to prevent the
789    * configuration admin service from calling the service deactivate and activate methods
790    * for a config update. It does not have to do anything as the updates are handled by updated().
791    */
792   @Modified
793   public void modified(Map<String, Object> config) throws ConfigurationException {
794     logger.debug("Modified serviceregistry");
795   }
796 
797   private Function<EntityManager, Optional<JpaJob>> getJpaJobQuery(long id) {
798     return em -> namedQuery.findByIdOpt(JpaJob.class, id)
799         .apply(em)
800         .map(jpaJob -> {
801           // JPA's caches can be out of date if external changes (e.g. another node in the cluster) have been made to
802           // this row in the database
803           em.refresh(jpaJob);
804           setJobUri(jpaJob);
805           return jpaJob;
806         });
807   }
808 
809   @Override
810   public Job getJob(long id) throws NotFoundException, ServiceRegistryException {
811     try {
812       return db.exec(getJpaJobQuery(id))
813           .map(JpaJob::toJob)
814           .orElseThrow(NotFoundException::new);
815     } catch (NotFoundException e) {
816       throw e;
817     } catch (Exception e) {
818       throw new ServiceRegistryException(e);
819     }
820   }
821 
822   /**
823    * {@inheritDoc}
824    *
825    * @see org.opencastproject.serviceregistry.api.ServiceRegistry#getCurrentJob()
826    */
827   @Override
828   public Job getCurrentJob() {
829     return currentJob.get();
830   }
831 
832   /**
833    * {@inheritDoc}
834    *
835    * @see org.opencastproject.serviceregistry.api.ServiceRegistry#setCurrentJob(Job)
836    */
837   @Override
838   public void setCurrentJob(Job job) {
839     currentJob.set(job);
840   }
841 
842   JpaJob updateJob(JpaJob job) throws ServiceRegistryException {
843     try {
844       // tx context is opened in
845       //   updateInternal
846       //   updateServiceForFailover
847       return db.execChecked(em -> {
848         Job oldJob = getJob(job.getId());
849         JpaJob jpaJob = updateInternal(job);
850         if (!TYPE_WORKFLOW.equals(job.getJobType()) && job.getJobLoad() > 0.0f
851             && job.getProcessorServiceRegistration() != null
852             && job.getProcessorServiceRegistration().getHost().equals(getRegistryHostname())) {
853           processCachedLoadChange(job);
854         }
855 
856         // All WorkflowService Jobs will be ignored
857         if (oldJob.getStatus() != job.getStatus() && !TYPE_WORKFLOW.equals(job.getJobType())) {
858           updateServiceForFailover(job);
859         }
860 
861         return jpaJob;
862       });
863     } catch (ServiceRegistryException e) {
864       throw e;
865     } catch (NotFoundException e) {
866       // Just in case, remove from cache if there
867       removeFromLoadCache(job.getId());
868       throw new ServiceRegistryException(e);
869     } catch (Exception e) {
870       throw new ServiceRegistryException(e);
871     }
872   }
873 
874   @Override
875   public Job updateJob(Job job) throws ServiceRegistryException {
876     JpaJob jpaJob = JpaJob.from(job);
877     jpaJob.setProcessorServiceRegistration(
878             (ServiceRegistrationJpaImpl) getServiceRegistration(job.getJobType(), job.getProcessingHost()));
879     return updateJob(jpaJob).toJob();
880   }
881 
882   /**
883    * Processes the job load changes for the *local* load cache
884    *
885    * @param job
886    *   The job to apply to the load cache
887    */
888   private synchronized void processCachedLoadChange(JpaJob job) {
889     if (JOB_STATUSES_INFLUENCING_LOAD_BALANCING.contains(job.getStatus()) && jobCache.get(job.getId()) == null) {
890       logger.debug("Adding to load cache: {}, type {}, load {}, status {}",
891               job, job.getJobType(), job.getJobLoad(), job.getStatus());
892       localSystemLoad += job.getJobLoad();
893       jobCache.put(job.getId(), job.getJobLoad());
894     } else if (jobCache.get(job.getId()) != null && Status.FINISHED.equals(job.getStatus())
895             || Status.FAILED.equals(job.getStatus()) || Status.WAITING.equals(job.getStatus())) {
896       logger.debug("Removing from load cache: {}, type {}, load {}, status {}",
897               job, job.getJobType(), job.getJobLoad(), job.getStatus());
898       localSystemLoad -= job.getJobLoad();
899       jobCache.remove(job.getId());
900     } else {
901       logger.debug("Ignoring for load cache: {}, type {}, status {}",
902               job, job.getJobType(), job.getStatus());
903     }
904     logger.debug("Current host load: {}, job load cache size: {}", format("%.1f", localSystemLoad), jobCache.size());
905 
906     if (jobCache.isEmpty()) {
907       if (Math.abs(localSystemLoad) > 0.0000001F) {
908         logger.warn("No jobs in the job load cache, but load is {}: setting job load to 0", localSystemLoad);
909       }
910       localSystemLoad = 0.0F;
911     }
912   }
913 
914   private synchronized void removeFromLoadCache(Long jobId) {
915     if (jobCache.get(jobId) != null) {
916       float jobLoad = jobCache.get(jobId);
917       logger.debug("Removing deleted job from load cache: Job {}, load {}", jobId, jobLoad);
918       localSystemLoad -= jobLoad;
919       jobCache.remove(jobId);
920     }
921   }
922 
923   protected JpaJob setJobUri(JpaJob job) {
924     try {
925       job.setUri(new URI(jobHost + "/services/job/" + job.getId() + ".xml"));
926     } catch (URISyntaxException e) {
927       logger.warn("Can not set the job URI", e);
928     }
929     return job;
930   }
931 
932   /**
933    * Internal method to update a job, throwing unwrapped JPA exceptions.
934    *
935    * @param job
936    *          the job to update
937    * @return the updated job
938    */
939   protected JpaJob updateInternal(JpaJob job) throws NotFoundException {
940     JpaJob fromDb = db.execTxChecked(em -> {
941       JpaJob j = em.find(JpaJob.class, job.getId());
942       if (j == null) {
943         throw new NotFoundException();
944       }
945 
946       update(j, job);
947       em.merge(j);
948       return j;
949     });
950 
951     job.setVersion(fromDb.toJob().getVersion());
952     setJobUri(job);
953     return job;
954   }
955 
956   public void updateStatisticsJobData() {
957     jobsStatistics.updateAvg(db.exec(getAvgOperationsQuery()));
958     jobsStatistics.updateJobCount(db.exec(getCountPerHostServiceQuery()));
959   }
960 
961   /**
962    * Internal method to update the service registration state, throwing unwrapped JPA exceptions.
963    *
964    * @param registration
965    *          the service registration to update
966    * @return the updated service registration
967    */
968   private ServiceRegistration updateServiceState(ServiceRegistrationJpaImpl registration) throws NotFoundException {
969     db.execTxChecked(em -> {
970       ServiceRegistrationJpaImpl fromDb = em.find(ServiceRegistrationJpaImpl.class, registration.getId());
971       if (fromDb == null) {
972         throw new NotFoundException();
973       }
974       fromDb.setServiceState(registration.getServiceState());
975       fromDb.setStateChanged(registration.getStateChanged());
976       fromDb.setWarningStateTrigger(registration.getWarningStateTrigger());
977       fromDb.setErrorStateTrigger(registration.getErrorStateTrigger());
978     });
979 
980     servicesStatistics.updateService(registration);
981     return registration;
982   }
983 
984   /**
985    * Sets the queue and runtimes and other elements of a persistent job based on a job that's been modified in memory.
986    * Times on both the objects must be modified, since the in-memory job must not be stale.
987    *
988    * @param fromDb
989    *          The job from the database
990    * @param jpaJob
991    *          The in-memory job
992    */
993   private void update(JpaJob fromDb, JpaJob jpaJob) {
994     final Job job = jpaJob.toJob();
995     final Date now = new Date();
996     final Status status = job.getStatus();
997     final Status fromDbStatus = fromDb.getStatus();
998 
999     fromDb.setPayload(job.getPayload());
1000     fromDb.setStatus(job.getStatus());
1001     fromDb.setDispatchable(job.isDispatchable());
1002     fromDb.setVersion(job.getVersion());
1003     fromDb.setOperation(job.getOperation());
1004     fromDb.setArguments(job.getArguments());
1005 
1006     if (job.getDateCreated() == null) {
1007       jpaJob.setDateCreated(now);
1008       fromDb.setDateCreated(now);
1009       job.setDateCreated(now);
1010     }
1011     if (job.getProcessingHost() != null) {
1012       ServiceRegistrationJpaImpl processingService = (ServiceRegistrationJpaImpl) getServiceRegistration(
1013               job.getJobType(), job.getProcessingHost());
1014       logger.debug("{} has host '{}': setting processor service to '{}'", job, job.getProcessingHost(),
1015           processingService);
1016       fromDb.setProcessorServiceRegistration(processingService);
1017     } else {
1018       logger.debug("Unsetting previous processor service registration for {}", job);
1019       fromDb.setProcessorServiceRegistration(null);
1020     }
1021     if (Status.RUNNING.equals(status) && !Status.WAITING.equals(fromDbStatus)) {
1022       if (job.getDateStarted() == null) {
1023         jpaJob.setDateStarted(now);
1024         jpaJob.setQueueTime(now.getTime() - job.getDateCreated().getTime());
1025         fromDb.setDateStarted(now);
1026         fromDb.setQueueTime(now.getTime() - job.getDateCreated().getTime());
1027         job.setDateStarted(now);
1028         job.setQueueTime(now.getTime() - job.getDateCreated().getTime());
1029       }
1030     } else if (Status.FAILED.equals(status)) {
1031       // failed jobs may not have even started properly
1032       if (job.getDateCompleted() == null) {
1033         fromDb.setDateCompleted(now);
1034         jpaJob.setDateCompleted(now);
1035         job.setDateCompleted(now);
1036         if (job.getDateStarted() != null) {
1037           jpaJob.setRunTime(now.getTime() - job.getDateStarted().getTime());
1038           fromDb.setRunTime(now.getTime() - job.getDateStarted().getTime());
1039           job.setRunTime(now.getTime() - job.getDateStarted().getTime());
1040         }
1041       }
1042     } else if (Status.FINISHED.equals(status)) {
1043       if (job.getDateStarted() == null) {
1044         // Some services (e.g. ingest) don't use job dispatching, since they start immediately and handle their own
1045         // lifecycle. In these cases, if the start date isn't set, use the date created as the start date
1046         jpaJob.setDateStarted(job.getDateCreated());
1047         job.setDateStarted(job.getDateCreated());
1048       }
1049       if (job.getDateCompleted() == null) {
1050         jpaJob.setDateCompleted(now);
1051         jpaJob.setRunTime(now.getTime() - job.getDateStarted().getTime());
1052         fromDb.setDateCompleted(now);
1053         fromDb.setRunTime(now.getTime() - job.getDateStarted().getTime());
1054         job.setDateCompleted(now);
1055         job.setRunTime(now.getTime() - job.getDateStarted().getTime());
1056       }
1057     }
1058   }
1059 
1060   /**
1061    * Fetches a host registration from persistence.
1062    *
1063    * @param host
1064    *          the host name
1065    * @return the host registration, or null if none exists
1066    */
1067   protected Function<EntityManager, Optional<HostRegistrationJpaImpl>> fetchHostRegistrationQuery(String host) {
1068     return namedQuery.findOpt(
1069         "HostRegistration.byHostName",
1070         HostRegistrationJpaImpl.class,
1071         Pair.of("host", host)
1072     );
1073   }
1074 
1075   /**
1076    * {@inheritDoc}
1077    *
1078    * @see org.opencastproject.serviceregistry.api.ServiceRegistry#registerHost(String, String, String, long, int, float)
1079    */
1080   @Override
1081   public void registerHost(String host, String address, String nodeName, long memory, int cores, float maxLoad)
1082           throws ServiceRegistryException {
1083     try {
1084       HostRegistrationJpaImpl hostRegistration = db.execTxChecked(em -> {
1085         // Find the existing registrations for this host and if it exists, update it
1086         Optional<HostRegistrationJpaImpl> hostRegistrationOpt = fetchHostRegistrationQuery(host).apply(em);
1087         HostRegistrationJpaImpl hr;
1088 
1089         if (hostRegistrationOpt.isEmpty()) {
1090           hr = new HostRegistrationJpaImpl(host, address, nodeName, memory, cores, maxLoad, true, false);
1091           em.persist(hr);
1092         } else {
1093           hr = hostRegistrationOpt.get();
1094           hr.setIpAddress(address);
1095           hr.setNodeName(nodeName);
1096           hr.setMemory(memory);
1097           hr.setCores(cores);
1098           hr.setMaxLoad(maxLoad);
1099           hr.setOnline(true);
1100           em.merge(hr);
1101         }
1102         logger.info("Registering {} with a maximum load of {}", host, maxLoad);
1103         return hr;
1104       });
1105 
1106       hostsStatistics.updateHost(hostRegistration);
1107     } catch (Exception e) {
1108       throw new ServiceRegistryException(e);
1109     }
1110   }
1111 
1112   /**
1113    * {@inheritDoc}
1114    *
1115    * @see org.opencastproject.serviceregistry.api.ServiceRegistry#unregisterHost(java.lang.String)
1116    */
1117   @Override
1118   public void unregisterHost(String host) throws ServiceRegistryException {
1119     try {
1120       HostRegistrationJpaImpl existingHostRegistration = db.execTxChecked(em -> {
1121         HostRegistrationJpaImpl hr = fetchHostRegistrationQuery(host).apply(em).orElseThrow(
1122             () -> new IllegalArgumentException("Host '" + host + "' is not registered, so it can not be unregistered"));
1123 
1124         hr.setOnline(false);
1125         for (ServiceRegistration serviceRegistration : getServiceRegistrationsByHost(host)) {
1126           unRegisterService(serviceRegistration.getServiceType(), serviceRegistration.getHost());
1127         }
1128         em.merge(hr);
1129 
1130         logger.info("Unregistering {}", host);
1131         return hr;
1132       });
1133 
1134       logger.info("Host {} unregistered", host);
1135       hostsStatistics.updateHost(existingHostRegistration);
1136     } catch (Exception e) {
1137       throw new ServiceRegistryException(e);
1138     }
1139   }
1140 
1141   /**
1142    * {@inheritDoc}
1143    *
1144    * @see org.opencastproject.serviceregistry.api.ServiceRegistry#enableHost(String)
1145    */
1146   @Override
1147   public void enableHost(String host) throws ServiceRegistryException, NotFoundException {
1148     try {
1149       HostRegistrationJpaImpl hostRegistration = db.execTxChecked(em -> {
1150         // Find the existing registrations for this host and if it exists, update it
1151         HostRegistrationJpaImpl hr = fetchHostRegistrationQuery(host).apply(em).orElseThrow(
1152             () -> new NotFoundException("Host '" + host + "' is currently not registered, so it can not be enabled"));
1153         hr.setActive(true);
1154         em.merge(hr);
1155         logger.info("Enabling {}", host);
1156         return hr;
1157       });
1158 
1159       db.execTxChecked(em -> {
1160         for (ServiceRegistration serviceRegistration : getServiceRegistrationsByHost(host)) {
1161           ((ServiceRegistrationJpaImpl) serviceRegistration).setActive(true);
1162           em.merge(serviceRegistration);
1163           servicesStatistics.updateService(serviceRegistration);
1164         }
1165       });
1166 
1167       hostsStatistics.updateHost(hostRegistration);
1168     } catch (NotFoundException e) {
1169       throw e;
1170     } catch (Exception e) {
1171       throw new ServiceRegistryException(e);
1172     }
1173   }
1174 
1175   /**
1176    * {@inheritDoc}
1177    *
1178    * @see org.opencastproject.serviceregistry.api.ServiceRegistry#disableHost(String)
1179    */
1180   @Override
1181   public void disableHost(String host) throws ServiceRegistryException, NotFoundException {
1182     try {
1183       HostRegistrationJpaImpl hostRegistration = db.execTxChecked(em -> {
1184         HostRegistrationJpaImpl hr = fetchHostRegistrationQuery(host).apply(em).orElseThrow(
1185             () -> new NotFoundException("Host '" + host + "' is not currently registered, so it can not be disabled"));
1186 
1187         hr.setActive(false);
1188         for (ServiceRegistration serviceRegistration : getServiceRegistrationsByHost(host)) {
1189           ((ServiceRegistrationJpaImpl) serviceRegistration).setActive(false);
1190           em.merge(serviceRegistration);
1191           servicesStatistics.updateService(serviceRegistration);
1192         }
1193         em.merge(hr);
1194 
1195         logger.info("Disabling {}", host);
1196         return hr;
1197       });
1198 
1199       hostsStatistics.updateHost(hostRegistration);
1200     } catch (NotFoundException e) {
1201       throw e;
1202     } catch (Exception e) {
1203       throw new ServiceRegistryException(e);
1204     }
1205   }
1206 
1207   /**
1208    * {@inheritDoc}
1209    *
1210    * @see org.opencastproject.serviceregistry.api.ServiceRegistry#registerService(java.lang.String, java.lang.String,
1211    *      java.lang.String)
1212    */
1213   @Override
1214   public ServiceRegistration registerService(String serviceType, String baseUrl, String path)
1215           throws ServiceRegistryException {
1216     return registerService(serviceType, baseUrl, path, false);
1217   }
1218 
1219   /**
1220    * {@inheritDoc}
1221    *
1222    * @see org.opencastproject.serviceregistry.api.ServiceRegistry#registerService(java.lang.String, java.lang.String,
1223    *      java.lang.String, boolean)
1224    */
1225   @Override
1226   public ServiceRegistration registerService(String serviceType, String baseUrl, String path, boolean jobProducer)
1227           throws ServiceRegistryException {
1228     cleanRunningJobs(serviceType, baseUrl);
1229     return setOnlineStatus(serviceType, baseUrl, path, true, jobProducer);
1230   }
1231 
1232   protected Function<EntityManager, Optional<ServiceRegistrationJpaImpl>> getServiceRegistrationQuery(
1233       String serviceType, String host) {
1234     return namedQuery.findOpt(
1235         "ServiceRegistration.getRegistration",
1236         ServiceRegistrationJpaImpl.class,
1237         Pair.of("serviceType", serviceType),
1238         Pair.of("host", host)
1239     );
1240   }
1241 
1242   /**
1243    * Sets the online status of a service registration.
1244    *
1245    * @param serviceType
1246    *          The job type
1247    * @param baseUrl
1248    *          the host URL
1249    * @param online
1250    *          whether the service is online or off
1251    * @param jobProducer
1252    *          whether this service produces jobs for long running operations
1253    * @return the service registration
1254    */
1255   protected ServiceRegistration setOnlineStatus(String serviceType, String baseUrl, String path, boolean online,
1256           Boolean jobProducer) throws ServiceRegistryException {
1257     if (isBlank(serviceType) || isBlank(baseUrl)) {
1258       logger.info("Uninformed baseUrl '{}' or service '{}' (path '{}')", baseUrl, serviceType, path);
1259       throw new IllegalArgumentException("serviceType and baseUrl must not be blank");
1260     }
1261 
1262     try {
1263       AtomicReference<HostRegistrationJpaImpl> hostRegistration = new AtomicReference<>();
1264       AtomicReference<ServiceRegistrationJpaImpl> registration = new AtomicReference<>();
1265 
1266       db.execTxChecked(em -> {
1267         HostRegistrationJpaImpl hr = fetchHostRegistrationQuery(baseUrl).apply(em).orElseThrow(() -> {
1268           logger.info("No associated host registration for '{}' or service '{}' (path '{}')", baseUrl, serviceType,
1269               path);
1270           return new IllegalStateException(
1271               "A service registration can not be updated when it has no associated host registration");
1272         });
1273         hostRegistration.set(hr);
1274 
1275         ServiceRegistrationJpaImpl sr;
1276         Optional<ServiceRegistrationJpaImpl> srOpt = getServiceRegistrationQuery(serviceType, baseUrl).apply(em);
1277         if (srOpt.isEmpty()) {
1278           if (isBlank(path)) {
1279             // we can not create a new registration without a path
1280             throw new IllegalArgumentException("path must not be blank when registering new services");
1281           }
1282 
1283           // if we are not provided a value, consider it to be false
1284           sr = new ServiceRegistrationJpaImpl(hr, serviceType, path, Objects.requireNonNullElse(jobProducer, false));
1285           em.persist(sr);
1286         } else {
1287           sr = srOpt.get();
1288           if (StringUtils.isNotBlank(path)) {
1289             sr.setPath(path);
1290           }
1291           sr.setOnline(online);
1292           if (jobProducer != null) { // if we are not provided a value, don't update the persistent value
1293             sr.setJobProducer(jobProducer);
1294           }
1295           em.merge(sr);
1296         }
1297         registration.set(sr);
1298       });
1299 
1300       hostsStatistics.updateHost(hostRegistration.get());
1301       servicesStatistics.updateService(registration.get());
1302       return registration.get();
1303     } catch (Exception e) {
1304       throw new ServiceRegistryException(e);
1305     }
1306   }
1307 
1308   /**
1309    * {@inheritDoc}
1310    *
1311    * @see org.opencastproject.serviceregistry.api.ServiceRegistry#unRegisterService(java.lang.String, java.lang.String)
1312    */
1313   @Override
1314   public void unRegisterService(String serviceType, String baseUrl) throws ServiceRegistryException {
1315     logger.info("Unregistering Service {}@{} and cleaning its running jobs", serviceType, baseUrl);
1316     // TODO: create methods that accept an entity manager, so we can execute multiple queries using the same em and tx
1317     //       (em and tx are reused if using nested db.execTx)
1318     setOnlineStatus(serviceType, baseUrl, null, false, null);
1319     cleanRunningJobs(serviceType, baseUrl);
1320   }
1321 
1322   /**
1323    * Find all undispatchable jobs that were orphaned when this host was last deactivated and set them to CANCELLED.
1324    */
1325   private void cleanUndispatchableJobs(String hostName) {
1326     logger.debug("Starting check for undispatchable jobs for host {}", hostName);
1327 
1328     try {
1329       db.execTxChecked(em -> {
1330         List<JpaJob> undispatchableJobs = namedQuery.findAll(
1331             "Job.undispatchable.status",
1332             JpaJob.class,
1333             Pair.of("statuses", List.of(
1334                 Status.INSTANTIATED.ordinal(),
1335                 Status.RUNNING.ordinal()
1336             ))
1337         ).apply(em);
1338 
1339         if (undispatchableJobs.size() > 0) {
1340           logger.info("Found {} undispatchable jobs on host {}", undispatchableJobs.size(), hostName);
1341         }
1342 
1343         for (JpaJob job : undispatchableJobs) {
1344           // Make sure the job was processed on this host
1345           String jobHost = "";
1346           if (job.getProcessorServiceRegistration() != null) {
1347             jobHost = job.getProcessorServiceRegistration().getHost();
1348           }
1349 
1350           if (!jobHost.equals(hostName)) {
1351             logger.debug("Will not cancel undispatchable job {} for host {}, it is running on a different host ({})",
1352                 job, hostName, jobHost);
1353             continue;
1354           }
1355 
1356           logger.info("Cancelling the running undispatchable job {}, it was orphaned on this host ({})", job, hostName);
1357           job.setStatus(Status.CANCELLED);
1358           em.merge(job);
1359         }
1360       });
1361     } catch (Exception e) {
1362       logger.error("Unable to clean undispatchable jobs for host {}! {}", hostName, e.getMessage());
1363     }
1364   }
1365 
1366   /**
1367    * Find all running jobs on this service and set them to RESET or CANCELLED.
1368    *
1369    * @param serviceType
1370    *          the service type
1371    * @param baseUrl
1372    *          the base url
1373    * @throws ServiceRegistryException
1374    *           if there is a problem communicating with the jobs database
1375    */
1376   private void cleanRunningJobs(String serviceType, String baseUrl) throws ServiceRegistryException {
1377     try {
1378       db.execTxChecked(em -> {
1379         TypedQuery<JpaJob> query = em.createNamedQuery("Job.processinghost.status", JpaJob.class)
1380             .setLockMode(LockModeType.PESSIMISTIC_WRITE)
1381             .setParameter("statuses", List.of(
1382                 Status.RUNNING.ordinal(),
1383                 Status.DISPATCHING.ordinal(),
1384                 Status.WAITING.ordinal()
1385             ))
1386             .setParameter("host", baseUrl)
1387             .setParameter("serviceType", serviceType);
1388 
1389         List<JpaJob> unregisteredJobs = query.getResultList();
1390         if (unregisteredJobs.size() > 0) {
1391           logger.info("Found {} jobs to clean for {}@{}", unregisteredJobs.size(), serviceType, baseUrl);
1392         }
1393 
1394         for (JpaJob job : unregisteredJobs) {
1395           if (job.isDispatchable()) {
1396             em.refresh(job);
1397             // If this job has already been treated
1398             if (Status.CANCELLED.equals(job.getStatus()) || Status.RESTART.equals(job.getStatus())) {
1399               continue;
1400             }
1401 
1402             if (job.getRootJob() != null && Status.PAUSED.equals(job.getRootJob().getStatus())) {
1403               JpaJob rootJob = job.getRootJob();
1404               cancelAllChildrenQuery(rootJob).accept(em);
1405               rootJob.setStatus(Status.RESTART);
1406               rootJob.setOperation(START_OPERATION);
1407               em.merge(rootJob);
1408               continue;
1409             }
1410 
1411             logger.info("Marking child jobs from {} as canceled", job);
1412             cancelAllChildrenQuery(job).accept(em);
1413 
1414             logger.info("Rescheduling lost {}", job);
1415             job.setStatus(Status.RESTART);
1416             job.setProcessorServiceRegistration(null);
1417           } else {
1418             logger.info("Marking lost {} as failed", job);
1419             job.setStatus(Status.FAILED);
1420           }
1421 
1422           em.merge(job);
1423         }
1424       });
1425     } catch (Exception e) {
1426       throw new ServiceRegistryException(e);
1427     }
1428   }
1429 
1430   /**
1431    * Go through all the children recursively to set them in {@link Status#CANCELLED} status
1432    *
1433    * @param job
1434    *          the parent job
1435    */
1436   private Consumer<EntityManager> cancelAllChildrenQuery(JpaJob job) {
1437     return em -> job.getChildJobs().stream()
1438         .peek(em::refresh)
1439         .filter(child -> Status.CANCELLED.equals(child.getStatus()))
1440         .forEach(child -> {
1441           cancelAllChildrenQuery(child).accept(em);
1442           child.setStatus(Status.CANCELLED);
1443           em.merge(child);
1444         });
1445   }
1446 
1447   /**
1448    * {@inheritDoc}
1449    *
1450    * @see org.opencastproject.serviceregistry.api.ServiceRegistry#setMaintenanceStatus(java.lang.String, boolean)
1451    */
1452   @Override
1453   public void setMaintenanceStatus(String baseUrl, boolean maintenance) throws NotFoundException {
1454     logger.info("Setting maintenance mode on host '{}'", baseUrl);
1455     HostRegistrationJpaImpl reg = db.execTxChecked(em -> {
1456       HostRegistrationJpaImpl hr = fetchHostRegistrationQuery(baseUrl).apply(em).orElseThrow(() -> {
1457             logger.warn("Can not set maintenance mode because host '{}' was not registered", baseUrl);
1458         return new NotFoundException("Can not set maintenance mode on a host that has not been registered");
1459       });
1460       hr.setMaintenanceMode(maintenance);
1461       em.merge(hr);
1462       return hr;
1463     });
1464 
1465     hostsStatistics.updateHost(reg);
1466     logger.info("Finished setting maintenance mode on host '{}'", baseUrl);
1467   }
1468 
1469   /**
1470    * {@inheritDoc}
1471    *
1472    * @see org.opencastproject.serviceregistry.api.ServiceRegistry#getServiceRegistrations()
1473    */
1474   @Override
1475   public List<ServiceRegistration> getServiceRegistrations() {
1476     return db.exec(getServiceRegistrationsQuery());
1477   }
1478 
1479   @Override
1480   public Incidents incident() {
1481     return incidents;
1482   }
1483 
1484   private List<ServiceRegistration> getOnlineServiceRegistrations() {
1485     return db.exec(namedQuery.findAll("ServiceRegistration.getAllOnline", ServiceRegistration.class));
1486   }
1487 
1488   /**
1489    * Gets all service registrations.
1490    *
1491    * @return the list of service registrations
1492    */
1493   protected Function<EntityManager, List<ServiceRegistration>> getServiceRegistrationsQuery() {
1494     return namedQuery.findAll("ServiceRegistration.getAll", ServiceRegistration.class);
1495   }
1496 
1497   /**
1498    * Gets all host registrations
1499    *
1500    * @return the list of host registrations
1501    */
1502   @Override
1503   public List<HostRegistration> getHostRegistrations() {
1504     return db.exec(getHostRegistrationsQuery());
1505   }
1506 
1507   @Override
1508   public HostStatistics getHostStatistics() {
1509     HostStatistics statistics = new HostStatistics();
1510 
1511     db.exec(namedQuery.findAll(
1512         "HostRegistration.jobStatistics",
1513         Object[].class,
1514         Pair.of("status", List.of(Status.QUEUED.ordinal(), Status.RUNNING.ordinal()))
1515     )).forEach(row -> {
1516       final long host = ((Number) row[0]).longValue();
1517       final int status = ((Number) row[1]).intValue();
1518       final long count = ((Number) row[2]).longValue();
1519 
1520       if (status == Status.RUNNING.ordinal()) {
1521         statistics.addRunning(host, count);
1522       } else {
1523         statistics.addQueued(host, count);
1524       }
1525     });
1526 
1527     return statistics;
1528   }
1529 
1530   /**
1531    * Gets all host registrations
1532    *
1533    * @return the list of host registrations
1534    */
1535   protected Function<EntityManager, List<HostRegistration>> getHostRegistrationsQuery() {
1536     return namedQuery.findAll("HostRegistration.getAll", HostRegistration.class);
1537   }
1538 
1539   @Override
1540   public HostRegistration getHostRegistration(String hostname) throws ServiceRegistryException {
1541     return db.exec(getHostRegistrationQuery(hostname));
1542   }
1543 
1544   protected Function<EntityManager, HostRegistration> getHostRegistrationQuery(String hostname) {
1545     return namedQuery.find(
1546         "HostRegistration.byHostName",
1547         HostRegistration.class,
1548         Pair.of("host", hostname)
1549     );
1550   }
1551 
1552   /**
1553    * {@inheritDoc}
1554    *
1555    * @see org.opencastproject.serviceregistry.api.ServiceRegistry#getChildJobs(long)
1556    */
1557   @Override
1558   public List<Job> getChildJobs(long id) throws ServiceRegistryException {
1559     try {
1560       List<JpaJob> jobs = db.exec(namedQuery.findAll(
1561           "Job.root.children",
1562           JpaJob.class,
1563           Pair.of("id", id)
1564       ));
1565 
1566       if (jobs.size() == 0) {
1567         jobs = db.exec(getChildrenQuery(id));
1568       }
1569 
1570       return jobs.stream()
1571           .map(this::setJobUri)
1572           .map(JpaJob::toJob)
1573           .collect(Collectors.toList());
1574     } catch (Exception e) {
1575       throw new ServiceRegistryException(e);
1576     }
1577   }
1578 
1579   private Function<EntityManager, List<JpaJob>> getChildrenQuery(long id) {
1580     return em -> {
1581       TypedQuery<JpaJob> query = em
1582           .createNamedQuery("Job.children", JpaJob.class)
1583           .setParameter("id", id);
1584 
1585       List<JpaJob> childJobs = query.getResultList();
1586 
1587       List<JpaJob> result = new ArrayList<>(childJobs);
1588       childJobs.stream()
1589           .map(j -> getChildrenQuery(j.getId()).apply(em))
1590           .forEach(result::addAll);
1591 
1592       return result;
1593     };
1594   }
1595 
1596   /**
1597    * {@inheritDoc}
1598    *
1599    * @see org.opencastproject.serviceregistry.api.ServiceRegistry#getJobs(java.lang.String, Status)
1600    */
1601   @Override
1602   public List<Job> getJobs(String type, Status status) throws ServiceRegistryException {
1603     logger.trace("Getting jobs '{}' and '{}'", type, status);
1604 
1605     Function<EntityManager, List<JpaJob>> jobsQuery;
1606     if (type == null && status == null) {
1607       jobsQuery = namedQuery.findAll("Job.all", JpaJob.class);
1608     } else if (type == null) {
1609       jobsQuery = namedQuery.findAll(
1610           "Job.status",
1611           JpaJob.class,
1612           Pair.of("status", status.ordinal())
1613       );
1614     } else if (status == null) {
1615       jobsQuery = namedQuery.findAll(
1616           "Job.type",
1617           JpaJob.class,
1618           Pair.of("serviceType", type)
1619       );
1620     } else {
1621       jobsQuery = namedQuery.findAll(
1622           "Job",
1623           JpaJob.class,
1624           Pair.of("status", status.ordinal()),
1625           Pair.of("serviceType", type)
1626       );
1627     }
1628 
1629     try {
1630       return db.exec(jobsQuery).stream()
1631           .peek(this::setJobUri)
1632           .map(JpaJob::toJob)
1633           .collect(Collectors.toList());
1634     } catch (Exception e) {
1635       throw new ServiceRegistryException(e);
1636     }
1637   }
1638 
1639   @Override
1640   public List<String> getJobPayloads(String operation) throws ServiceRegistryException {
1641     try {
1642       return db.exec(namedQuery.findAll(
1643           "Job.payload",
1644           String.class,
1645           Pair.of("operation", operation)
1646       ));
1647     } catch (Exception e) {
1648       throw new ServiceRegistryException(e);
1649     }
1650   }
1651 
1652   @Override
1653   public List<String> getJobPayloads(String operation, int limit, int offset) throws ServiceRegistryException {
1654     try {
1655       return db.exec(em -> {
1656         return em.createNamedQuery("Job.payload", String.class)
1657             .setParameter("operation", operation)
1658             .setMaxResults(limit)
1659             .setFirstResult(offset)
1660             .getResultList();
1661       });
1662     } catch (Exception e) {
1663       throw new ServiceRegistryException(e);
1664     }
1665   }
1666 
1667   @Override
1668   public int getJobCount(final String operation) throws ServiceRegistryException {
1669     try {
1670       return db.exec(namedQuery.find(
1671           "Job.countByOperationOnly",
1672           Number.class,
1673           Pair.of("operation", operation)
1674       )).intValue();
1675     } catch (Exception e) {
1676       throw new ServiceRegistryException(e);
1677     }
1678   }
1679 
1680   /**
1681    * {@inheritDoc}
1682    *
1683    * @see org.opencastproject.serviceregistry.api.ServiceRegistry#getActiveJobs()
1684    */
1685   @Override
1686   public List<Job> getActiveJobs() throws ServiceRegistryException {
1687     try {
1688       return db.exec(getJobsByStatusQuery(activeJobStatus)).stream()
1689           .map(JpaJob::toJob)
1690           .collect(Collectors.toList());
1691     } catch (Exception e) {
1692       throw new ServiceRegistryException(e);
1693     }
1694   }
1695 
1696   /**
1697    * Get the list of jobs with status from the given statuses.
1698    *
1699    * @param statuses
1700    *          variable sized array of status values to test on jobs
1701    * @return list of jobs with status from statuses
1702    */
1703   public Function<EntityManager, List<JpaJob>> getJobsByStatusQuery(Status... statuses) {
1704     if (statuses == null || statuses.length < 1) {
1705       throw new IllegalArgumentException("At least one job status must be given.");
1706     }
1707 
1708     return namedQuery.findAll(
1709         "Job.statuses",
1710         JpaJob.class,
1711         Pair.of("statuses", Arrays.stream(statuses).map(Enum::ordinal).collect(Collectors.toList()))
1712     ).andThen(jobs -> jobs.stream()
1713         .peek(this::setJobUri)
1714         .collect(Collectors.toList()));
1715   }
1716 
1717   @Override
1718   public Map<String, Map<String, Long>> countActiveByOrganizationAndHost() {
1719     var rows = db.exec(namedQuery.findAll(
1720         "Job.countByOrganizationAndHost",
1721         Object[].class,
1722         Pair.of("statuses", Arrays.stream(activeJobStatus).map(Enum::ordinal).collect(Collectors.toList()))
1723     )).stream().collect(Collectors.toList());
1724     var orgMap = new HashMap<String, Map<String, Long>>();
1725     for (Object[] row: rows) {
1726       var org = (String) row[0];
1727       var host = (String) row[1];
1728       var count = (Long) row[2];
1729       orgMap.computeIfAbsent(org, o -> new HashMap<>()).put(host, count);
1730     }
1731     return orgMap;
1732   }
1733 
1734   @Override
1735   public Map<String, Long> countActiveTypeByOrganization(final String operation) {
1736     return db.exec(namedQuery.findAll(
1737         "Job.countTypeByOrganization",
1738         Object[].class,
1739         Pair.of("statuses", Arrays.stream(activeJobStatus).map(Enum::ordinal).collect(Collectors.toList())),
1740         Pair.of("operation", operation)
1741     )).stream().collect(Collectors.toMap(
1742         row -> (String) row[0],
1743         row -> (Long) row[1]
1744     ));
1745   }
1746 
1747   /**
1748    * Gets jobs of all types that are in the given state.
1749    *
1750    * @param offset apply offset to the db query if offset &gt; 0
1751    * @param limit apply limit to the db query if limit &gt; 0
1752    * @param statuses the job status should be one from the given statuses
1753    * @return the list of jobs waiting for dispatch
1754    */
1755   protected Function<EntityManager, List<JpaJob>> getDispatchableJobsWithStatusQuery(int offset, int limit,
1756       Status... statuses) {
1757     return em -> {
1758       if (statuses == null) {
1759         return Collections.emptyList();
1760       }
1761 
1762       TypedQuery<JpaJob> query = em
1763           .createNamedQuery("Job.dispatchable.status", JpaJob.class)
1764           .setParameter("statuses", Arrays.stream(statuses).map(Enum::ordinal).collect(Collectors.toList()));
1765       if (offset > 0) {
1766         query.setFirstResult(offset);
1767       }
1768       if (limit > 0) {
1769         query.setMaxResults(limit);
1770       }
1771       return query.getResultList();
1772     };
1773   }
1774 
1775   Function<EntityManager, List<Object[]>> getAvgOperationsQuery() {
1776     return namedQuery.findAll("Job.avgOperation", Object[].class);
1777   }
1778 
1779   Function<EntityManager, List<Object[]>> getCountPerHostServiceQuery() {
1780     return namedQuery.findAll("Job.countPerHostService", Object[].class);
1781   }
1782 
1783   /**
1784    * {@inheritDoc}
1785    *
1786    * @see org.opencastproject.serviceregistry.api.ServiceRegistry#count(java.lang.String, Status)
1787    */
1788   @Override
1789   public long count(String serviceType, Status status) throws ServiceRegistryException {
1790     Function<EntityManager, Number> countQuery;
1791     if (serviceType == null && status == null) {
1792       countQuery = namedQuery.find(
1793           "Job.count.all",
1794           Number.class
1795       );
1796     } else if (serviceType == null) {
1797       countQuery = namedQuery.find(
1798           "Job.count.nullType",
1799           Number.class,
1800           Pair.of("status", status.ordinal())
1801       );
1802     } else if (status == null) {
1803       countQuery = namedQuery.find(
1804           "Job.count.nullStatus",
1805           Number.class,
1806           Pair.of("serviceType", serviceType)
1807       );
1808     } else {
1809       countQuery = namedQuery.find(
1810           "Job.count",
1811           Number.class,
1812           Pair.of("status", status.ordinal()),
1813           Pair.of("serviceType", serviceType)
1814       );
1815     }
1816 
1817     try {
1818       return db.exec(countQuery).longValue();
1819     } catch (Exception e) {
1820       throw new ServiceRegistryException(e);
1821     }
1822   }
1823 
1824   /**
1825    * {@inheritDoc}
1826    *
1827    * @see org.opencastproject.serviceregistry.api.ServiceRegistry#countByHost(java.lang.String, java.lang.String,
1828    *      Status)
1829    */
1830   @Override
1831   public long countByHost(String serviceType, String host, Status status) throws ServiceRegistryException {
1832     Function<EntityManager, Number> countQuery;
1833     if (serviceType != null && !serviceType.isEmpty()) {
1834       countQuery = namedQuery.find(
1835           "Job.countByHost",
1836           Number.class,
1837           Pair.of("serviceType", serviceType),
1838           Pair.of("status", status.ordinal()),
1839           Pair.of("host", host)
1840       );
1841     } else {
1842       countQuery = namedQuery.find(
1843           "Job.countByHost.nullType",
1844           Number.class,
1845           Pair.of("status", status.ordinal()),
1846           Pair.of("host", host)
1847       );
1848     }
1849 
1850     try {
1851       return db.exec(countQuery).longValue();
1852     } catch (Exception e) {
1853       throw new ServiceRegistryException(e);
1854     }
1855   }
1856 
1857   /**
1858    * {@inheritDoc}
1859    *
1860    * @see org.opencastproject.serviceregistry.api.ServiceRegistry#countByOperation(java.lang.String, java.lang.String,
1861    *      Status)
1862    */
1863 
1864   @Override
1865   public long countByOperation(String serviceType, String operation, Status status) throws ServiceRegistryException {
1866     try {
1867       return db.exec(namedQuery.find(
1868           "Job.countByOperation",
1869           Number.class,
1870           Pair.of("status", status.ordinal()),
1871           Pair.of("serviceType", serviceType),
1872           Pair.of("operation", operation)
1873       )).longValue();
1874     } catch (Exception e) {
1875       throw new ServiceRegistryException(e);
1876     }
1877   }
1878 
1879   /**
1880    * {@inheritDoc}
1881    *
1882    * @see org.opencastproject.serviceregistry.api.ServiceRegistry#count(java.lang.String, java.lang.String,
1883    *      java.lang.String, Status)
1884    */
1885   @Override
1886   public long count(String serviceType, String host, String operation, Status status) throws ServiceRegistryException {
1887     if (StringUtils.isBlank(serviceType) || StringUtils.isBlank(host) || StringUtils.isBlank(operation)
1888             || status == null) {
1889       throw new IllegalArgumentException("service type, host, operation, and status must be provided");
1890     }
1891 
1892     try {
1893       return db.exec(namedQuery.find(
1894           "Job.fullMonty",
1895           Number.class,
1896           Pair.of("status", status.ordinal()),
1897           Pair.of("serviceType", serviceType),
1898           Pair.of("operation", operation)
1899       )).longValue();
1900     } catch (Exception e) {
1901       throw new ServiceRegistryException(e);
1902     }
1903   }
1904 
1905   /**
1906    * {@inheritDoc}
1907    *
1908    * @see org.opencastproject.serviceregistry.api.ServiceRegistry#getServiceStatistics()
1909    */
1910   @Override
1911   public List<ServiceStatistics> getServiceStatistics() throws ServiceRegistryException {
1912     Date now = new Date();
1913     try {
1914       return db.exec(getServiceStatisticsQuery(
1915           DateUtils.addDays(now, -maxJobAge),
1916           DateUtils.addDays(now, 1) // Avoid glitches around 'now' by setting the endDate to 'tomorrow'
1917       ));
1918     } catch (Exception e) {
1919       throw new ServiceRegistryException(e);
1920     }
1921   }
1922 
1923   /**
1924    * Gets performance and runtime statistics for each known service registration.
1925    * For the statistics, only jobs created within the time interval [startDate, endDate] are being considered
1926    *
1927    * @param startDate
1928    *          Only jobs created after this data are considered for statistics
1929    * @param endDate
1930    *          Only jobs created before this data are considered for statistics
1931    * @return the service statistics
1932    */
1933   private Function<EntityManager, List<ServiceStatistics>> getServiceStatisticsQuery(Date startDate, Date endDate) {
1934     return em -> {
1935       Map<Long, JaxbServiceStatistics> statsMap = new HashMap<>();
1936 
1937       // Make sure we also include the services that have no processing history so far
1938       namedQuery.findAll("ServiceRegistration.getAll", ServiceRegistrationJpaImpl.class).apply(em).forEach(s ->
1939         statsMap.put(s.getId(), new JaxbServiceStatistics(s))
1940       );
1941 
1942       if (collectJobstats) {
1943         // Build stats map
1944         namedQuery.findAll(
1945             "ServiceRegistration.statistics",
1946             Object[].class,
1947             Pair.of("minDateCreated", startDate),
1948             Pair.of("maxDateCreated", endDate)
1949         ).apply(em).forEach(row -> {
1950           Number serviceRegistrationId = (Number) row[0];
1951           if (serviceRegistrationId == null || serviceRegistrationId.longValue() == 0) {
1952             return;
1953           }
1954           Status status = Status.values()[((Number) row[1]).intValue()];
1955           Number count = (Number) row[2];
1956           Number meanQueueTime = (Number) row[3];
1957           Number meanRunTime = (Number) row[4];
1958 
1959           // The statistics query returns a cartesian product, so we need to iterate over them to build up the objects
1960           JaxbServiceStatistics stats = statsMap.get(serviceRegistrationId.longValue());
1961           if (stats == null) {
1962             return;
1963           }
1964 
1965           // the status will be null if there are no jobs at all associated with this service registration
1966           if (status != null) {
1967             switch (status) {
1968               case RUNNING:
1969                 stats.setRunningJobs(count.intValue());
1970                 break;
1971               case QUEUED:
1972               case DISPATCHING:
1973                 stats.setQueuedJobs(count.intValue());
1974                 break;
1975               case FINISHED:
1976                 stats.setMeanRunTime(meanRunTime.longValue());
1977                 stats.setMeanQueueTime(meanQueueTime.longValue());
1978                 stats.setFinishedJobs(count.intValue());
1979                 break;
1980               default:
1981                 break;
1982             }
1983           }
1984         });
1985       }
1986 
1987       List<ServiceStatistics> stats = new ArrayList<>(statsMap.values());
1988       stats.sort((o1, o2) -> {
1989         ServiceRegistration reg1 = o1.getServiceRegistration();
1990         ServiceRegistration reg2 = o2.getServiceRegistration();
1991         int typeComparison = reg1.getServiceType().compareTo(reg2.getServiceType());
1992         return typeComparison == 0
1993             ? reg1.getHost().compareTo(reg2.getHost())
1994             : typeComparison;
1995       });
1996 
1997       return stats;
1998     };
1999   }
2000 
2001   /**
2002    * Do not look at this, it will burn your eyes! This is due to JPA's inability to do a left outer join with join
2003    * conditions.
2004    *
2005    * {@inheritDoc}
2006    *
2007    * @see org.opencastproject.serviceregistry.api.ServiceRegistry#getServiceRegistrationsByLoad(java.lang.String)
2008    */
2009   @Override
2010   public List<ServiceRegistration> getServiceRegistrationsByLoad(String serviceType) throws ServiceRegistryException {
2011     SystemLoad loadByHost = getCurrentHostLoads();
2012     List<HostRegistration> hostRegistrations = getHostRegistrations();
2013     List<ServiceRegistration> serviceRegistrations = getServiceRegistrationsByType(serviceType);
2014     return getServiceRegistrationsByLoad(serviceType, serviceRegistrations, hostRegistrations, loadByHost);
2015   }
2016 
2017   /**
2018    * {@inheritDoc}
2019    *
2020    * @see org.opencastproject.serviceregistry.api.ServiceRegistry#getCurrentHostLoads()
2021    */
2022   @Override
2023   public SystemLoad getCurrentHostLoads() {
2024     return db.exec(getHostLoadsQuery());
2025   }
2026 
2027   /**
2028    * Gets a map of hosts to the number of jobs currently loading that host
2029    *
2030    * @return the map of hosts to job counts
2031    */
2032   Function<EntityManager, SystemLoad> getHostLoadsQuery() {
2033     return em -> {
2034       final SystemLoad systemLoad = new SystemLoad();
2035 
2036       // Find all jobs that are currently running on any given host, or get all of them
2037       List<Integer> statuses = JOB_STATUSES_INFLUENCING_LOAD_BALANCING.stream()
2038           .map(Enum::ordinal)
2039           .collect(Collectors.toList());
2040       List<Object[]> rows = namedQuery.findAll(
2041           "ServiceRegistration.hostloads",
2042           Object[].class,
2043           Pair.of("statuses", statuses),
2044           // Note: This is used in the query to filter out workflow jobs.
2045           // These jobs are load balanced by the workflow service directly.
2046           Pair.of("workflow_type", TYPE_WORKFLOW)
2047       ).apply(em);
2048 
2049       // Accumulate the numbers for relevant job statuses per host
2050       for (Object[] row : rows) {
2051         String host = String.valueOf(row[0]);
2052         Status status = Status.values()[(int) row[1]];
2053         float currentLoad = ((Number) row[2]).floatValue();
2054         float maxLoad = ((Number) row[3]).floatValue();
2055 
2056         // Only queued, and running jobs are adding to the load, so every other status is discarded
2057         if (status == null || !JOB_STATUSES_INFLUENCING_LOAD_BALANCING.contains(status)) {
2058           currentLoad = 0.0f;
2059         }
2060         // Add the service registration
2061         NodeLoad serviceLoad = new NodeLoad(host, currentLoad, maxLoad);
2062         systemLoad.addNodeLoad(serviceLoad);
2063       }
2064 
2065       // This is important, otherwise services which have no current load are not listed in the output!
2066       getHostRegistrationsQuery().apply(em).stream()
2067           .filter(h -> !systemLoad.containsHost(h.getBaseUrl()))
2068           .forEach(h -> systemLoad.addNodeLoad(new NodeLoad(h.getBaseUrl(), 0.0f, h.getMaxLoad())));
2069       return systemLoad;
2070     };
2071   }
2072 
2073   /**
2074    * {@inheritDoc}
2075    *
2076    * @see org.opencastproject.serviceregistry.api.ServiceRegistry#getServiceRegistrationsByType(java.lang.String)
2077    */
2078   @Override
2079   public List<ServiceRegistration> getServiceRegistrationsByType(String serviceType) throws ServiceRegistryException {
2080     return db.exec(namedQuery.findAll(
2081         "ServiceRegistration.getByType",
2082         ServiceRegistration.class,
2083         Pair.of("serviceType", serviceType)
2084     ));
2085   }
2086 
2087   /**
2088    * {@inheritDoc}
2089    *
2090    * @see org.opencastproject.serviceregistry.api.ServiceRegistry#getServiceRegistrationsByHost(java.lang.String)
2091    */
2092   @Override
2093   public List<ServiceRegistration> getServiceRegistrationsByHost(String host) throws ServiceRegistryException {
2094     return db.exec(getServiceRegistrationsByHostQuery(host));
2095   }
2096 
2097   private Function<EntityManager, List<ServiceRegistration>> getServiceRegistrationsByHostQuery(String host) {
2098     return namedQuery.findAll(
2099         "ServiceRegistration.getByHost",
2100         ServiceRegistration.class,
2101         Pair.of("host", host)
2102     );
2103   }
2104 
2105   /**
2106    * {@inheritDoc}
2107    *
2108    * @see org.opencastproject.serviceregistry.api.ServiceRegistry#getServiceRegistration(java.lang.String,
2109    *      java.lang.String)
2110    */
2111   @Override
2112   public ServiceRegistration getServiceRegistration(String serviceType, String host) {
2113     return db.exec(getServiceRegistrationQuery(serviceType, host))
2114         .orElse(null);
2115   }
2116 
2117   /**
2118    * Sets the trusted http client.
2119    *
2120    * @param client
2121    *          the trusted http client
2122    */
2123   @Reference
2124   void setTrustedHttpClient(TrustedHttpClient client) {
2125     this.client = client;
2126   }
2127 
2128   /**
2129    * Callback for setting the security service.
2130    *
2131    * @param securityService
2132    *          the securityService to set
2133    */
2134   @Reference
2135   public void setSecurityService(SecurityService securityService) {
2136     this.securityService = securityService;
2137   }
2138 
2139   /** OSGi DI. */
2140   @Reference(
2141       cardinality = ReferenceCardinality.OPTIONAL,
2142       policy =  ReferencePolicy.DYNAMIC,
2143       unbind = "unsetIncidentService"
2144   )
2145   public void setIncidentService(IncidentService incidentService) {
2146     this.incidentService = incidentService;
2147     // Manually resolve the cyclic dependency between the incident service and the service registry
2148     ((OsgiIncidentService) incidentService).setServiceRegistry(this);
2149     this.incidents = new Incidents(this, incidentService);
2150   }
2151 
2152   public void unsetIncidentService(IncidentService incidentService) {
2153     if (this.incidentService == incidentService) {
2154       this.incidentService = null;
2155       this.incidents = null;
2156     }
2157   }
2158 
2159   /**
2160    * Update the jobs failure history and the service status with the given information. All these data are then use for
2161    * the jobs failover strategy. Only the terminated job (with FAILED or FINISHED status) are taken into account.
2162    *
2163    * @param job
2164    *          the current job that failed/succeeded
2165    * @throws ServiceRegistryException
2166    * @throws NotFoundException
2167    */
2168   private void updateServiceForFailover(JpaJob job) throws ServiceRegistryException, NotFoundException {
2169     if (job.getStatus() != Status.FAILED && job.getStatus() != Status.FINISHED) {
2170       return;
2171     }
2172 
2173     job.setStatus(job.getStatus(), job.getFailureReason());
2174 
2175     // At this point, the only possible states for the current service are NORMAL and WARNING,
2176     // the services in ERROR state will not be chosen by the dispatcher
2177     ServiceRegistrationJpaImpl currentService = job.getProcessorServiceRegistration();
2178     if (currentService == null) {
2179       return;
2180     }
2181 
2182     // Job is finished with a failure
2183     if (job.getStatus() == FAILED && !DATA.equals(job.getFailureReason())) {
2184 
2185       // Services in WARNING or ERROR state triggered by current job
2186       List<ServiceRegistrationJpaImpl> relatedWarningOrErrorServices = getRelatedWarningErrorServices(job);
2187 
2188       // Before this job failed there was at least one job failed with this job signature on any service
2189       if (relatedWarningOrErrorServices.size() > 0) {
2190         for (ServiceRegistrationJpaImpl relatedService : relatedWarningOrErrorServices) {
2191           // Skip current service from the list
2192           if (currentService.equals(relatedService)) {
2193             continue;
2194           }
2195 
2196           // De-escalate the state of related services as the issue is most likely with the job not the service
2197           // Reset the WARNING job to NORMAL
2198           if (relatedService.getServiceState() == WARNING) {
2199             logger.info("State reset to NORMAL for related service {} on host {}", relatedService.getServiceType(),
2200                     relatedService.getHost());
2201             relatedService.setServiceState(NORMAL, job.toJob().getSignature());
2202           }
2203 
2204           // Reset the ERROR job to WARNING
2205           else if (relatedService.getServiceState() == ERROR) {
2206             logger.info("State reset to WARNING for related service {} on host {}", relatedService.getServiceType(),
2207                     relatedService.getHost());
2208             relatedService.setServiceState(WARNING, relatedService.getWarningStateTrigger());
2209           }
2210 
2211           updateServiceState(relatedService);
2212         }
2213       }
2214 
2215       // This is the first job with this signature failing on any service
2216       else {
2217         // Set the current service to WARNING state
2218         if (currentService.getServiceState() == NORMAL) {
2219           logger.info("State set to WARNING for current service {} on host {}", currentService.getServiceType(),
2220                   currentService.getHost());
2221           currentService.setServiceState(WARNING, job.toJob().getSignature());
2222           updateServiceState(currentService);
2223         }
2224 
2225         // The current service already is in WARNING state and max attempts is reached
2226         else if (errorStatesEnabled && !noErrorStateServiceTypes.contains(currentService.getServiceType())
2227                 && getHistorySize(currentService) >= maxAttemptsBeforeErrorState) {
2228           logger.info("State set to ERROR for current service {} on host {}", currentService.getServiceType(),
2229                   currentService.getHost());
2230           currentService.setServiceState(ERROR, job.toJob().getSignature());
2231           updateServiceState(currentService);
2232         }
2233       }
2234     }
2235 
2236     // Job is finished without failure
2237     else if (job.getStatus() == Status.FINISHED) {
2238       // If the service was in warning state reset to normal state
2239       if (currentService.getServiceState() == WARNING) {
2240         logger.info("State reset to NORMAL for current service {} on host {}", currentService.getServiceType(),
2241                 currentService.getHost());
2242         currentService.setServiceState(NORMAL);
2243         updateServiceState(currentService);
2244       }
2245     }
2246   }
2247 
2248   /**
2249    * {@inheritDoc}
2250    *
2251    * @see org.opencastproject.serviceregistry.api.ServiceRegistry#sanitize(java.lang.String, java.lang.String)
2252    */
2253   @Override
2254   public void sanitize(String serviceType, String host) throws NotFoundException {
2255     db.execChecked(em -> {
2256       ServiceRegistrationJpaImpl service = getServiceRegistrationQuery(serviceType, host).apply(em)
2257           .orElseThrow(NotFoundException::new);
2258 
2259       logger.info("State reset to NORMAL for service {} on host {} through sanitize method", service.getServiceType(),
2260           service.getHost());
2261       service.setServiceState(NORMAL);
2262       updateServiceState(service);
2263     });
2264   }
2265 
2266   /**
2267    * Gets the failed jobs history for the given service registration
2268    *
2269    * @param serviceRegistration
2270    * @return the failed jobs history size
2271    * @throws IllegalArgumentException
2272    *           if parameter is null
2273    * @throws ServiceRegistryException
2274    */
2275   private int getHistorySize(ServiceRegistration serviceRegistration) throws ServiceRegistryException {
2276     if (serviceRegistration == null) {
2277       throw new IllegalArgumentException("serviceRegistration must not be null!");
2278     }
2279 
2280     logger.debug("Calculating count of jobs who failed due to service {}", serviceRegistration);
2281 
2282     try {
2283       return db.exec(namedQuery.find(
2284           "Job.count.history.failed",
2285           Number.class,
2286           Pair.of("serviceType", serviceRegistration.getServiceType()),
2287           Pair.of("host", serviceRegistration.getHost())
2288       )).intValue();
2289     } catch (Exception e) {
2290       throw new ServiceRegistryException(e);
2291     }
2292   }
2293 
2294   /**
2295    * Gets the services in WARNING or ERROR state triggered by this job
2296    *
2297    * @param job
2298    *          the given job to get the related services
2299    * @return a list of services triggered by the job
2300    * @throws IllegalArgumentException
2301    *           if the given job was null
2302    * @throws ServiceRegistryException
2303    *           if the there was a problem with the query
2304    */
2305   private List<ServiceRegistrationJpaImpl> getRelatedWarningErrorServices(JpaJob job) throws ServiceRegistryException {
2306     if (job == null) {
2307       throw new IllegalArgumentException("job must not be null!");
2308     }
2309 
2310     logger.debug("Try to get the services in WARNING or ERROR state triggered by {} failed", job);
2311 
2312     try {
2313       return db.exec(namedQuery.findAll(
2314           "ServiceRegistration.relatedservices.warning_error",
2315           ServiceRegistrationJpaImpl.class,
2316           Pair.of("serviceType", job.getJobType())
2317       )).stream()
2318           // TODO: modify the query to avoid to go through the list here
2319           .filter(rs ->
2320               (rs.getServiceState() == WARNING && rs.getWarningStateTrigger() == job.toJob().getSignature())
2321               || (rs.getServiceState() == ERROR && rs.getErrorStateTrigger() == job.toJob().getSignature())
2322           ).collect(Collectors.toList());
2323     } catch (Exception e) {
2324       throw new ServiceRegistryException(e);
2325     }
2326   }
2327 
2328   /**
2329    * Returns a filtered list of service registrations, containing only those that are online, not in maintenance mode,
2330    * and with a specific service type that are running on a host which is not already maxed out.
2331    *
2332    * @param serviceRegistrations
2333    *          the complete list of service registrations
2334    * @param hostRegistrations
2335    *          the complete list of available host registrations
2336    * @param systemLoad
2337    *          the map of hosts to the number of running jobs
2338    * @param jobType
2339    *          the job type for which the services registrations are filtered
2340    */
2341   protected List<ServiceRegistration> getServiceRegistrationsWithCapacity(String jobType,
2342           List<ServiceRegistration> serviceRegistrations, List<HostRegistration> hostRegistrations,
2343           final SystemLoad systemLoad) {
2344     final List<String> hostBaseUrls = hostRegistrations.stream()
2345                                                        .map(HostRegistration::getBaseUrl)
2346                                                        .collect(Collectors.toUnmodifiableList());
2347     final List<ServiceRegistration> filteredList = new ArrayList<>();
2348 
2349     for (ServiceRegistration service : serviceRegistrations) {
2350       // Skip service if host not available
2351       if (!hostBaseUrls.contains(service.getHost())) {
2352         logger.trace("Not considering {} because it's host {} is not available for dispatching", service,
2353                 service.getHost());
2354         continue;
2355       }
2356 
2357       // Skip services that are not of the requested type
2358       if (!jobType.equals(service.getServiceType())) {
2359         logger.trace("Not considering {} because it is of the wrong job type", service);
2360         continue;
2361       }
2362 
2363       // Skip services that are in error state
2364       if (service.getServiceState() == ERROR) {
2365         logger.trace("Not considering {} because it is in error state", service);
2366         continue;
2367       }
2368 
2369       // Skip services that are in maintenance mode
2370       if (service.isInMaintenanceMode()) {
2371         logger.trace("Not considering {} because it is in maintenance mode", service);
2372         continue;
2373       }
2374 
2375       // Skip services that are marked as offline
2376       if (!service.isOnline()) {
2377         logger.trace("Not considering {} because it is currently offline", service);
2378         continue;
2379       }
2380 
2381       // Determine the maximum load for this host
2382       Float hostLoadMax = null;
2383       for (HostRegistration host : hostRegistrations) {
2384         if (host.getBaseUrl().equals(service.getHost())) {
2385           hostLoadMax = host.getMaxLoad();
2386           break;
2387         }
2388       }
2389       if (hostLoadMax == null) {
2390         logger.warn("Unable to determine max load for host {}", service.getHost());
2391       }
2392 
2393       // Determine the current load for this host
2394       Float hostLoad = systemLoad.get(service.getHost()).getLoadFactor();
2395       if (hostLoad == null) {
2396         logger.warn("Unable to determine current load for host {}", service.getHost());
2397       }
2398 
2399       // Is this host suited for processing?
2400       if (hostLoad == null || hostLoadMax == null || hostLoad < hostLoadMax) {
2401         logger.debug("Adding candidate service {} for processing of jobs of type '{}' (host load is {} of max {})",
2402            service, jobType, hostLoad, hostLoadMax);
2403         filteredList.add(service);
2404       }
2405     }
2406 
2407     // Sort the list by capacity
2408     filteredList.sort(new LoadComparator(systemLoad));
2409 
2410     return filteredList;
2411   }
2412 
2413   /**
2414    * Returns a filtered list of service registrations, containing only those that are online, not in maintenance mode,
2415    * and with a specific service type, ordered by load.
2416    *
2417    * @param jobType
2418    *          the job type for which the services registrations are filtered
2419    * @param serviceRegistrations
2420    *          the complete list of service registrations
2421    * @param hostRegistrations
2422    *          the complete list of available host registrations
2423    * @param systemLoad
2424    *
2425    */
2426   protected List<ServiceRegistration> getServiceRegistrationsByLoad(String jobType,
2427           List<ServiceRegistration> serviceRegistrations, List<HostRegistration> hostRegistrations,
2428           final SystemLoad systemLoad) {
2429     final List<String> hostBaseUrls = hostRegistrations.stream()
2430                                                        .map(HostRegistration::getBaseUrl)
2431                                                        .collect(Collectors.toUnmodifiableList());
2432     final List<ServiceRegistration> filteredList = new ArrayList<>();
2433 
2434     logger.debug("Finding services to dispatch job of type {}", jobType);
2435 
2436     for (ServiceRegistration service : serviceRegistrations) {
2437       // Skip service if host not available
2438       if (!hostBaseUrls.contains(service.getHost())) {
2439         logger.trace("Not considering {} because it's host {} is not available for dispatching", service,
2440                 service.getHost());
2441         continue;
2442       }
2443 
2444       // Skip services that are not of the requested type
2445       if (!jobType.equals(service.getServiceType())) {
2446         logger.trace("Not considering {} because it is of the wrong job type", service);
2447         continue;
2448       }
2449 
2450       // Skip services that are in error state
2451       if (service.getServiceState() == ERROR) {
2452         logger.trace("Not considering {} because it is in error state", service);
2453         continue;
2454       }
2455 
2456       // Skip services that are in maintenance mode
2457       if (service.isInMaintenanceMode()) {
2458         logger.trace("Not considering {} because it is in maintenance mode", service);
2459         continue;
2460       }
2461 
2462       // Skip services that are marked as offline
2463       if (!service.isOnline()) {
2464         logger.trace("Not considering {} because it is currently offline", service);
2465         continue;
2466       }
2467 
2468       // We found a candidate service
2469       logger.debug("Adding candidate service {} for processing of job of type '{}'", service, jobType);
2470       filteredList.add(service);
2471     }
2472 
2473     // Sort the list by capacity and distinguish between composer jobs and other jobs
2474     if ("org.opencastproject.composer".equals(jobType)) {
2475       Collections.sort(filteredList, new LoadComparatorEncoding(systemLoad));
2476     } else {
2477       Collections.sort(filteredList, new LoadComparator(systemLoad));
2478     }
2479 
2480     return filteredList;
2481   }
2482 
2483   /**
2484    * {@inheritDoc}
2485    *
2486    * @see org.opencastproject.serviceregistry.api.ServiceRegistry#getMaxLoads()
2487    */
2488   @Override
2489   public SystemLoad getMaxLoads() throws ServiceRegistryException {
2490     final SystemLoad loads = new SystemLoad();
2491     getHostRegistrations().stream()
2492         .map(host -> new NodeLoad(host.getBaseUrl(), 0.0f, host.getMaxLoad()))
2493         .forEach(loads::addNodeLoad);
2494     return loads;
2495   }
2496 
2497   /**
2498    * {@inheritDoc}
2499    *
2500    * @see org.opencastproject.serviceregistry.api.ServiceRegistry#getMaxLoadOnNode(java.lang.String)
2501    */
2502   @Override
2503   public NodeLoad getMaxLoadOnNode(String host) throws ServiceRegistryException, NotFoundException {
2504     try {
2505       float maxLoad = db.exec(namedQuery.find(
2506           "HostRegistration.getMaxLoadByHostName",
2507           Number.class,
2508           Pair.of("host", host)
2509       )).floatValue();
2510       return new NodeLoad(host, 0.0f, maxLoad);
2511     } catch (NoResultException e) {
2512       throw new NotFoundException(e);
2513     } catch (Exception e) {
2514       throw new ServiceRegistryException(e);
2515     }
2516   }
2517 
2518   /** A periodic check on each service registration to ensure that it is still alive. */
2519   class JobProducerHeartbeat implements Runnable {
2520 
2521     /** List of service registrations that have been found unresponsive last time we checked */
2522     private final List<ServiceRegistration> unresponsive = new ArrayList<>();
2523 
2524     /**
2525      * {@inheritDoc}
2526      *
2527      * @see java.lang.Runnable#run()
2528      */
2529     @Override
2530     public void run() {
2531       logger.debug("Checking for unresponsive services");
2532 
2533       try {
2534         List<ServiceRegistration> serviceRegistrations = getOnlineServiceRegistrations();
2535 
2536         for (ServiceRegistration service : serviceRegistrations) {
2537           hostsStatistics.updateHost(((ServiceRegistrationJpaImpl) service).getHostRegistration());
2538           servicesStatistics.updateService(service);
2539           if (!service.isJobProducer()) {
2540             continue;
2541           }
2542           if (service.isInMaintenanceMode()) {
2543             continue;
2544           }
2545 
2546           // We think this service is online and available. Prove it.
2547           String serviceUrl = UrlSupport.concat(service.getHost(), service.getPath(), "dispatch");
2548 
2549           HttpHead options = new HttpHead(serviceUrl);
2550           HttpResponse response = null;
2551           try {
2552             try {
2553               response = client.execute(options);
2554               if (response != null) {
2555                 switch (response.getStatusLine().getStatusCode()) {
2556                   case HttpStatus.SC_OK:
2557                     // this service is reachable, continue checking other services
2558                     logger.trace("Service " + service + " is responsive: " + response.getStatusLine());
2559                     if (unresponsive.remove(service)) {
2560                       logger.info("Service {} is still online", service);
2561                     } else if (!service.isOnline()) {
2562                       try {
2563                         setOnlineStatus(service.getServiceType(), service.getHost(), service.getPath(), true, true);
2564                         logger.info("Service {} is back online", service);
2565                       } catch (ServiceRegistryException e) {
2566                         logger.warn("Error setting online status for {}", service);
2567                       }
2568                     }
2569                     continue;
2570                   default:
2571                     if (!service.isOnline()) {
2572                       continue;
2573                     }
2574                     logger.warn("Service {} is not working as expected: {}", service, response.getStatusLine());
2575                 }
2576               } else {
2577                 logger.warn("Service {} does not respond", service);
2578               }
2579             } catch (TrustedHttpClientException e) {
2580               if (!service.isOnline()) {
2581                 continue;
2582               }
2583               logger.warn("Unable to reach {}", service, e);
2584             }
2585 
2586             // If we get here, the service did not respond as expected
2587             try {
2588               if (unresponsive.contains(service)) {
2589                 unRegisterService(service.getServiceType(), service.getHost());
2590                 unresponsive.remove(service);
2591                 logger.warn("Marking {} as offline", service);
2592               } else {
2593                 unresponsive.add(service);
2594                 logger.warn("Added {} to the watch list", service);
2595               }
2596             } catch (ServiceRegistryException e) {
2597               logger.warn("Unable to unregister unreachable service: {}", service, e);
2598             }
2599           } finally {
2600             client.close(response);
2601           }
2602         }
2603       } catch (Throwable t) {
2604         logger.warn("Error while checking for unresponsive services", t);
2605       }
2606 
2607       logger.debug("Finished checking for unresponsive services");
2608     }
2609   }
2610 
2611   /**
2612    * Comparator that will sort service registrations depending on their capacity, wich is defined by the number of jobs
2613    * the service's host is already running divided by the MaxLoad of the Server. The lower that number, the bigger
2614    * the capacity.
2615    */
2616   private class LoadComparator implements Comparator<ServiceRegistration> {
2617 
2618     protected SystemLoad loadByHost = null;
2619 
2620     /**
2621      * Creates a new comparator which is using the given map of host names and loads.
2622      *
2623      * @param loadByHost
2624      *          the current work load by host
2625      */
2626     LoadComparator(SystemLoad loadByHost) {
2627       this.loadByHost = loadByHost;
2628     }
2629 
2630     @Override
2631     public int compare(ServiceRegistration serviceA, ServiceRegistration serviceB) {
2632       String hostA = serviceA.getHost();
2633       String hostB = serviceB.getHost();
2634       NodeLoad nodeA = loadByHost.get(hostA);
2635       NodeLoad nodeB = loadByHost.get(hostB);
2636       // If the load factors are about the same, sort based on maximum load
2637       if (Math.abs(nodeA.getLoadFactor() - nodeB.getLoadFactor()) <= 0.01) {
2638         // NOTE: The sort order below is *reversed* from what you'd expect
2639         // When we're comparing the load factors we want the node with the lowest factor to be first
2640         // When we're comparing the maximum load value, we want the node with the highest max to be first
2641         return Float.compare(nodeB.getMaxLoad(), nodeA.getMaxLoad());
2642       }
2643       return Float.compare(nodeA.getLoadFactor(), nodeB.getLoadFactor());
2644     }
2645   }
2646 
2647   /**
2648    * Comparator that will sort service registrations depending on their capacity, which is defined by the number of jobs
2649    * the service's host is already running divided by the MaxLoad of the Server. The lower that number, the bigger
2650    * the capacity.
2651    * This Comparator will prefer encoding workers, if none are defined in the configuration file it will act like
2652    * the LoadComparator.
2653    */
2654   private class LoadComparatorEncoding extends LoadComparator implements Comparator<ServiceRegistration> {
2655 
2656     /**
2657      * Creates a new comparator which is using the given map of host names and loads.
2658      *
2659      * @param loadByHost
2660      */
2661     LoadComparatorEncoding(SystemLoad loadByHost) {
2662       super(loadByHost);
2663     }
2664 
2665     @Override
2666     public int compare(ServiceRegistration serviceA, ServiceRegistration serviceB) {
2667       String hostA = serviceA.getHost();
2668       String hostB = serviceB.getHost();
2669       NodeLoad nodeA = loadByHost.get(hostA);
2670       NodeLoad nodeB = loadByHost.get(hostB);
2671 
2672       if (encodingWorkers != null) {
2673         if (encodingWorkers.contains(hostA) && !encodingWorkers.contains(hostB)) {
2674           if (nodeA.getLoadFactor() <= encodingThreshold) {
2675             return -1;
2676           }
2677           return Float.compare(nodeA.getLoadFactor(), nodeB.getLoadFactor());
2678         }
2679         if (encodingWorkers.contains(hostB) && !encodingWorkers.contains(hostA)) {
2680           if (nodeB.getLoadFactor() <= encodingThreshold) {
2681             return 1;
2682           }
2683           return Float.compare(nodeA.getLoadFactor(), nodeB.getLoadFactor());
2684         }
2685       }
2686         return super.compare(serviceA, serviceB);
2687     }
2688   }
2689 }