1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 package org.opencastproject.serviceregistry.impl;
23
24 import static java.lang.String.format;
25 import static org.apache.commons.lang3.StringUtils.isBlank;
26 import static org.opencastproject.db.Queries.namedQuery;
27 import static org.opencastproject.job.api.AbstractJobProducer.ACCEPT_JOB_LOADS_EXCEEDING_PROPERTY;
28 import static org.opencastproject.job.api.AbstractJobProducer.DEFAULT_ACCEPT_JOB_LOADS_EXCEEDING;
29 import static org.opencastproject.job.api.Job.FailureReason.DATA;
30 import static org.opencastproject.job.api.Job.Status.FAILED;
31 import static org.opencastproject.serviceregistry.api.ServiceState.ERROR;
32 import static org.opencastproject.serviceregistry.api.ServiceState.NORMAL;
33 import static org.opencastproject.serviceregistry.api.ServiceState.WARNING;
34 import static org.opencastproject.util.OsgiUtil.getOptContextProperty;
35
36 import org.opencastproject.db.DBSession;
37 import org.opencastproject.db.DBSessionFactory;
38 import org.opencastproject.job.api.Job;
39 import org.opencastproject.job.api.Job.Status;
40 import org.opencastproject.job.jpa.JpaJob;
41 import org.opencastproject.security.api.Organization;
42 import org.opencastproject.security.api.SecurityService;
43 import org.opencastproject.security.api.TrustedHttpClient;
44 import org.opencastproject.security.api.TrustedHttpClientException;
45 import org.opencastproject.security.api.User;
46 import org.opencastproject.serviceregistry.api.HostRegistration;
47 import org.opencastproject.serviceregistry.api.HostStatistics;
48 import org.opencastproject.serviceregistry.api.IncidentService;
49 import org.opencastproject.serviceregistry.api.Incidents;
50 import org.opencastproject.serviceregistry.api.JaxbServiceStatistics;
51 import org.opencastproject.serviceregistry.api.ServiceRegistration;
52 import org.opencastproject.serviceregistry.api.ServiceRegistry;
53 import org.opencastproject.serviceregistry.api.ServiceRegistryException;
54 import org.opencastproject.serviceregistry.api.ServiceStatistics;
55 import org.opencastproject.serviceregistry.api.SystemLoad;
56 import org.opencastproject.serviceregistry.api.SystemLoad.NodeLoad;
57 import org.opencastproject.serviceregistry.impl.jmx.HostsStatistics;
58 import org.opencastproject.serviceregistry.impl.jmx.JobsStatistics;
59 import org.opencastproject.serviceregistry.impl.jmx.ServicesStatistics;
60 import org.opencastproject.serviceregistry.impl.jpa.HostRegistrationJpaImpl;
61 import org.opencastproject.serviceregistry.impl.jpa.ServiceRegistrationJpaImpl;
62 import org.opencastproject.systems.OpencastConstants;
63 import org.opencastproject.util.NotFoundException;
64 import org.opencastproject.util.UrlSupport;
65 import org.opencastproject.util.function.ThrowingConsumer;
66 import org.opencastproject.util.jmx.JmxUtil;
67
68 import org.apache.commons.lang3.StringUtils;
69 import org.apache.commons.lang3.time.DateUtils;
70 import org.apache.commons.lang3.tuple.Pair;
71 import org.apache.http.HttpResponse;
72 import org.apache.http.HttpStatus;
73 import org.apache.http.client.methods.HttpHead;
74 import org.osgi.service.cm.ConfigurationException;
75 import org.osgi.service.cm.ManagedService;
76 import org.osgi.service.component.ComponentContext;
77 import org.osgi.service.component.annotations.Activate;
78 import org.osgi.service.component.annotations.Component;
79 import org.osgi.service.component.annotations.Deactivate;
80 import org.osgi.service.component.annotations.Modified;
81 import org.osgi.service.component.annotations.Reference;
82 import org.osgi.service.component.annotations.ReferenceCardinality;
83 import org.osgi.service.component.annotations.ReferencePolicy;
84 import org.slf4j.Logger;
85 import org.slf4j.LoggerFactory;
86
87 import java.net.InetAddress;
88 import java.net.URI;
89 import java.net.URISyntaxException;
90 import java.util.ArrayList;
91 import java.util.Arrays;
92 import java.util.Collections;
93 import java.util.Comparator;
94 import java.util.Date;
95 import java.util.Dictionary;
96 import java.util.HashMap;
97 import java.util.List;
98 import java.util.Map;
99 import java.util.Objects;
100 import java.util.Optional;
101 import java.util.concurrent.Executors;
102 import java.util.concurrent.ScheduledExecutorService;
103 import java.util.concurrent.TimeUnit;
104 import java.util.concurrent.atomic.AtomicReference;
105 import java.util.function.Consumer;
106 import java.util.function.Function;
107 import java.util.stream.Collectors;
108
109 import javax.management.ObjectInstance;
110 import javax.persistence.EntityManager;
111 import javax.persistence.EntityManagerFactory;
112 import javax.persistence.LockModeType;
113 import javax.persistence.NoResultException;
114 import javax.persistence.TypedQuery;
115
116
117 @Component(
118 property = {
119 "service.description=Service registry"
120 },
121 immediate = true,
122 service = { ManagedService.class, ServiceRegistry.class, ServiceRegistryJpaImpl.class }
123 )
124 public class ServiceRegistryJpaImpl implements ServiceRegistry, ManagedService {
125
126
/** Name of the JPA persistence unit; same value as the target filter of the entity manager factory reference. */
public static final String PERSISTENCE_UNIT = "org.opencastproject.common";

/** Job operation name; parentless jobs with this operation are never removed by removeParentlessJobs. */
public static final String START_OPERATION = "START_OPERATION";

/** Job operation name; parentless jobs with this operation are never removed by removeParentlessJobs. */
public static final String START_WORKFLOW = "START_WORKFLOW";

/** Job operation name; parentless jobs with this operation are never removed by removeParentlessJobs. */
public static final String RESUME = "RESUME";

/** Job type of workflow jobs; excluded from local load accounting and failover updates in updateJob. */
public static final String TYPE_WORKFLOW = "org.opencastproject.workflow";

static final Logger logger = LoggerFactory.getLogger(ServiceRegistryJpaImpl.class);

/** JMX beans registered in activate() and unregistered in deactivate(). */
protected List<ObjectInstance> jmxBeans = new ArrayList<>();

/** JMX type name of the host statistics bean. */
private static final String JMX_HOSTS_STATISTICS_TYPE = "HostsStatistics";

/** JMX type name of the service statistics bean. */
private static final String JMX_SERVICES_STATISTICS_TYPE = "ServicesStatistics";

/** JMX type name of the job statistics bean. */
private static final String JMX_JOBS_STATISTICS_TYPE = "JobsStatistics";

/** Host statistics exposed via JMX. */
private HostsStatistics hostsStatistics;

/** Service statistics exposed via JMX; also updated when a service's state changes. */
private ServicesStatistics servicesStatistics;

/** Job statistics exposed via JMX. */
private JobsStatistics jobsStatistics;

/** Job associated with the calling thread; used as default parent by the createJob overloads. */
private static final ThreadLocal<Job> currentJob = new ThreadLocal<>();

/** Bundle property overriding this node's maximum load (parsed as a float; defaults to the number of cores). */
protected static final String OPT_MAXLOAD = "org.opencastproject.server.maxload";

/** Configuration key for the job producer heartbeat interval in seconds; 0 disables the heartbeat. */
protected static final String OPT_HEARTBEATINTERVAL = "heartbeat.interval";

/** Configuration key of the flag that enables job statistics collection. */
protected static final String OPT_JOBSTATISTICS = "jobstats.collect";

/** Configuration key of the maximum job age considered for service statistics (presumably days — TODO confirm). */
protected static final String OPT_SERVICE_STATISTICS_MAX_JOB_AGE =
"org.opencastproject.statistics.services.max_job_age";

/** Configuration key of the comma-separated list of encoding worker hosts. */
protected static final String OPT_ENCODING_WORKERS = "org.opencastproject.encoding.workers";

/** Configuration key of the encoding worker threshold; accepted values are between 0 and 1. */
protected static final String OPT_ENCODING_THRESHOLD = "org.opencastproject.encoding.workers.threshold";

/** HTTP client; NOTE(review): not used in the visible part of this class. */
protected TrustedHttpClient client = null;

/** Default limit of jobs to dispatch; NOTE(review): not referenced in the visible part of this class. */
static final int DEFAULT_DISPATCH_JOBS_LIMIT = 100;

/** Job statistics collection is disabled by default. */
static final boolean DEFAULT_JOB_STATISTICS = false;

/** Default value for the service statistics max job age. */
static final int DEFAULT_SERVICE_STATISTICS_MAX_JOB_AGE = 14;

/** Default (empty) encoding worker list. NOTE(review): a shared, mutable static list. */
static final List<String> DEFAULT_ENCODING_WORKERS = new ArrayList<String>();

/** Default encoding worker threshold. */
static final double DEFAULT_ENCODING_THRESHOLD = 0.0;

/** Configuration key of the number of attempts before a service is put into the error state. */
static final String MAX_ATTEMPTS_CONFIG_KEY = "max.attempts";

/** Configuration key of the comma-separated service types that never enter the error state. */
static final String NO_ERROR_STATE_SERVICE_TYPES_CONFIG_KEY = "no.error.state.service.types";

/** Default number of attempts before a service is put into the error state. */
private static final int DEFAULT_MAX_ATTEMPTS_BEFORE_ERROR_STATE = 10;

/** Error states are enabled unless a negative max attempts value is configured. */
private static final boolean DEFAULT_ERROR_STATES_ENABLED = true;

/** Configured attempts before the error state; a negative value disables error states (see updated()). */
protected int maxAttemptsBeforeErrorState = DEFAULT_MAX_ATTEMPTS_BEFORE_ERROR_STATE;
private boolean errorStatesEnabled = DEFAULT_ERROR_STATES_ENABLED;

/** Service types that never enter the error state. */
private List<String> noErrorStateServiceTypes = new ArrayList<>();

/** Default heartbeat interval in seconds. */
static final long DEFAULT_HEART_BEAT = 60;

/** Job load used by the createJob overloads that take no explicit load. */
static final float DEFAULT_JOB_LOAD = 0.1f;

/** Base URL of this node (server URL property, or the default base URL). */
protected String hostName;

/** Optional node name of this host; null when not configured. */
protected String nodeName;

/** Base URL used when building job URIs; defaults to hostName. */
protected String jobHost;

/** Configured encoding worker hosts. NOTE(review): static, but written by the instance method updated(). */
protected static List<String> encodingWorkers = DEFAULT_ENCODING_WORKERS;

/** Configured encoding worker threshold. NOTE(review): static, but written by the instance method updated(). */
protected static double encodingThreshold = DEFAULT_ENCODING_THRESHOLD;

/** Entity manager factory for the persistence unit above. */
protected EntityManagerFactory emf = null;

/** Factory used in activate() to create the database session. */
protected DBSessionFactory dbSessionFactory;

/** Database session used for all persistence operations of this registry. */
protected DBSession db;

/** Executor running the job producer heartbeat; created in updated(). */
protected ScheduledExecutorService scheduledExecutor = null;

/** Supplies the current user and organization for newly created jobs. */
protected SecurityService securityService = null;

/** Incident service; NOTE(review): not used in the visible part of this class. */
protected IncidentService incidentService = null;

/** Incidents helper; NOTE(review): not used in the visible part of this class. */
protected Incidents incidents;

/** Whether job statistics are collected. */
protected boolean collectJobstats = DEFAULT_JOB_STATISTICS;

/** Maximum job age considered for service statistics. */
protected int maxJobAge = DEFAULT_SERVICE_STATISTICS_MAX_JOB_AGE;

/** Job statuses whose jobs are added to the local load cache; only RUNNING (see the static initializer). */
protected static final List<Status> JOB_STATUSES_INFLUENCING_LOAD_BALANCING;

/** All job statuses for which Status.isActive() returns true. */
private static final Status[] activeJobStatus =
Arrays.stream(Status.values()).filter(Status::isActive).collect(Collectors.toList()).toArray(new Status[0]);

/** Job ID to job load of jobs currently counted in localSystemLoad. NOTE(review): static and not thread-safe. */
protected static HashMap<Long, Float> jobCache = new HashMap<>();

static {
JOB_STATUSES_INFLUENCING_LOAD_BALANCING = new ArrayList<>();
JOB_STATUSES_INFLUENCING_LOAD_BALANCING.add(Status.RUNNING);
}

/** Whether job loads exceeding the maximum load are accepted; read in activate(). */
protected Boolean acceptJobLoadsExeedingMaxLoad = true;

/** Load accumulated from the jobs in the local job load cache; returned by getOwnLoad(). */
protected float localSystemLoad = 0.0f;
288
289
290 @Reference(target = "(osgi.unit.name=org.opencastproject.common)")
291 void setEntityManagerFactory(EntityManagerFactory emf) {
292 this.emf = emf;
293 }
294
295 @Reference
296 public void setDBSessionFactory(DBSessionFactory dbSessionFactory) {
297 this.dbSessionFactory = dbSessionFactory;
298 }
299
300 @Activate
301 public void activate(ComponentContext cc) {
302 logger.info("Activate service registry");
303
304 db = dbSessionFactory.createSession(emf);
305
306
307 if (cc == null || StringUtils.isBlank(cc.getBundleContext().getProperty(OpencastConstants.SERVER_URL_PROPERTY))) {
308 hostName = UrlSupport.DEFAULT_BASE_URL;
309 } else {
310 hostName = cc.getBundleContext().getProperty(OpencastConstants.SERVER_URL_PROPERTY);
311 }
312
313
314 if (hostName.endsWith("/")) {
315 logger.warn("The configured value of {} ends with '/'. This is very likely a configuration error which could "
316 + "lead to services not working properly. Note that this configuration should not contain any part of "
317 + "the service paths.", OpencastConstants.SERVER_URL_PROPERTY);
318 }
319
320
321 cleanUndispatchableJobs(hostName);
322
323
324 try {
325 List<ServiceStatistics> serviceStatistics = getServiceStatistics();
326 hostsStatistics = new HostsStatistics(serviceStatistics);
327 servicesStatistics = new ServicesStatistics(hostName, serviceStatistics);
328 jobsStatistics = new JobsStatistics(hostName);
329 jmxBeans.add(JmxUtil.registerMXBean(hostsStatistics, JMX_HOSTS_STATISTICS_TYPE));
330 jmxBeans.add(JmxUtil.registerMXBean(servicesStatistics, JMX_SERVICES_STATISTICS_TYPE));
331 jmxBeans.add(JmxUtil.registerMXBean(jobsStatistics, JMX_JOBS_STATISTICS_TYPE));
332 } catch (ServiceRegistryException e) {
333 logger.error("Error registering JMX statistic beans", e);
334 }
335
336
337 if (cc == null || StringUtils.isBlank(cc.getBundleContext().getProperty("org.opencastproject.jobs.url"))) {
338 jobHost = hostName;
339 } else {
340 jobHost = cc.getBundleContext().getProperty("org.opencastproject.jobs.url");
341 }
342
343
344 try {
345 if (cc == null || StringUtils.isBlank(cc.getBundleContext().getProperty(OpencastConstants.NODE_NAME_PROPERTY))) {
346 nodeName = null;
347 } else {
348 nodeName = cc.getBundleContext().getProperty(OpencastConstants.NODE_NAME_PROPERTY);
349 }
350
351 float maxLoad = Runtime.getRuntime().availableProcessors();
352 if (cc != null && StringUtils.isNotBlank(cc.getBundleContext().getProperty(OPT_MAXLOAD))) {
353 try {
354 maxLoad = Float.parseFloat(cc.getBundleContext().getProperty(OPT_MAXLOAD));
355 logger.info("Max load has been set manually to {}", maxLoad);
356 } catch (NumberFormatException e) {
357 logger.warn("Configuration key '{}' is not an integer. Falling back to the number of cores ({})",
358 OPT_MAXLOAD, maxLoad);
359 }
360 }
361
362 logger.info("Node maximum load set to {}", maxLoad);
363
364 String address = InetAddress.getByName(URI.create(hostName).getHost()).getHostAddress();
365 long maxMemory = Runtime.getRuntime().maxMemory();
366 int cores = Runtime.getRuntime().availableProcessors();
367
368 registerHost(hostName, address, nodeName, maxMemory, cores, maxLoad);
369 } catch (Exception e) {
370 throw new IllegalStateException("Unable to register host " + hostName + " in the service registry", e);
371 }
372
373
374 if (cc != null) {
375 acceptJobLoadsExeedingMaxLoad = getOptContextProperty(cc, ACCEPT_JOB_LOADS_EXCEEDING_PROPERTY)
376 .map(Boolean::valueOf)
377 .orElse(DEFAULT_ACCEPT_JOB_LOADS_EXCEEDING);
378 }
379
380 localSystemLoad = 0;
381 logger.info("Activated");
382 }
383
384 @Override
385 public float getOwnLoad() {
386 return localSystemLoad;
387 }
388
389 @Override
390 public String getRegistryHostname() {
391 return hostName;
392 }
393
394 @Deactivate
395 public void deactivate() {
396 logger.info("deactivate service registry");
397
398
399 if (scheduledExecutor != null) {
400 try {
401 scheduledExecutor.shutdownNow();
402 if (!scheduledExecutor.isShutdown()) {
403 logger.info("Waiting for Dispatcher to terminate");
404 scheduledExecutor.awaitTermination(10, TimeUnit.SECONDS);
405 }
406 } catch (InterruptedException e) {
407 logger.error("Error shutting down the Dispatcher", e);
408 }
409 }
410
411 for (ObjectInstance mbean : jmxBeans) {
412 JmxUtil.unregisterMXBean(mbean);
413 }
414
415 try {
416 unregisterHost(hostName);
417 } catch (ServiceRegistryException e) {
418 throw new IllegalStateException("Unable to unregister host " + hostName + " from the service registry", e);
419 }
420 }
421
422
423
424
425
426
427 @Override
428 public Job createJob(String type, String operation) throws ServiceRegistryException {
429 return createJob(this.hostName, type, operation, null, null, true, getCurrentJob(), DEFAULT_JOB_LOAD);
430 }
431
432
433
434
435
436
437
438 @Override
439 public Job createJob(String type, String operation, List<String> arguments) throws ServiceRegistryException {
440 return createJob(this.hostName, type, operation, arguments, null, true, getCurrentJob(), DEFAULT_JOB_LOAD);
441 }
442
443
444
445
446
447
448
449 @Override
450 public Job createJob(String type, String operation, List<String> arguments, Float jobLoad)
451 throws ServiceRegistryException {
452 return createJob(this.hostName, type, operation, arguments, null, true, getCurrentJob(), jobLoad);
453 }
454
455
456
457
458
459
460
461 @Override
462 public Job createJob(String type, String operation, List<String> arguments, String payload, boolean dispatchable)
463 throws ServiceRegistryException {
464 return createJob(this.hostName, type, operation, arguments, payload, dispatchable, getCurrentJob(),
465 DEFAULT_JOB_LOAD);
466 }
467
468
469
470
471
472
473
474 @Override
475 public Job createJob(String type, String operation, List<String> arguments, String payload, boolean dispatchable,
476 Float jobLoad) throws ServiceRegistryException {
477 return createJob(this.hostName, type, operation, arguments, payload, dispatchable, getCurrentJob(), jobLoad);
478 }
479
480 public Job createJob(String type, String operation, List<String> arguments, String payload, boolean dispatchable,
481 Job parentJob) throws ServiceRegistryException {
482 return createJob(this.hostName, type, operation, arguments, payload, dispatchable, parentJob, DEFAULT_JOB_LOAD);
483 }
484
485
486
487
488
489
490
491 @Override
492 public Job createJob(String type, String operation, List<String> arguments, String payload, boolean dispatchable,
493 Job parentJob, Float jobLoad) throws ServiceRegistryException {
494 return createJob(this.hostName, type, operation, arguments, payload, dispatchable, parentJob, jobLoad);
495 }
496
497
498
499
500 public Job createJob(String host, String serviceType, String operation, List<String> arguments, String payload,
501 boolean dispatchable, Job parentJob) throws ServiceRegistryException {
502 return createJob(host, serviceType, operation, arguments, payload, dispatchable, parentJob, DEFAULT_JOB_LOAD);
503 }
504
505
506
507
508 public Job createJob(String host, String serviceType, String operation, List<String> arguments, String payload,
509 boolean dispatchable, Job parentJob, float jobLoad) throws ServiceRegistryException {
510 if (StringUtils.isBlank(host)) {
511 throw new IllegalArgumentException("Host can't be null");
512 }
513 if (StringUtils.isBlank(serviceType)) {
514 throw new IllegalArgumentException("Service type can't be null");
515 }
516 if (StringUtils.isBlank(operation)) {
517 throw new IllegalArgumentException("Operation can't be null");
518 }
519
520 JpaJob jpaJob = db.execTxChecked(em -> {
521 ServiceRegistrationJpaImpl creatingService = getServiceRegistrationQuery(serviceType, host).apply(em)
522 .orElseThrow(() -> new ServiceRegistryException("No service registration exists for type '" + serviceType
523 + "' on host '" + host + "'"));
524
525 if (creatingService.getHostRegistration().isMaintenanceMode()) {
526 logger.warn("Creating a job from {}, which is currently in maintenance mode.", creatingService.getHost());
527 } else if (!creatingService.getHostRegistration().isActive()) {
528 logger.warn("Creating a job from {}, which is currently inactive.", creatingService.getHost());
529 }
530
531 User currentUser = securityService.getUser();
532 Organization currentOrganization = securityService.getOrganization();
533
534 JpaJob job = new JpaJob(currentUser, currentOrganization, creatingService, operation, arguments, payload,
535 dispatchable, jobLoad);
536
537
538 if (parentJob != null) {
539
540 JpaJob jpaParentJob = getJpaJobQuery(parentJob.getId()).apply(em).orElseThrow(() -> {
541 logger.error("job with id {} not found in the persistence context", parentJob);
542
543 removeFromLoadCache(parentJob.getId());
544 return new ServiceRegistryException(new NotFoundException());
545 });
546 job.setParentJob(jpaParentJob);
547
548
549 JpaJob jpaRootJob = jpaParentJob;
550 if (parentJob.getRootJobId() != null) {
551 jpaRootJob = getJpaJobQuery(parentJob.getRootJobId()).apply(em).orElseThrow(() -> {
552 logger.error("job with id {} not found in the persistence context", parentJob.getRootJobId());
553
554 removeFromLoadCache(parentJob.getRootJobId());
555 return new ServiceRegistryException(new NotFoundException());
556 });
557 }
558 job.setRootJob(jpaRootJob);
559 }
560
561
562 if (dispatchable) {
563 logger.trace("Queuing dispatchable '{}'", job);
564 job.setStatus(Status.QUEUED);
565 } else {
566 logger.trace("Giving new non-dispatchable '{}' its creating service as processor '{}'", job, creatingService);
567 job.setProcessorServiceRegistration(creatingService);
568 }
569
570 em.persist(job);
571 return job;
572 });
573
574 setJobUri(jpaJob);
575 return jpaJob.toJob();
576 }
577
578 @Override
579 public void removeJobs(List<Long> jobIds) throws NotFoundException, ServiceRegistryException {
580 for (long jobId: jobIds) {
581 if (jobId < 1) {
582 throw new NotFoundException("Job ID must be greater than zero (0)");
583 }
584 }
585
586 logger.debug("Start deleting jobs with IDs '{}'", jobIds);
587 try {
588 db.execTxChecked(em -> {
589 for (long jobId : jobIds) {
590 JpaJob job = em.find(JpaJob.class, jobId);
591 if (job == null) {
592 logger.error("Job with Id {} cannot be deleted: Not found.", jobId);
593 removeFromLoadCache(jobId);
594 throw new NotFoundException("Job with ID '" + jobId + "' not found");
595 }
596 deleteChildJobsQuery(jobId).accept(em);
597 em.remove(job);
598 removeFromLoadCache(jobId);
599 }
600 });
601 } catch (NotFoundException | ServiceRegistryException e) {
602 throw e;
603 } catch (Exception e) {
604 throw new ServiceRegistryException(e);
605 }
606
607 logger.info("Jobs with IDs '{}' deleted", jobIds);
608 }
609
610 private ThrowingConsumer<EntityManager, Exception> deleteChildJobsQuery(long jobId) {
611 return em -> {
612 List<Job> childJobs = getChildJobs(jobId);
613 if (childJobs.isEmpty()) {
614 logger.trace("No child jobs of job '{}' found to delete.", jobId);
615 return;
616 }
617
618 logger.debug("Start deleting child jobs of job '{}'", jobId);
619
620 try {
621 for (int i = childJobs.size() - 1; i >= 0; i--) {
622 Job job = childJobs.get(i);
623 JpaJob jobToDelete = em.find(JpaJob.class, job.getId());
624 em.remove(jobToDelete);
625 removeFromLoadCache(job.getId());
626 logger.debug("{} deleted", job);
627 }
628 logger.debug("Deleted all child jobs of job '{}'", jobId);
629 } catch (Exception e) {
630 throw new ServiceRegistryException("Unable to remove child jobs from " + jobId, e);
631 }
632 };
633 }
634
635 @Override
636 public void removeParentlessJobs(int lifetime) throws ServiceRegistryException {
637 int count = db.execTxChecked(em -> {
638 int c = 0;
639
640 List<Job> jobs = namedQuery.findAll("Job.withoutParent", JpaJob.class).apply(em).stream()
641 .map(JpaJob::toJob)
642 .filter(j -> j.getDateCreated().before(DateUtils.addDays(new Date(), -lifetime)))
643
644 .filter(j -> !START_OPERATION.equals(j.getOperation())
645 && !START_WORKFLOW.equals(j.getOperation())
646 && !RESUME.equals(j.getOperation()))
647 .filter(j -> j.getStatus().isTerminated())
648 .collect(Collectors.toList());
649
650 for (Job job : jobs) {
651 try {
652 removeJobs(Collections.singletonList(job.getId()));
653 logger.debug("Parentless '{}' removed", job);
654 c++;
655 } catch (NotFoundException e) {
656 logger.debug("Parentless '{} ' not found in database", job, e);
657 }
658 }
659
660 return c;
661 });
662
663
664 if (count > 0) {
665 logger.info("Successfully removed {} parentless jobs", count);
666 } else {
667 logger.trace("No parentless jobs found to remove");
668 }
669 }
670
671
672
673
674
675
676 @Override
677 public void updated(Dictionary properties) throws ConfigurationException {
678 logger.info("Updating service registry properties");
679
680 maxAttemptsBeforeErrorState = DEFAULT_MAX_ATTEMPTS_BEFORE_ERROR_STATE;
681 errorStatesEnabled = DEFAULT_ERROR_STATES_ENABLED;
682 String maxAttempts = StringUtils.trimToNull((String) properties.get(MAX_ATTEMPTS_CONFIG_KEY));
683 if (maxAttempts != null) {
684 try {
685 maxAttemptsBeforeErrorState = Integer.parseInt(maxAttempts);
686 if (maxAttemptsBeforeErrorState < 0) {
687 errorStatesEnabled = false;
688 logger.info("Error states of services disabled");
689 } else {
690 errorStatesEnabled = true;
691 logger.info("Set max attempts before error state to {}", maxAttempts);
692 }
693 } catch (NumberFormatException e) {
694 logger.warn("Can not set max attempts before error state to {}. {} must be an integer", maxAttempts,
695 MAX_ATTEMPTS_CONFIG_KEY);
696 }
697 }
698
699 noErrorStateServiceTypes = new ArrayList<>();
700 String noErrorStateServiceTypesStr = StringUtils.trimToNull((String) properties.get(
701 NO_ERROR_STATE_SERVICE_TYPES_CONFIG_KEY));
702 if (noErrorStateServiceTypesStr != null) {
703 noErrorStateServiceTypes = Arrays.asList(noErrorStateServiceTypesStr.split("\\s*,\\s*"));
704 if (!noErrorStateServiceTypes.isEmpty()) {
705 logger.info("Set service types without error state to {}", String.join(", ", noErrorStateServiceTypes));
706 }
707 }
708
709 long heartbeatInterval = DEFAULT_HEART_BEAT;
710 String heartbeatIntervalString = StringUtils.trimToNull((String) properties.get(OPT_HEARTBEATINTERVAL));
711 if (StringUtils.isNotBlank(heartbeatIntervalString)) {
712 try {
713 heartbeatInterval = Long.parseLong(heartbeatIntervalString);
714 } catch (Exception e) {
715 logger.warn("Heartbeat interval '{}' is malformed, setting to {}", heartbeatIntervalString, DEFAULT_HEART_BEAT);
716 heartbeatInterval = DEFAULT_HEART_BEAT;
717 }
718 if (heartbeatInterval == 0) {
719 logger.info("Heartbeat disabled");
720 } else if (heartbeatInterval < 0) {
721 logger.warn("Heartbeat interval {} seconds too low, adjusting to {}", heartbeatInterval, DEFAULT_HEART_BEAT);
722 heartbeatInterval = DEFAULT_HEART_BEAT;
723 } else {
724 logger.info("Heartbeat interval set to {} seconds", heartbeatInterval);
725 }
726 }
727
728 String jobStatsString = StringUtils.trimToNull((String) properties.get(OPT_JOBSTATISTICS));
729 if (StringUtils.isNotBlank(jobStatsString)) {
730 try {
731 collectJobstats = Boolean.parseBoolean(jobStatsString);
732 } catch (Exception e) {
733 logger.warn("Job statistics collection flag '{}' is malformed, setting to {}", jobStatsString,
734 DEFAULT_JOB_STATISTICS);
735 collectJobstats = DEFAULT_JOB_STATISTICS;
736 }
737 }
738
739
740 String encodingWorkersString = (String) properties.get(OPT_ENCODING_WORKERS);
741 if (StringUtils.isNotBlank(encodingWorkersString)) {
742 encodingWorkers = Arrays.asList(encodingWorkersString.split("\\s*,\\s*"));
743 } else {
744 encodingWorkers = DEFAULT_ENCODING_WORKERS;
745 }
746
747
748 String encodingThersholdString = StringUtils.trimToNull((String) properties.get(OPT_ENCODING_THRESHOLD));
749 if (StringUtils.isNotBlank(encodingThersholdString) && encodingThersholdString != null) {
750 try {
751 double encodingThresholdTmp = Double.parseDouble(encodingThersholdString);
752 if (encodingThresholdTmp >= 0 && encodingThresholdTmp <= 1) {
753 encodingThreshold = encodingThresholdTmp;
754 } else {
755 encodingThreshold = DEFAULT_ENCODING_THRESHOLD;
756 logger.warn("org.opencastproject.encoding.workers.threshold is not between 0 and 1");
757 }
758 } catch (NumberFormatException e) {
759 logger.warn("Can not set encoding threshold to {}. {} must be an parsable double", encodingThersholdString,
760 OPT_ENCODING_THRESHOLD);
761 }
762 } else {
763 encodingThreshold = DEFAULT_ENCODING_THRESHOLD;
764 }
765
766 String maxJobAgeString = StringUtils.trimToNull((String) properties.get(OPT_SERVICE_STATISTICS_MAX_JOB_AGE));
767 if (maxJobAgeString != null) {
768 try {
769 maxJobAge = Integer.parseInt(maxJobAgeString);
770 logger.info("Set service statistics max job age to {}", maxJobAgeString);
771 } catch (NumberFormatException e) {
772 logger.warn("Can not set service statistics max job age to {}. {} must be an integer", maxJobAgeString,
773 OPT_SERVICE_STATISTICS_MAX_JOB_AGE);
774 }
775 }
776
777 scheduledExecutor = Executors.newScheduledThreadPool(1);
778
779
780 if (heartbeatInterval > 0) {
781 logger.debug("Starting service heartbeat at a custom interval of {}s", heartbeatInterval);
782 scheduledExecutor.scheduleWithFixedDelay(new JobProducerHeartbeat(), heartbeatInterval, heartbeatInterval,
783 TimeUnit.SECONDS);
784 }
785 }
786
787
788
789
790
791
792 @Modified
793 public void modified(Map<String, Object> config) throws ConfigurationException {
794 logger.debug("Modified serviceregistry");
795 }
796
797 private Function<EntityManager, Optional<JpaJob>> getJpaJobQuery(long id) {
798 return em -> namedQuery.findByIdOpt(JpaJob.class, id)
799 .apply(em)
800 .map(jpaJob -> {
801
802
803 em.refresh(jpaJob);
804 setJobUri(jpaJob);
805 return jpaJob;
806 });
807 }
808
809 @Override
810 public Job getJob(long id) throws NotFoundException, ServiceRegistryException {
811 try {
812 return db.exec(getJpaJobQuery(id))
813 .map(JpaJob::toJob)
814 .orElseThrow(NotFoundException::new);
815 } catch (NotFoundException e) {
816 throw e;
817 } catch (Exception e) {
818 throw new ServiceRegistryException(e);
819 }
820 }
821
822
823
824
825
826
827 @Override
828 public Job getCurrentJob() {
829 return currentJob.get();
830 }
831
832
833
834
835
836
837 @Override
838 public void setCurrentJob(Job job) {
839 currentJob.set(job);
840 }
841
/**
 * Persists the state of the given job and updates the derived bookkeeping.
 *
 * <p>For non-workflow jobs with a positive load whose processor runs on this host, the local load cache is
 * adjusted. For non-workflow jobs whose status changed compared to the persisted state, the processing service's
 * failover state is updated.
 *
 * @param job
 *          the job carrying the new state
 * @return the job as returned by updateInternal
 * @throws ServiceRegistryException
 *           if the job does not exist (it is then also evicted from the load cache) or the update fails
 */
JpaJob updateJob(JpaJob job) throws ServiceRegistryException {
try {
// NOTE(review): the EntityManager passed to this lambda is not used; getJob() and updateInternal() each go
// through the DB session themselves. Confirm whether the surrounding session is needed.
return db.execChecked(em -> {
// Currently persisted state, used below to detect a status change
Job oldJob = getJob(job.getId());
JpaJob jpaJob = updateInternal(job);
// Only jobs processed on this host contribute to the local load
if (!TYPE_WORKFLOW.equals(job.getJobType()) && job.getJobLoad() > 0.0f
&& job.getProcessorServiceRegistration() != null
&& job.getProcessorServiceRegistration().getHost().equals(getRegistryHostname())) {
processCachedLoadChange(job);
}

// Status transitions of non-workflow jobs feed the service's warning/error state handling
if (oldJob.getStatus() != job.getStatus() && !TYPE_WORKFLOW.equals(job.getJobType())) {
updateServiceForFailover(job);
}

return jpaJob;
});
} catch (ServiceRegistryException e) {
throw e;
} catch (NotFoundException e) {
// The job no longer exists, so it must not keep contributing to the local load
removeFromLoadCache(job.getId());
throw new ServiceRegistryException(e);
} catch (Exception e) {
throw new ServiceRegistryException(e);
}
}
873
874 @Override
875 public Job updateJob(Job job) throws ServiceRegistryException {
876 JpaJob jpaJob = JpaJob.from(job);
877 jpaJob.setProcessorServiceRegistration(
878 (ServiceRegistrationJpaImpl) getServiceRegistration(job.getJobType(), job.getProcessingHost()));
879 return updateJob(jpaJob).toJob();
880 }
881
882
883
884
885
886
887
888 private synchronized void processCachedLoadChange(JpaJob job) {
889 if (JOB_STATUSES_INFLUENCING_LOAD_BALANCING.contains(job.getStatus()) && jobCache.get(job.getId()) == null) {
890 logger.debug("Adding to load cache: {}, type {}, load {}, status {}",
891 job, job.getJobType(), job.getJobLoad(), job.getStatus());
892 localSystemLoad += job.getJobLoad();
893 jobCache.put(job.getId(), job.getJobLoad());
894 } else if (jobCache.get(job.getId()) != null && Status.FINISHED.equals(job.getStatus())
895 || Status.FAILED.equals(job.getStatus()) || Status.WAITING.equals(job.getStatus())) {
896 logger.debug("Removing from load cache: {}, type {}, load {}, status {}",
897 job, job.getJobType(), job.getJobLoad(), job.getStatus());
898 localSystemLoad -= job.getJobLoad();
899 jobCache.remove(job.getId());
900 } else {
901 logger.debug("Ignoring for load cache: {}, type {}, status {}",
902 job, job.getJobType(), job.getStatus());
903 }
904 logger.debug("Current host load: {}, job load cache size: {}", format("%.1f", localSystemLoad), jobCache.size());
905
906 if (jobCache.isEmpty()) {
907 if (Math.abs(localSystemLoad) > 0.0000001F) {
908 logger.warn("No jobs in the job load cache, but load is {}: setting job load to 0", localSystemLoad);
909 }
910 localSystemLoad = 0.0F;
911 }
912 }
913
914 private synchronized void removeFromLoadCache(Long jobId) {
915 if (jobCache.get(jobId) != null) {
916 float jobLoad = jobCache.get(jobId);
917 logger.debug("Removing deleted job from load cache: Job {}, load {}", jobId, jobLoad);
918 localSystemLoad -= jobLoad;
919 jobCache.remove(jobId);
920 }
921 }
922
923 protected JpaJob setJobUri(JpaJob job) {
924 try {
925 job.setUri(new URI(jobHost + "/services/job/" + job.getId() + ".xml"));
926 } catch (URISyntaxException e) {
927 logger.warn("Can not set the job URI", e);
928 }
929 return job;
930 }
931
932
933
934
935
936
937
938
939 protected JpaJob updateInternal(JpaJob job) throws NotFoundException {
940 JpaJob fromDb = db.execTxChecked(em -> {
941 JpaJob j = em.find(JpaJob.class, job.getId());
942 if (j == null) {
943 throw new NotFoundException();
944 }
945
946 update(j, job);
947 em.merge(j);
948 return j;
949 });
950
951 job.setVersion(fromDb.toJob().getVersion());
952 setJobUri(job);
953 return job;
954 }
955
956 public void updateStatisticsJobData() {
957 jobsStatistics.updateAvg(db.exec(getAvgOperationsQuery()));
958 jobsStatistics.updateJobCount(db.exec(getCountPerHostServiceQuery()));
959 }
960
961
962
963
964
965
966
967
968 private ServiceRegistration updateServiceState(ServiceRegistrationJpaImpl registration) throws NotFoundException {
969 db.execTxChecked(em -> {
970 ServiceRegistrationJpaImpl fromDb = em.find(ServiceRegistrationJpaImpl.class, registration.getId());
971 if (fromDb == null) {
972 throw new NotFoundException();
973 }
974 fromDb.setServiceState(registration.getServiceState());
975 fromDb.setStateChanged(registration.getStateChanged());
976 fromDb.setWarningStateTrigger(registration.getWarningStateTrigger());
977 fromDb.setErrorStateTrigger(registration.getErrorStateTrigger());
978 });
979
980 servicesStatistics.updateService(registration);
981 return registration;
982 }
983
984
985
986
987
988
989
990
991
992
993 private void update(JpaJob fromDb, JpaJob jpaJob) {
994 final Job job = jpaJob.toJob();
995 final Date now = new Date();
996 final Status status = job.getStatus();
997 final Status fromDbStatus = fromDb.getStatus();
998
999 fromDb.setPayload(job.getPayload());
1000 fromDb.setStatus(job.getStatus());
1001 fromDb.setDispatchable(job.isDispatchable());
1002 fromDb.setVersion(job.getVersion());
1003 fromDb.setOperation(job.getOperation());
1004 fromDb.setArguments(job.getArguments());
1005
1006 if (job.getDateCreated() == null) {
1007 jpaJob.setDateCreated(now);
1008 fromDb.setDateCreated(now);
1009 job.setDateCreated(now);
1010 }
1011 if (job.getProcessingHost() != null) {
1012 ServiceRegistrationJpaImpl processingService = (ServiceRegistrationJpaImpl) getServiceRegistration(
1013 job.getJobType(), job.getProcessingHost());
1014 logger.debug("{} has host '{}': setting processor service to '{}'", job, job.getProcessingHost(),
1015 processingService);
1016 fromDb.setProcessorServiceRegistration(processingService);
1017 } else {
1018 logger.debug("Unsetting previous processor service registration for {}", job);
1019 fromDb.setProcessorServiceRegistration(null);
1020 }
1021 if (Status.RUNNING.equals(status) && !Status.WAITING.equals(fromDbStatus)) {
1022 if (job.getDateStarted() == null) {
1023 jpaJob.setDateStarted(now);
1024 jpaJob.setQueueTime(now.getTime() - job.getDateCreated().getTime());
1025 fromDb.setDateStarted(now);
1026 fromDb.setQueueTime(now.getTime() - job.getDateCreated().getTime());
1027 job.setDateStarted(now);
1028 job.setQueueTime(now.getTime() - job.getDateCreated().getTime());
1029 }
1030 } else if (Status.FAILED.equals(status)) {
1031
1032 if (job.getDateCompleted() == null) {
1033 fromDb.setDateCompleted(now);
1034 jpaJob.setDateCompleted(now);
1035 job.setDateCompleted(now);
1036 if (job.getDateStarted() != null) {
1037 jpaJob.setRunTime(now.getTime() - job.getDateStarted().getTime());
1038 fromDb.setRunTime(now.getTime() - job.getDateStarted().getTime());
1039 job.setRunTime(now.getTime() - job.getDateStarted().getTime());
1040 }
1041 }
1042 } else if (Status.FINISHED.equals(status)) {
1043 if (job.getDateStarted() == null) {
1044
1045
1046 jpaJob.setDateStarted(job.getDateCreated());
1047 job.setDateStarted(job.getDateCreated());
1048 }
1049 if (job.getDateCompleted() == null) {
1050 jpaJob.setDateCompleted(now);
1051 jpaJob.setRunTime(now.getTime() - job.getDateStarted().getTime());
1052 fromDb.setDateCompleted(now);
1053 fromDb.setRunTime(now.getTime() - job.getDateStarted().getTime());
1054 job.setDateCompleted(now);
1055 job.setRunTime(now.getTime() - job.getDateStarted().getTime());
1056 }
1057 }
1058 }
1059
1060
1061
1062
1063
1064
1065
1066
1067 protected Function<EntityManager, Optional<HostRegistrationJpaImpl>> fetchHostRegistrationQuery(String host) {
1068 return namedQuery.findOpt(
1069 "HostRegistration.byHostName",
1070 HostRegistrationJpaImpl.class,
1071 Pair.of("host", host)
1072 );
1073 }
1074
1075
1076
1077
1078
1079
1080 @Override
1081 public void registerHost(String host, String address, String nodeName, long memory, int cores, float maxLoad)
1082 throws ServiceRegistryException {
1083 try {
1084 HostRegistrationJpaImpl hostRegistration = db.execTxChecked(em -> {
1085
1086 Optional<HostRegistrationJpaImpl> hostRegistrationOpt = fetchHostRegistrationQuery(host).apply(em);
1087 HostRegistrationJpaImpl hr;
1088
1089 if (hostRegistrationOpt.isEmpty()) {
1090 hr = new HostRegistrationJpaImpl(host, address, nodeName, memory, cores, maxLoad, true, false);
1091 em.persist(hr);
1092 } else {
1093 hr = hostRegistrationOpt.get();
1094 hr.setIpAddress(address);
1095 hr.setNodeName(nodeName);
1096 hr.setMemory(memory);
1097 hr.setCores(cores);
1098 hr.setMaxLoad(maxLoad);
1099 hr.setOnline(true);
1100 em.merge(hr);
1101 }
1102 logger.info("Registering {} with a maximum load of {}", host, maxLoad);
1103 return hr;
1104 });
1105
1106 hostsStatistics.updateHost(hostRegistration);
1107 } catch (Exception e) {
1108 throw new ServiceRegistryException(e);
1109 }
1110 }
1111
1112
1113
1114
1115
1116
1117 @Override
1118 public void unregisterHost(String host) throws ServiceRegistryException {
1119 try {
1120 HostRegistrationJpaImpl existingHostRegistration = db.execTxChecked(em -> {
1121 HostRegistrationJpaImpl hr = fetchHostRegistrationQuery(host).apply(em).orElseThrow(
1122 () -> new IllegalArgumentException("Host '" + host + "' is not registered, so it can not be unregistered"));
1123
1124 hr.setOnline(false);
1125 for (ServiceRegistration serviceRegistration : getServiceRegistrationsByHost(host)) {
1126 unRegisterService(serviceRegistration.getServiceType(), serviceRegistration.getHost());
1127 }
1128 em.merge(hr);
1129
1130 logger.info("Unregistering {}", host);
1131 return hr;
1132 });
1133
1134 logger.info("Host {} unregistered", host);
1135 hostsStatistics.updateHost(existingHostRegistration);
1136 } catch (Exception e) {
1137 throw new ServiceRegistryException(e);
1138 }
1139 }
1140
1141
1142
1143
1144
1145
1146 @Override
1147 public void enableHost(String host) throws ServiceRegistryException, NotFoundException {
1148 try {
1149 HostRegistrationJpaImpl hostRegistration = db.execTxChecked(em -> {
1150
1151 HostRegistrationJpaImpl hr = fetchHostRegistrationQuery(host).apply(em).orElseThrow(
1152 () -> new NotFoundException("Host '" + host + "' is currently not registered, so it can not be enabled"));
1153 hr.setActive(true);
1154 em.merge(hr);
1155 logger.info("Enabling {}", host);
1156 return hr;
1157 });
1158
1159 db.execTxChecked(em -> {
1160 for (ServiceRegistration serviceRegistration : getServiceRegistrationsByHost(host)) {
1161 ((ServiceRegistrationJpaImpl) serviceRegistration).setActive(true);
1162 em.merge(serviceRegistration);
1163 servicesStatistics.updateService(serviceRegistration);
1164 }
1165 });
1166
1167 hostsStatistics.updateHost(hostRegistration);
1168 } catch (NotFoundException e) {
1169 throw e;
1170 } catch (Exception e) {
1171 throw new ServiceRegistryException(e);
1172 }
1173 }
1174
1175
1176
1177
1178
1179
1180 @Override
1181 public void disableHost(String host) throws ServiceRegistryException, NotFoundException {
1182 try {
1183 HostRegistrationJpaImpl hostRegistration = db.execTxChecked(em -> {
1184 HostRegistrationJpaImpl hr = fetchHostRegistrationQuery(host).apply(em).orElseThrow(
1185 () -> new NotFoundException("Host '" + host + "' is not currently registered, so it can not be disabled"));
1186
1187 hr.setActive(false);
1188 for (ServiceRegistration serviceRegistration : getServiceRegistrationsByHost(host)) {
1189 ((ServiceRegistrationJpaImpl) serviceRegistration).setActive(false);
1190 em.merge(serviceRegistration);
1191 servicesStatistics.updateService(serviceRegistration);
1192 }
1193 em.merge(hr);
1194
1195 logger.info("Disabling {}", host);
1196 return hr;
1197 });
1198
1199 hostsStatistics.updateHost(hostRegistration);
1200 } catch (NotFoundException e) {
1201 throw e;
1202 } catch (Exception e) {
1203 throw new ServiceRegistryException(e);
1204 }
1205 }
1206
1207
1208
1209
1210
1211
1212
1213 @Override
1214 public ServiceRegistration registerService(String serviceType, String baseUrl, String path)
1215 throws ServiceRegistryException {
1216 return registerService(serviceType, baseUrl, path, false);
1217 }
1218
1219
1220
1221
1222
1223
1224
1225 @Override
1226 public ServiceRegistration registerService(String serviceType, String baseUrl, String path, boolean jobProducer)
1227 throws ServiceRegistryException {
1228 cleanRunningJobs(serviceType, baseUrl);
1229 return setOnlineStatus(serviceType, baseUrl, path, true, jobProducer);
1230 }
1231
1232 protected Function<EntityManager, Optional<ServiceRegistrationJpaImpl>> getServiceRegistrationQuery(
1233 String serviceType, String host) {
1234 return namedQuery.findOpt(
1235 "ServiceRegistration.getRegistration",
1236 ServiceRegistrationJpaImpl.class,
1237 Pair.of("serviceType", serviceType),
1238 Pair.of("host", host)
1239 );
1240 }
1241
1242
1243
1244
1245
1246
1247
1248
1249
1250
1251
1252
1253
1254
/**
 * Creates or updates the registration of {@code serviceType} on host {@code baseUrl}, then refreshes the host
 * and service statistics.
 * <p>
 * If no registration exists yet, a new one is persisted. This requires a non-blank {@code path}, and
 * {@code jobProducer} defaults to {@code false} when {@code null}. The {@code online} argument is only applied
 * to an existing registration; a new one gets whatever online state the {@code ServiceRegistrationJpaImpl}
 * constructor sets. For an existing registration, {@code path} is only overwritten when not blank, and
 * {@code jobProducer} only when not {@code null}.
 *
 * @param serviceType the service type, must not be blank
 * @param baseUrl base URL of a registered host, must not be blank
 * @param path the service path; required for new registrations, optional otherwise
 * @param online the online status to set on an existing registration
 * @param jobProducer whether the service produces jobs, or {@code null} to keep/default the value
 * @return the created or updated registration
 * @throws IllegalArgumentException if {@code serviceType} or {@code baseUrl} is blank
 * @throws ServiceRegistryException if the host is not registered, {@code path} is blank for a new
 *         registration, or persisting fails
 */
protected ServiceRegistration setOnlineStatus(String serviceType, String baseUrl, String path, boolean online,
    Boolean jobProducer) throws ServiceRegistryException {
  if (isBlank(serviceType) || isBlank(baseUrl)) {
    logger.info("Uninformed baseUrl '{}' or service '{}' (path '{}')", baseUrl, serviceType, path);
    throw new IllegalArgumentException("serviceType and baseUrl must not be blank");
  }

  try {
    // The transaction callback returns nothing; both entities it produces are passed out through these holders
    AtomicReference<HostRegistrationJpaImpl> hostRegistration = new AtomicReference<>();
    AtomicReference<ServiceRegistrationJpaImpl> registration = new AtomicReference<>();

    db.execTxChecked(em -> {
      HostRegistrationJpaImpl hr = fetchHostRegistrationQuery(baseUrl).apply(em).orElseThrow(() -> {
        logger.info("No associated host registration for '{}' or service '{}' (path '{}')", baseUrl, serviceType,
            path);
        return new IllegalStateException(
            "A service registration can not be updated when it has no associated host registration");
      });
      hostRegistration.set(hr);

      ServiceRegistrationJpaImpl sr;
      Optional<ServiceRegistrationJpaImpl> srOpt = getServiceRegistrationQuery(serviceType, baseUrl).apply(em);
      if (srOpt.isEmpty()) {
        if (isBlank(path)) {
          // A new registration cannot be created without a path
          throw new IllegalArgumentException("path must not be blank when registering new services");
        }

        // New registration: an unspecified jobProducer flag defaults to false
        sr = new ServiceRegistrationJpaImpl(hr, serviceType, path, Objects.requireNonNullElse(jobProducer, false));
        em.persist(sr);
      } else {
        sr = srOpt.get();
        // Keep the stored path unless a new one is given
        if (StringUtils.isNotBlank(path)) {
          sr.setPath(path);
        }
        sr.setOnline(online);
        // Keep the stored jobProducer flag unless a new one is given
        if (jobProducer != null) {
          sr.setJobProducer(jobProducer);
        }
        em.merge(sr);
      }
      registration.set(sr);
    });

    hostsStatistics.updateHost(hostRegistration.get());
    servicesStatistics.updateService(registration.get());
    return registration.get();
  } catch (Exception e) {
    throw new ServiceRegistryException(e);
  }
}
1307
1308
1309
1310
1311
1312
1313 @Override
1314 public void unRegisterService(String serviceType, String baseUrl) throws ServiceRegistryException {
1315 logger.info("Unregistering Service {}@{} and cleaning its running jobs", serviceType, baseUrl);
1316
1317
1318 setOnlineStatus(serviceType, baseUrl, null, false, null);
1319 cleanRunningJobs(serviceType, baseUrl);
1320 }
1321
1322
1323
1324
1325 private void cleanUndispatchableJobs(String hostName) {
1326 logger.debug("Starting check for undispatchable jobs for host {}", hostName);
1327
1328 try {
1329 db.execTxChecked(em -> {
1330 List<JpaJob> undispatchableJobs = namedQuery.findAll(
1331 "Job.undispatchable.status",
1332 JpaJob.class,
1333 Pair.of("statuses", List.of(
1334 Status.INSTANTIATED.ordinal(),
1335 Status.RUNNING.ordinal()
1336 ))
1337 ).apply(em);
1338
1339 if (undispatchableJobs.size() > 0) {
1340 logger.info("Found {} undispatchable jobs on host {}", undispatchableJobs.size(), hostName);
1341 }
1342
1343 for (JpaJob job : undispatchableJobs) {
1344
1345 String jobHost = "";
1346 if (job.getProcessorServiceRegistration() != null) {
1347 jobHost = job.getProcessorServiceRegistration().getHost();
1348 }
1349
1350 if (!jobHost.equals(hostName)) {
1351 logger.debug("Will not cancel undispatchable job {} for host {}, it is running on a different host ({})",
1352 job, hostName, jobHost);
1353 continue;
1354 }
1355
1356 logger.info("Cancelling the running undispatchable job {}, it was orphaned on this host ({})", job, hostName);
1357 job.setStatus(Status.CANCELLED);
1358 em.merge(job);
1359 }
1360 });
1361 } catch (Exception e) {
1362 logger.error("Unable to clean undispatchable jobs for host {}! {}", hostName, e.getMessage());
1363 }
1364 }
1365
1366
1367
1368
1369
1370
1371
1372
1373
1374
1375
/**
 * Cleans up jobs of service {@code serviceType} on host {@code baseUrl} that are RUNNING, DISPATCHING or
 * WAITING. Both {@code registerService} and {@code unRegisterService} call this. The matching rows are read with
 * a pessimistic write lock.
 * <ul>
 * <li>Dispatchable jobs are refreshed first and skipped if they are already CANCELLED or RESTART.</li>
 * <li>If the job's root job is PAUSED, the root job's children are cancelled, and the root job is set to RESTART
 * with the start operation.</li>
 * <li>Otherwise the job's children are cancelled, and the job is set to RESTART without a processor service.</li>
 * <li>Non-dispatchable jobs are marked FAILED.</li>
 * </ul>
 *
 * @param serviceType the service type whose jobs are cleaned
 * @param baseUrl base URL of the host the jobs were processed on
 * @throws ServiceRegistryException wrapping any failure
 */
private void cleanRunningJobs(String serviceType, String baseUrl) throws ServiceRegistryException {
  try {
    db.execTxChecked(em -> {
      TypedQuery<JpaJob> query = em.createNamedQuery("Job.processinghost.status", JpaJob.class)
          .setLockMode(LockModeType.PESSIMISTIC_WRITE)
          .setParameter("statuses", List.of(
              Status.RUNNING.ordinal(),
              Status.DISPATCHING.ordinal(),
              Status.WAITING.ordinal()
          ))
          .setParameter("host", baseUrl)
          .setParameter("serviceType", serviceType);

      List<JpaJob> unregisteredJobs = query.getResultList();
      if (unregisteredJobs.size() > 0) {
        logger.info("Found {} jobs to clean for {}@{}", unregisteredJobs.size(), serviceType, baseUrl);
      }

      for (JpaJob job : unregisteredJobs) {
        if (job.isDispatchable()) {
          // Re-read the job so the status checks below see the current database state
          em.refresh(job);

          // Already handled elsewhere; nothing to do
          if (Status.CANCELLED.equals(job.getStatus()) || Status.RESTART.equals(job.getStatus())) {
            continue;
          }

          // Paused root job: restart the whole job tree from its start operation instead of just this job
          if (job.getRootJob() != null && Status.PAUSED.equals(job.getRootJob().getStatus())) {
            JpaJob rootJob = job.getRootJob();
            cancelAllChildrenQuery(rootJob).accept(em);
            rootJob.setStatus(Status.RESTART);
            rootJob.setOperation(START_OPERATION);
            em.merge(rootJob);
            continue;
          }

          logger.info("Marking child jobs from {} as canceled", job);
          cancelAllChildrenQuery(job).accept(em);

          // Detach the job from the lost service so it can be dispatched again
          logger.info("Rescheduling lost {}", job);
          job.setStatus(Status.RESTART);
          job.setProcessorServiceRegistration(null);
        } else {
          logger.info("Marking lost {} as failed", job);
          job.setStatus(Status.FAILED);
        }

        em.merge(job);
      }
    });
  } catch (Exception e) {
    throw new ServiceRegistryException(e);
  }
}
1429
1430
1431
1432
1433
1434
1435
1436 private Consumer<EntityManager> cancelAllChildrenQuery(JpaJob job) {
1437 return em -> job.getChildJobs().stream()
1438 .peek(em::refresh)
1439 .filter(child -> Status.CANCELLED.equals(child.getStatus()))
1440 .forEach(child -> {
1441 cancelAllChildrenQuery(child).accept(em);
1442 child.setStatus(Status.CANCELLED);
1443 em.merge(child);
1444 });
1445 }
1446
1447
1448
1449
1450
1451
1452 @Override
1453 public void setMaintenanceStatus(String baseUrl, boolean maintenance) throws NotFoundException {
1454 logger.info("Setting maintenance mode on host '{}'", baseUrl);
1455 HostRegistrationJpaImpl reg = db.execTxChecked(em -> {
1456 HostRegistrationJpaImpl hr = fetchHostRegistrationQuery(baseUrl).apply(em).orElseThrow(() -> {
1457 logger.warn("Can not set maintenance mode because host '{}' was not registered", baseUrl);
1458 return new NotFoundException("Can not set maintenance mode on a host that has not been registered");
1459 });
1460 hr.setMaintenanceMode(maintenance);
1461 em.merge(hr);
1462 return hr;
1463 });
1464
1465 hostsStatistics.updateHost(reg);
1466 logger.info("Finished setting maintenance mode on host '{}'", baseUrl);
1467 }
1468
1469
1470
1471
1472
1473
1474 @Override
1475 public List<ServiceRegistration> getServiceRegistrations() {
1476 return db.exec(getServiceRegistrationsQuery());
1477 }
1478
1479 @Override
1480 public Incidents incident() {
1481 return incidents;
1482 }
1483
1484 private List<ServiceRegistration> getOnlineServiceRegistrations() {
1485 return db.exec(namedQuery.findAll("ServiceRegistration.getAllOnline", ServiceRegistration.class));
1486 }
1487
1488
1489
1490
1491
1492
1493 protected Function<EntityManager, List<ServiceRegistration>> getServiceRegistrationsQuery() {
1494 return namedQuery.findAll("ServiceRegistration.getAll", ServiceRegistration.class);
1495 }
1496
1497
1498
1499
1500
1501
1502 @Override
1503 public List<HostRegistration> getHostRegistrations() {
1504 return db.exec(getHostRegistrationsQuery());
1505 }
1506
1507 @Override
1508 public HostStatistics getHostStatistics() {
1509 HostStatistics statistics = new HostStatistics();
1510
1511 db.exec(namedQuery.findAll(
1512 "HostRegistration.jobStatistics",
1513 Object[].class,
1514 Pair.of("status", List.of(Status.QUEUED.ordinal(), Status.RUNNING.ordinal()))
1515 )).forEach(row -> {
1516 final long host = ((Number) row[0]).longValue();
1517 final int status = ((Number) row[1]).intValue();
1518 final long count = ((Number) row[2]).longValue();
1519
1520 if (status == Status.RUNNING.ordinal()) {
1521 statistics.addRunning(host, count);
1522 } else {
1523 statistics.addQueued(host, count);
1524 }
1525 });
1526
1527 return statistics;
1528 }
1529
1530
1531
1532
1533
1534
1535 protected Function<EntityManager, List<HostRegistration>> getHostRegistrationsQuery() {
1536 return namedQuery.findAll("HostRegistration.getAll", HostRegistration.class);
1537 }
1538
1539 @Override
1540 public HostRegistration getHostRegistration(String hostname) throws ServiceRegistryException {
1541 return db.exec(getHostRegistrationQuery(hostname));
1542 }
1543
1544 protected Function<EntityManager, HostRegistration> getHostRegistrationQuery(String hostname) {
1545 return namedQuery.find(
1546 "HostRegistration.byHostName",
1547 HostRegistration.class,
1548 Pair.of("host", hostname)
1549 );
1550 }
1551
1552
1553
1554
1555
1556
1557 @Override
1558 public List<Job> getChildJobs(long id) throws ServiceRegistryException {
1559 try {
1560 List<JpaJob> jobs = db.exec(namedQuery.findAll(
1561 "Job.root.children",
1562 JpaJob.class,
1563 Pair.of("id", id)
1564 ));
1565
1566 if (jobs.size() == 0) {
1567 jobs = db.exec(getChildrenQuery(id));
1568 }
1569
1570 return jobs.stream()
1571 .map(this::setJobUri)
1572 .map(JpaJob::toJob)
1573 .collect(Collectors.toList());
1574 } catch (Exception e) {
1575 throw new ServiceRegistryException(e);
1576 }
1577 }
1578
1579 private Function<EntityManager, List<JpaJob>> getChildrenQuery(long id) {
1580 return em -> {
1581 TypedQuery<JpaJob> query = em
1582 .createNamedQuery("Job.children", JpaJob.class)
1583 .setParameter("id", id);
1584
1585 List<JpaJob> childJobs = query.getResultList();
1586
1587 List<JpaJob> result = new ArrayList<>(childJobs);
1588 childJobs.stream()
1589 .map(j -> getChildrenQuery(j.getId()).apply(em))
1590 .forEach(result::addAll);
1591
1592 return result;
1593 };
1594 }
1595
1596
1597
1598
1599
1600
1601 @Override
1602 public List<Job> getJobs(String type, Status status) throws ServiceRegistryException {
1603 logger.trace("Getting jobs '{}' and '{}'", type, status);
1604
1605 Function<EntityManager, List<JpaJob>> jobsQuery;
1606 if (type == null && status == null) {
1607 jobsQuery = namedQuery.findAll("Job.all", JpaJob.class);
1608 } else if (type == null) {
1609 jobsQuery = namedQuery.findAll(
1610 "Job.status",
1611 JpaJob.class,
1612 Pair.of("status", status.ordinal())
1613 );
1614 } else if (status == null) {
1615 jobsQuery = namedQuery.findAll(
1616 "Job.type",
1617 JpaJob.class,
1618 Pair.of("serviceType", type)
1619 );
1620 } else {
1621 jobsQuery = namedQuery.findAll(
1622 "Job",
1623 JpaJob.class,
1624 Pair.of("status", status.ordinal()),
1625 Pair.of("serviceType", type)
1626 );
1627 }
1628
1629 try {
1630 return db.exec(jobsQuery).stream()
1631 .peek(this::setJobUri)
1632 .map(JpaJob::toJob)
1633 .collect(Collectors.toList());
1634 } catch (Exception e) {
1635 throw new ServiceRegistryException(e);
1636 }
1637 }
1638
1639 @Override
1640 public List<String> getJobPayloads(String operation) throws ServiceRegistryException {
1641 try {
1642 return db.exec(namedQuery.findAll(
1643 "Job.payload",
1644 String.class,
1645 Pair.of("operation", operation)
1646 ));
1647 } catch (Exception e) {
1648 throw new ServiceRegistryException(e);
1649 }
1650 }
1651
1652 @Override
1653 public List<String> getJobPayloads(String operation, int limit, int offset) throws ServiceRegistryException {
1654 try {
1655 return db.exec(em -> {
1656 return em.createNamedQuery("Job.payload", String.class)
1657 .setParameter("operation", operation)
1658 .setMaxResults(limit)
1659 .setFirstResult(offset)
1660 .getResultList();
1661 });
1662 } catch (Exception e) {
1663 throw new ServiceRegistryException(e);
1664 }
1665 }
1666
1667 @Override
1668 public int getJobCount(final String operation) throws ServiceRegistryException {
1669 try {
1670 return db.exec(namedQuery.find(
1671 "Job.countByOperationOnly",
1672 Number.class,
1673 Pair.of("operation", operation)
1674 )).intValue();
1675 } catch (Exception e) {
1676 throw new ServiceRegistryException(e);
1677 }
1678 }
1679
1680
1681
1682
1683
1684
1685 @Override
1686 public List<Job> getActiveJobs() throws ServiceRegistryException {
1687 try {
1688 return db.exec(getJobsByStatusQuery(activeJobStatus)).stream()
1689 .map(JpaJob::toJob)
1690 .collect(Collectors.toList());
1691 } catch (Exception e) {
1692 throw new ServiceRegistryException(e);
1693 }
1694 }
1695
1696
1697
1698
1699
1700
1701
1702
1703 public Function<EntityManager, List<JpaJob>> getJobsByStatusQuery(Status... statuses) {
1704 if (statuses == null || statuses.length < 1) {
1705 throw new IllegalArgumentException("At least one job status must be given.");
1706 }
1707
1708 return namedQuery.findAll(
1709 "Job.statuses",
1710 JpaJob.class,
1711 Pair.of("statuses", Arrays.stream(statuses).map(Enum::ordinal).collect(Collectors.toList()))
1712 ).andThen(jobs -> jobs.stream()
1713 .peek(this::setJobUri)
1714 .collect(Collectors.toList()));
1715 }
1716
1717 @Override
1718 public Map<String, Map<String, Long>> countActiveByOrganizationAndHost() {
1719 var rows = db.exec(namedQuery.findAll(
1720 "Job.countByOrganizationAndHost",
1721 Object[].class,
1722 Pair.of("statuses", Arrays.stream(activeJobStatus).map(Enum::ordinal).collect(Collectors.toList()))
1723 )).stream().collect(Collectors.toList());
1724 var orgMap = new HashMap<String, Map<String, Long>>();
1725 for (Object[] row: rows) {
1726 var org = (String) row[0];
1727 var host = (String) row[1];
1728 var count = (Long) row[2];
1729 orgMap.computeIfAbsent(org, o -> new HashMap<>()).put(host, count);
1730 }
1731 return orgMap;
1732 }
1733
1734 @Override
1735 public Map<String, Long> countActiveTypeByOrganization(final String operation) {
1736 return db.exec(namedQuery.findAll(
1737 "Job.countTypeByOrganization",
1738 Object[].class,
1739 Pair.of("statuses", Arrays.stream(activeJobStatus).map(Enum::ordinal).collect(Collectors.toList())),
1740 Pair.of("operation", operation)
1741 )).stream().collect(Collectors.toMap(
1742 row -> (String) row[0],
1743 row -> (Long) row[1]
1744 ));
1745 }
1746
1747
1748
1749
1750
1751
1752
1753
1754
1755 protected Function<EntityManager, List<JpaJob>> getDispatchableJobsWithStatusQuery(int offset, int limit,
1756 Status... statuses) {
1757 return em -> {
1758 if (statuses == null) {
1759 return Collections.emptyList();
1760 }
1761
1762 TypedQuery<JpaJob> query = em
1763 .createNamedQuery("Job.dispatchable.status", JpaJob.class)
1764 .setParameter("statuses", Arrays.stream(statuses).map(Enum::ordinal).collect(Collectors.toList()));
1765 if (offset > 0) {
1766 query.setFirstResult(offset);
1767 }
1768 if (limit > 0) {
1769 query.setMaxResults(limit);
1770 }
1771 return query.getResultList();
1772 };
1773 }
1774
1775 Function<EntityManager, List<Object[]>> getAvgOperationsQuery() {
1776 return namedQuery.findAll("Job.avgOperation", Object[].class);
1777 }
1778
1779 Function<EntityManager, List<Object[]>> getCountPerHostServiceQuery() {
1780 return namedQuery.findAll("Job.countPerHostService", Object[].class);
1781 }
1782
1783
1784
1785
1786
1787
1788 @Override
1789 public long count(String serviceType, Status status) throws ServiceRegistryException {
1790 Function<EntityManager, Number> countQuery;
1791 if (serviceType == null && status == null) {
1792 countQuery = namedQuery.find(
1793 "Job.count.all",
1794 Number.class
1795 );
1796 } else if (serviceType == null) {
1797 countQuery = namedQuery.find(
1798 "Job.count.nullType",
1799 Number.class,
1800 Pair.of("status", status.ordinal())
1801 );
1802 } else if (status == null) {
1803 countQuery = namedQuery.find(
1804 "Job.count.nullStatus",
1805 Number.class,
1806 Pair.of("serviceType", serviceType)
1807 );
1808 } else {
1809 countQuery = namedQuery.find(
1810 "Job.count",
1811 Number.class,
1812 Pair.of("status", status.ordinal()),
1813 Pair.of("serviceType", serviceType)
1814 );
1815 }
1816
1817 try {
1818 return db.exec(countQuery).longValue();
1819 } catch (Exception e) {
1820 throw new ServiceRegistryException(e);
1821 }
1822 }
1823
1824
1825
1826
1827
1828
1829
1830 @Override
1831 public long countByHost(String serviceType, String host, Status status) throws ServiceRegistryException {
1832 Function<EntityManager, Number> countQuery;
1833 if (serviceType != null && !serviceType.isEmpty()) {
1834 countQuery = namedQuery.find(
1835 "Job.countByHost",
1836 Number.class,
1837 Pair.of("serviceType", serviceType),
1838 Pair.of("status", status.ordinal()),
1839 Pair.of("host", host)
1840 );
1841 } else {
1842 countQuery = namedQuery.find(
1843 "Job.countByHost.nullType",
1844 Number.class,
1845 Pair.of("status", status.ordinal()),
1846 Pair.of("host", host)
1847 );
1848 }
1849
1850 try {
1851 return db.exec(countQuery).longValue();
1852 } catch (Exception e) {
1853 throw new ServiceRegistryException(e);
1854 }
1855 }
1856
1857
1858
1859
1860
1861
1862
1863
1864 @Override
1865 public long countByOperation(String serviceType, String operation, Status status) throws ServiceRegistryException {
1866 try {
1867 return db.exec(namedQuery.find(
1868 "Job.countByOperation",
1869 Number.class,
1870 Pair.of("status", status.ordinal()),
1871 Pair.of("serviceType", serviceType),
1872 Pair.of("operation", operation)
1873 )).longValue();
1874 } catch (Exception e) {
1875 throw new ServiceRegistryException(e);
1876 }
1877 }
1878
1879
1880
1881
1882
1883
1884
1885 @Override
1886 public long count(String serviceType, String host, String operation, Status status) throws ServiceRegistryException {
1887 if (StringUtils.isBlank(serviceType) || StringUtils.isBlank(host) || StringUtils.isBlank(operation)
1888 || status == null) {
1889 throw new IllegalArgumentException("service type, host, operation, and status must be provided");
1890 }
1891
1892 try {
1893 return db.exec(namedQuery.find(
1894 "Job.fullMonty",
1895 Number.class,
1896 Pair.of("status", status.ordinal()),
1897 Pair.of("serviceType", serviceType),
1898 Pair.of("operation", operation)
1899 )).longValue();
1900 } catch (Exception e) {
1901 throw new ServiceRegistryException(e);
1902 }
1903 }
1904
1905
1906
1907
1908
1909
1910 @Override
1911 public List<ServiceStatistics> getServiceStatistics() throws ServiceRegistryException {
1912 Date now = new Date();
1913 try {
1914 return db.exec(getServiceStatisticsQuery(
1915 DateUtils.addDays(now, -maxJobAge),
1916 DateUtils.addDays(now, 1)
1917 ));
1918 } catch (Exception e) {
1919 throw new ServiceRegistryException(e);
1920 }
1921 }
1922
1923
1924
1925
1926
1927
1928
1929
1930
1931
1932
1933 private Function<EntityManager, List<ServiceStatistics>> getServiceStatisticsQuery(Date startDate, Date endDate) {
1934 return em -> {
1935 Map<Long, JaxbServiceStatistics> statsMap = new HashMap<>();
1936
1937
1938 namedQuery.findAll("ServiceRegistration.getAll", ServiceRegistrationJpaImpl.class).apply(em).forEach(s ->
1939 statsMap.put(s.getId(), new JaxbServiceStatistics(s))
1940 );
1941
1942 if (collectJobstats) {
1943
1944 namedQuery.findAll(
1945 "ServiceRegistration.statistics",
1946 Object[].class,
1947 Pair.of("minDateCreated", startDate),
1948 Pair.of("maxDateCreated", endDate)
1949 ).apply(em).forEach(row -> {
1950 Number serviceRegistrationId = (Number) row[0];
1951 if (serviceRegistrationId == null || serviceRegistrationId.longValue() == 0) {
1952 return;
1953 }
1954 Status status = Status.values()[((Number) row[1]).intValue()];
1955 Number count = (Number) row[2];
1956 Number meanQueueTime = (Number) row[3];
1957 Number meanRunTime = (Number) row[4];
1958
1959
1960 JaxbServiceStatistics stats = statsMap.get(serviceRegistrationId.longValue());
1961 if (stats == null) {
1962 return;
1963 }
1964
1965
1966 if (status != null) {
1967 switch (status) {
1968 case RUNNING:
1969 stats.setRunningJobs(count.intValue());
1970 break;
1971 case QUEUED:
1972 case DISPATCHING:
1973 stats.setQueuedJobs(count.intValue());
1974 break;
1975 case FINISHED:
1976 stats.setMeanRunTime(meanRunTime.longValue());
1977 stats.setMeanQueueTime(meanQueueTime.longValue());
1978 stats.setFinishedJobs(count.intValue());
1979 break;
1980 default:
1981 break;
1982 }
1983 }
1984 });
1985 }
1986
1987 List<ServiceStatistics> stats = new ArrayList<>(statsMap.values());
1988 stats.sort((o1, o2) -> {
1989 ServiceRegistration reg1 = o1.getServiceRegistration();
1990 ServiceRegistration reg2 = o2.getServiceRegistration();
1991 int typeComparison = reg1.getServiceType().compareTo(reg2.getServiceType());
1992 return typeComparison == 0
1993 ? reg1.getHost().compareTo(reg2.getHost())
1994 : typeComparison;
1995 });
1996
1997 return stats;
1998 };
1999 }
2000
2001
2002
2003
2004
2005
2006
2007
2008
2009 @Override
2010 public List<ServiceRegistration> getServiceRegistrationsByLoad(String serviceType) throws ServiceRegistryException {
2011 SystemLoad loadByHost = getCurrentHostLoads();
2012 List<HostRegistration> hostRegistrations = getHostRegistrations();
2013 List<ServiceRegistration> serviceRegistrations = getServiceRegistrationsByType(serviceType);
2014 return getServiceRegistrationsByLoad(serviceType, serviceRegistrations, hostRegistrations, loadByHost);
2015 }
2016
2017
2018
2019
2020
2021
2022 @Override
2023 public SystemLoad getCurrentHostLoads() {
2024 return db.exec(getHostLoadsQuery());
2025 }
2026
2027
2028
2029
2030
2031
2032 Function<EntityManager, SystemLoad> getHostLoadsQuery() {
2033 return em -> {
2034 final SystemLoad systemLoad = new SystemLoad();
2035
2036
2037 List<Integer> statuses = JOB_STATUSES_INFLUENCING_LOAD_BALANCING.stream()
2038 .map(Enum::ordinal)
2039 .collect(Collectors.toList());
2040 List<Object[]> rows = namedQuery.findAll(
2041 "ServiceRegistration.hostloads",
2042 Object[].class,
2043 Pair.of("statuses", statuses),
2044
2045
2046 Pair.of("workflow_type", TYPE_WORKFLOW)
2047 ).apply(em);
2048
2049
2050 for (Object[] row : rows) {
2051 String host = String.valueOf(row[0]);
2052 Status status = Status.values()[(int) row[1]];
2053 float currentLoad = ((Number) row[2]).floatValue();
2054 float maxLoad = ((Number) row[3]).floatValue();
2055
2056
2057 if (status == null || !JOB_STATUSES_INFLUENCING_LOAD_BALANCING.contains(status)) {
2058 currentLoad = 0.0f;
2059 }
2060
2061 NodeLoad serviceLoad = new NodeLoad(host, currentLoad, maxLoad);
2062 systemLoad.addNodeLoad(serviceLoad);
2063 }
2064
2065
2066 getHostRegistrationsQuery().apply(em).stream()
2067 .filter(h -> !systemLoad.containsHost(h.getBaseUrl()))
2068 .forEach(h -> systemLoad.addNodeLoad(new NodeLoad(h.getBaseUrl(), 0.0f, h.getMaxLoad())));
2069 return systemLoad;
2070 };
2071 }
2072
2073
2074
2075
2076
2077
2078 @Override
2079 public List<ServiceRegistration> getServiceRegistrationsByType(String serviceType) throws ServiceRegistryException {
2080 return db.exec(namedQuery.findAll(
2081 "ServiceRegistration.getByType",
2082 ServiceRegistration.class,
2083 Pair.of("serviceType", serviceType)
2084 ));
2085 }
2086
2087
2088
2089
2090
2091
2092 @Override
2093 public List<ServiceRegistration> getServiceRegistrationsByHost(String host) throws ServiceRegistryException {
2094 return db.exec(getServiceRegistrationsByHostQuery(host));
2095 }
2096
2097 private Function<EntityManager, List<ServiceRegistration>> getServiceRegistrationsByHostQuery(String host) {
2098 return namedQuery.findAll(
2099 "ServiceRegistration.getByHost",
2100 ServiceRegistration.class,
2101 Pair.of("host", host)
2102 );
2103 }
2104
2105
2106
2107
2108
2109
2110
2111 @Override
2112 public ServiceRegistration getServiceRegistration(String serviceType, String host) {
2113 return db.exec(getServiceRegistrationQuery(serviceType, host))
2114 .orElse(null);
2115 }
2116
2117
2118
2119
2120
2121
2122
2123 @Reference
2124 void setTrustedHttpClient(TrustedHttpClient client) {
2125 this.client = client;
2126 }
2127
2128
2129
2130
2131
2132
2133
2134 @Reference
2135 public void setSecurityService(SecurityService securityService) {
2136 this.securityService = securityService;
2137 }
2138
2139
2140 @Reference(
2141 cardinality = ReferenceCardinality.OPTIONAL,
2142 policy = ReferencePolicy.DYNAMIC,
2143 unbind = "unsetIncidentService"
2144 )
2145 public void setIncidentService(IncidentService incidentService) {
2146 this.incidentService = incidentService;
2147
2148 ((OsgiIncidentService) incidentService).setServiceRegistry(this);
2149 this.incidents = new Incidents(this, incidentService);
2150 }
2151
2152 public void unsetIncidentService(IncidentService incidentService) {
2153 if (this.incidentService == incidentService) {
2154 this.incidentService = null;
2155 this.incidents = null;
2156 }
2157 }
2158
2159
2160
2161
2162
2163
2164
2165
2166
2167
2168 private void updateServiceForFailover(JpaJob job) throws ServiceRegistryException, NotFoundException {
2169 if (job.getStatus() != Status.FAILED && job.getStatus() != Status.FINISHED) {
2170 return;
2171 }
2172
2173 job.setStatus(job.getStatus(), job.getFailureReason());
2174
2175
2176
2177 ServiceRegistrationJpaImpl currentService = job.getProcessorServiceRegistration();
2178 if (currentService == null) {
2179 return;
2180 }
2181
2182
2183 if (job.getStatus() == FAILED && !DATA.equals(job.getFailureReason())) {
2184
2185
2186 List<ServiceRegistrationJpaImpl> relatedWarningOrErrorServices = getRelatedWarningErrorServices(job);
2187
2188
2189 if (relatedWarningOrErrorServices.size() > 0) {
2190 for (ServiceRegistrationJpaImpl relatedService : relatedWarningOrErrorServices) {
2191
2192 if (currentService.equals(relatedService)) {
2193 continue;
2194 }
2195
2196
2197
2198 if (relatedService.getServiceState() == WARNING) {
2199 logger.info("State reset to NORMAL for related service {} on host {}", relatedService.getServiceType(),
2200 relatedService.getHost());
2201 relatedService.setServiceState(NORMAL, job.toJob().getSignature());
2202 }
2203
2204
2205 else if (relatedService.getServiceState() == ERROR) {
2206 logger.info("State reset to WARNING for related service {} on host {}", relatedService.getServiceType(),
2207 relatedService.getHost());
2208 relatedService.setServiceState(WARNING, relatedService.getWarningStateTrigger());
2209 }
2210
2211 updateServiceState(relatedService);
2212 }
2213 }
2214
2215
2216 else {
2217
2218 if (currentService.getServiceState() == NORMAL) {
2219 logger.info("State set to WARNING for current service {} on host {}", currentService.getServiceType(),
2220 currentService.getHost());
2221 currentService.setServiceState(WARNING, job.toJob().getSignature());
2222 updateServiceState(currentService);
2223 }
2224
2225
2226 else if (errorStatesEnabled && !noErrorStateServiceTypes.contains(currentService.getServiceType())
2227 && getHistorySize(currentService) >= maxAttemptsBeforeErrorState) {
2228 logger.info("State set to ERROR for current service {} on host {}", currentService.getServiceType(),
2229 currentService.getHost());
2230 currentService.setServiceState(ERROR, job.toJob().getSignature());
2231 updateServiceState(currentService);
2232 }
2233 }
2234 }
2235
2236
2237 else if (job.getStatus() == Status.FINISHED) {
2238
2239 if (currentService.getServiceState() == WARNING) {
2240 logger.info("State reset to NORMAL for current service {} on host {}", currentService.getServiceType(),
2241 currentService.getHost());
2242 currentService.setServiceState(NORMAL);
2243 updateServiceState(currentService);
2244 }
2245 }
2246 }
2247
2248
2249
2250
2251
2252
2253 @Override
2254 public void sanitize(String serviceType, String host) throws NotFoundException {
2255 db.execChecked(em -> {
2256 ServiceRegistrationJpaImpl service = getServiceRegistrationQuery(serviceType, host).apply(em)
2257 .orElseThrow(NotFoundException::new);
2258
2259 logger.info("State reset to NORMAL for service {} on host {} through sanitize method", service.getServiceType(),
2260 service.getHost());
2261 service.setServiceState(NORMAL);
2262 updateServiceState(service);
2263 });
2264 }
2265
2266
2267
2268
2269
2270
2271
2272
2273
2274
2275 private int getHistorySize(ServiceRegistration serviceRegistration) throws ServiceRegistryException {
2276 if (serviceRegistration == null) {
2277 throw new IllegalArgumentException("serviceRegistration must not be null!");
2278 }
2279
2280 logger.debug("Calculating count of jobs who failed due to service {}", serviceRegistration);
2281
2282 try {
2283 return db.exec(namedQuery.find(
2284 "Job.count.history.failed",
2285 Number.class,
2286 Pair.of("serviceType", serviceRegistration.getServiceType()),
2287 Pair.of("host", serviceRegistration.getHost())
2288 )).intValue();
2289 } catch (Exception e) {
2290 throw new ServiceRegistryException(e);
2291 }
2292 }
2293
2294
2295
2296
2297
2298
2299
2300
2301
2302
2303
2304
2305 private List<ServiceRegistrationJpaImpl> getRelatedWarningErrorServices(JpaJob job) throws ServiceRegistryException {
2306 if (job == null) {
2307 throw new IllegalArgumentException("job must not be null!");
2308 }
2309
2310 logger.debug("Try to get the services in WARNING or ERROR state triggered by {} failed", job);
2311
2312 try {
2313 return db.exec(namedQuery.findAll(
2314 "ServiceRegistration.relatedservices.warning_error",
2315 ServiceRegistrationJpaImpl.class,
2316 Pair.of("serviceType", job.getJobType())
2317 )).stream()
2318
2319 .filter(rs ->
2320 (rs.getServiceState() == WARNING && rs.getWarningStateTrigger() == job.toJob().getSignature())
2321 || (rs.getServiceState() == ERROR && rs.getErrorStateTrigger() == job.toJob().getSignature())
2322 ).collect(Collectors.toList());
2323 } catch (Exception e) {
2324 throw new ServiceRegistryException(e);
2325 }
2326 }
2327
2328
2329
2330
2331
2332
2333
2334
2335
2336
2337
2338
2339
2340
2341 protected List<ServiceRegistration> getServiceRegistrationsWithCapacity(String jobType,
2342 List<ServiceRegistration> serviceRegistrations, List<HostRegistration> hostRegistrations,
2343 final SystemLoad systemLoad) {
2344 final List<String> hostBaseUrls = hostRegistrations.stream()
2345 .map(HostRegistration::getBaseUrl)
2346 .collect(Collectors.toUnmodifiableList());
2347 final List<ServiceRegistration> filteredList = new ArrayList<>();
2348
2349 for (ServiceRegistration service : serviceRegistrations) {
2350
2351 if (!hostBaseUrls.contains(service.getHost())) {
2352 logger.trace("Not considering {} because it's host {} is not available for dispatching", service,
2353 service.getHost());
2354 continue;
2355 }
2356
2357
2358 if (!jobType.equals(service.getServiceType())) {
2359 logger.trace("Not considering {} because it is of the wrong job type", service);
2360 continue;
2361 }
2362
2363
2364 if (service.getServiceState() == ERROR) {
2365 logger.trace("Not considering {} because it is in error state", service);
2366 continue;
2367 }
2368
2369
2370 if (service.isInMaintenanceMode()) {
2371 logger.trace("Not considering {} because it is in maintenance mode", service);
2372 continue;
2373 }
2374
2375
2376 if (!service.isOnline()) {
2377 logger.trace("Not considering {} because it is currently offline", service);
2378 continue;
2379 }
2380
2381
2382 Float hostLoadMax = null;
2383 for (HostRegistration host : hostRegistrations) {
2384 if (host.getBaseUrl().equals(service.getHost())) {
2385 hostLoadMax = host.getMaxLoad();
2386 break;
2387 }
2388 }
2389 if (hostLoadMax == null) {
2390 logger.warn("Unable to determine max load for host {}", service.getHost());
2391 }
2392
2393
2394 Float hostLoad = systemLoad.get(service.getHost()).getLoadFactor();
2395 if (hostLoad == null) {
2396 logger.warn("Unable to determine current load for host {}", service.getHost());
2397 }
2398
2399
2400 if (hostLoad == null || hostLoadMax == null || hostLoad < hostLoadMax) {
2401 logger.debug("Adding candidate service {} for processing of jobs of type '{}' (host load is {} of max {})",
2402 service, jobType, hostLoad, hostLoadMax);
2403 filteredList.add(service);
2404 }
2405 }
2406
2407
2408 filteredList.sort(new LoadComparator(systemLoad));
2409
2410 return filteredList;
2411 }
2412
2413
2414
2415
2416
2417
2418
2419
2420
2421
2422
2423
2424
2425
2426 protected List<ServiceRegistration> getServiceRegistrationsByLoad(String jobType,
2427 List<ServiceRegistration> serviceRegistrations, List<HostRegistration> hostRegistrations,
2428 final SystemLoad systemLoad) {
2429 final List<String> hostBaseUrls = hostRegistrations.stream()
2430 .map(HostRegistration::getBaseUrl)
2431 .collect(Collectors.toUnmodifiableList());
2432 final List<ServiceRegistration> filteredList = new ArrayList<>();
2433
2434 logger.debug("Finding services to dispatch job of type {}", jobType);
2435
2436 for (ServiceRegistration service : serviceRegistrations) {
2437
2438 if (!hostBaseUrls.contains(service.getHost())) {
2439 logger.trace("Not considering {} because it's host {} is not available for dispatching", service,
2440 service.getHost());
2441 continue;
2442 }
2443
2444
2445 if (!jobType.equals(service.getServiceType())) {
2446 logger.trace("Not considering {} because it is of the wrong job type", service);
2447 continue;
2448 }
2449
2450
2451 if (service.getServiceState() == ERROR) {
2452 logger.trace("Not considering {} because it is in error state", service);
2453 continue;
2454 }
2455
2456
2457 if (service.isInMaintenanceMode()) {
2458 logger.trace("Not considering {} because it is in maintenance mode", service);
2459 continue;
2460 }
2461
2462
2463 if (!service.isOnline()) {
2464 logger.trace("Not considering {} because it is currently offline", service);
2465 continue;
2466 }
2467
2468
2469 logger.debug("Adding candidate service {} for processing of job of type '{}'", service, jobType);
2470 filteredList.add(service);
2471 }
2472
2473
2474 if ("org.opencastproject.composer".equals(jobType)) {
2475 Collections.sort(filteredList, new LoadComparatorEncoding(systemLoad));
2476 } else {
2477 Collections.sort(filteredList, new LoadComparator(systemLoad));
2478 }
2479
2480 return filteredList;
2481 }
2482
2483
2484
2485
2486
2487
2488 @Override
2489 public SystemLoad getMaxLoads() throws ServiceRegistryException {
2490 final SystemLoad loads = new SystemLoad();
2491 getHostRegistrations().stream()
2492 .map(host -> new NodeLoad(host.getBaseUrl(), 0.0f, host.getMaxLoad()))
2493 .forEach(loads::addNodeLoad);
2494 return loads;
2495 }
2496
2497
2498
2499
2500
2501
2502 @Override
2503 public NodeLoad getMaxLoadOnNode(String host) throws ServiceRegistryException, NotFoundException {
2504 try {
2505 float maxLoad = db.exec(namedQuery.find(
2506 "HostRegistration.getMaxLoadByHostName",
2507 Number.class,
2508 Pair.of("host", host)
2509 )).floatValue();
2510 return new NodeLoad(host, 0.0f, maxLoad);
2511 } catch (NoResultException e) {
2512 throw new NotFoundException(e);
2513 } catch (Exception e) {
2514 throw new ServiceRegistryException(e);
2515 }
2516 }
2517
2518
2519 class JobProducerHeartbeat implements Runnable {
2520
2521
2522 private final List<ServiceRegistration> unresponsive = new ArrayList<>();
2523
2524
2525
2526
2527
2528
2529 @Override
2530 public void run() {
2531 logger.debug("Checking for unresponsive services");
2532
2533 try {
2534 List<ServiceRegistration> serviceRegistrations = getOnlineServiceRegistrations();
2535
2536 for (ServiceRegistration service : serviceRegistrations) {
2537 hostsStatistics.updateHost(((ServiceRegistrationJpaImpl) service).getHostRegistration());
2538 servicesStatistics.updateService(service);
2539 if (!service.isJobProducer()) {
2540 continue;
2541 }
2542 if (service.isInMaintenanceMode()) {
2543 continue;
2544 }
2545
2546
2547 String serviceUrl = UrlSupport.concat(service.getHost(), service.getPath(), "dispatch");
2548
2549 HttpHead options = new HttpHead(serviceUrl);
2550 HttpResponse response = null;
2551 try {
2552 try {
2553 response = client.execute(options);
2554 if (response != null) {
2555 switch (response.getStatusLine().getStatusCode()) {
2556 case HttpStatus.SC_OK:
2557
2558 logger.trace("Service " + service + " is responsive: " + response.getStatusLine());
2559 if (unresponsive.remove(service)) {
2560 logger.info("Service {} is still online", service);
2561 } else if (!service.isOnline()) {
2562 try {
2563 setOnlineStatus(service.getServiceType(), service.getHost(), service.getPath(), true, true);
2564 logger.info("Service {} is back online", service);
2565 } catch (ServiceRegistryException e) {
2566 logger.warn("Error setting online status for {}", service);
2567 }
2568 }
2569 continue;
2570 default:
2571 if (!service.isOnline()) {
2572 continue;
2573 }
2574 logger.warn("Service {} is not working as expected: {}", service, response.getStatusLine());
2575 }
2576 } else {
2577 logger.warn("Service {} does not respond", service);
2578 }
2579 } catch (TrustedHttpClientException e) {
2580 if (!service.isOnline()) {
2581 continue;
2582 }
2583 logger.warn("Unable to reach {}", service, e);
2584 }
2585
2586
2587 try {
2588 if (unresponsive.contains(service)) {
2589 unRegisterService(service.getServiceType(), service.getHost());
2590 unresponsive.remove(service);
2591 logger.warn("Marking {} as offline", service);
2592 } else {
2593 unresponsive.add(service);
2594 logger.warn("Added {} to the watch list", service);
2595 }
2596 } catch (ServiceRegistryException e) {
2597 logger.warn("Unable to unregister unreachable service: {}", service, e);
2598 }
2599 } finally {
2600 client.close(response);
2601 }
2602 }
2603 } catch (Throwable t) {
2604 logger.warn("Error while checking for unresponsive services", t);
2605 }
2606
2607 logger.debug("Finished checking for unresponsive services");
2608 }
2609 }
2610
2611
2612
2613
2614
2615
2616 private class LoadComparator implements Comparator<ServiceRegistration> {
2617
2618 protected SystemLoad loadByHost = null;
2619
2620
2621
2622
2623
2624
2625
2626 LoadComparator(SystemLoad loadByHost) {
2627 this.loadByHost = loadByHost;
2628 }
2629
2630 @Override
2631 public int compare(ServiceRegistration serviceA, ServiceRegistration serviceB) {
2632 String hostA = serviceA.getHost();
2633 String hostB = serviceB.getHost();
2634 NodeLoad nodeA = loadByHost.get(hostA);
2635 NodeLoad nodeB = loadByHost.get(hostB);
2636
2637 if (Math.abs(nodeA.getLoadFactor() - nodeB.getLoadFactor()) <= 0.01) {
2638
2639
2640
2641 return Float.compare(nodeB.getMaxLoad(), nodeA.getMaxLoad());
2642 }
2643 return Float.compare(nodeA.getLoadFactor(), nodeB.getLoadFactor());
2644 }
2645 }
2646
2647
2648
2649
2650
2651
2652
2653
2654 private class LoadComparatorEncoding extends LoadComparator implements Comparator<ServiceRegistration> {
2655
2656
2657
2658
2659
2660
2661 LoadComparatorEncoding(SystemLoad loadByHost) {
2662 super(loadByHost);
2663 }
2664
2665 @Override
2666 public int compare(ServiceRegistration serviceA, ServiceRegistration serviceB) {
2667 String hostA = serviceA.getHost();
2668 String hostB = serviceB.getHost();
2669 NodeLoad nodeA = loadByHost.get(hostA);
2670 NodeLoad nodeB = loadByHost.get(hostB);
2671
2672 if (encodingWorkers != null) {
2673 if (encodingWorkers.contains(hostA) && !encodingWorkers.contains(hostB)) {
2674 if (nodeA.getLoadFactor() <= encodingThreshold) {
2675 return -1;
2676 }
2677 return Float.compare(nodeA.getLoadFactor(), nodeB.getLoadFactor());
2678 }
2679 if (encodingWorkers.contains(hostB) && !encodingWorkers.contains(hostA)) {
2680 if (nodeB.getLoadFactor() <= encodingThreshold) {
2681 return 1;
2682 }
2683 return Float.compare(nodeA.getLoadFactor(), nodeB.getLoadFactor());
2684 }
2685 }
2686 return super.compare(serviceA, serviceB);
2687 }
2688 }
2689 }