1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 package org.opencastproject.serviceregistry.impl;
23
24 import static java.lang.String.format;
25 import static org.apache.commons.lang3.StringUtils.isBlank;
26 import static org.opencastproject.db.Queries.namedQuery;
27 import static org.opencastproject.job.api.AbstractJobProducer.ACCEPT_JOB_LOADS_EXCEEDING_PROPERTY;
28 import static org.opencastproject.job.api.AbstractJobProducer.DEFAULT_ACCEPT_JOB_LOADS_EXCEEDING;
29 import static org.opencastproject.job.api.Job.FailureReason.DATA;
30 import static org.opencastproject.job.api.Job.Status.FAILED;
31 import static org.opencastproject.serviceregistry.api.ServiceState.ERROR;
32 import static org.opencastproject.serviceregistry.api.ServiceState.NORMAL;
33 import static org.opencastproject.serviceregistry.api.ServiceState.WARNING;
34 import static org.opencastproject.util.OsgiUtil.getOptContextProperty;
35
36 import org.opencastproject.db.DBSession;
37 import org.opencastproject.db.DBSessionFactory;
38 import org.opencastproject.job.api.Job;
39 import org.opencastproject.job.api.Job.Status;
40 import org.opencastproject.job.jpa.JpaJob;
41 import org.opencastproject.security.api.Organization;
42 import org.opencastproject.security.api.SecurityService;
43 import org.opencastproject.security.api.TrustedHttpClient;
44 import org.opencastproject.security.api.TrustedHttpClientException;
45 import org.opencastproject.security.api.User;
46 import org.opencastproject.serviceregistry.api.HostRegistration;
47 import org.opencastproject.serviceregistry.api.HostStatistics;
48 import org.opencastproject.serviceregistry.api.IncidentService;
49 import org.opencastproject.serviceregistry.api.Incidents;
50 import org.opencastproject.serviceregistry.api.JaxbServiceStatistics;
51 import org.opencastproject.serviceregistry.api.ServiceRegistration;
52 import org.opencastproject.serviceregistry.api.ServiceRegistry;
53 import org.opencastproject.serviceregistry.api.ServiceRegistryException;
54 import org.opencastproject.serviceregistry.api.ServiceStatistics;
55 import org.opencastproject.serviceregistry.api.SystemLoad;
56 import org.opencastproject.serviceregistry.api.SystemLoad.NodeLoad;
57 import org.opencastproject.serviceregistry.impl.jmx.HostsStatistics;
58 import org.opencastproject.serviceregistry.impl.jmx.JobsStatistics;
59 import org.opencastproject.serviceregistry.impl.jmx.ServicesStatistics;
60 import org.opencastproject.serviceregistry.impl.jpa.HostRegistrationJpaImpl;
61 import org.opencastproject.serviceregistry.impl.jpa.ServiceRegistrationJpaImpl;
62 import org.opencastproject.systems.OpencastConstants;
63 import org.opencastproject.util.NotFoundException;
64 import org.opencastproject.util.UrlSupport;
65 import org.opencastproject.util.data.functions.Strings;
66 import org.opencastproject.util.function.ThrowingConsumer;
67 import org.opencastproject.util.jmx.JmxUtil;
68
69 import org.apache.commons.lang3.StringUtils;
70 import org.apache.commons.lang3.time.DateUtils;
71 import org.apache.commons.lang3.tuple.Pair;
72 import org.apache.http.HttpResponse;
73 import org.apache.http.HttpStatus;
74 import org.apache.http.client.methods.HttpHead;
75 import org.osgi.service.cm.ConfigurationException;
76 import org.osgi.service.cm.ManagedService;
77 import org.osgi.service.component.ComponentContext;
78 import org.osgi.service.component.annotations.Activate;
79 import org.osgi.service.component.annotations.Component;
80 import org.osgi.service.component.annotations.Deactivate;
81 import org.osgi.service.component.annotations.Modified;
82 import org.osgi.service.component.annotations.Reference;
83 import org.osgi.service.component.annotations.ReferenceCardinality;
84 import org.osgi.service.component.annotations.ReferencePolicy;
85 import org.slf4j.Logger;
86 import org.slf4j.LoggerFactory;
87
88 import java.net.InetAddress;
89 import java.net.URI;
90 import java.net.URISyntaxException;
91 import java.util.ArrayList;
92 import java.util.Arrays;
93 import java.util.Collections;
94 import java.util.Comparator;
95 import java.util.Date;
96 import java.util.Dictionary;
97 import java.util.HashMap;
98 import java.util.List;
99 import java.util.Map;
100 import java.util.Objects;
101 import java.util.Optional;
102 import java.util.concurrent.Executors;
103 import java.util.concurrent.ScheduledExecutorService;
104 import java.util.concurrent.TimeUnit;
105 import java.util.concurrent.atomic.AtomicReference;
106 import java.util.function.Consumer;
107 import java.util.function.Function;
108 import java.util.stream.Collectors;
109
110 import javax.management.ObjectInstance;
111 import javax.persistence.EntityManager;
112 import javax.persistence.EntityManagerFactory;
113 import javax.persistence.LockModeType;
114 import javax.persistence.NoResultException;
115 import javax.persistence.TypedQuery;
116
117
118 @Component(
119 property = {
120 "service.description=Service registry"
121 },
122 immediate = true,
123 service = { ManagedService.class, ServiceRegistry.class, ServiceRegistryJpaImpl.class }
124 )
125 public class ServiceRegistryJpaImpl implements ServiceRegistry, ManagedService {
126
127
/** Name of the JPA persistence unit; the same name is the {@code osgi.unit.name} target of the injected EMF. */
public static final String PERSISTENCE_UNIT = "org.opencastproject.common";

/** Operation name of jobs that {@code removeParentlessJobs} never deletes. */
public static final String START_OPERATION = "START_OPERATION";

/** Operation name of jobs that {@code removeParentlessJobs} never deletes. */
public static final String START_WORKFLOW = "START_WORKFLOW";

/** Operation name of jobs that {@code removeParentlessJobs} never deletes. */
public static final String RESUME = "RESUME";

/** Job type of workflow jobs; such jobs are skipped by load-cache accounting and failover handling in updateJob. */
public static final String TYPE_WORKFLOW = "org.opencastproject.workflow";

/** Logger of this class. */
static final Logger logger = LoggerFactory.getLogger(ServiceRegistryJpaImpl.class);

/** JMX beans registered during activation and unregistered on deactivation. */
protected List<ObjectInstance> jmxBeans = new ArrayList<>();

/** JMX type name of the host statistics bean. */
private static final String JMX_HOSTS_STATISTICS_TYPE = "HostsStatistics";

/** JMX type name of the service statistics bean. */
private static final String JMX_SERVICES_STATISTICS_TYPE = "ServicesStatistics";

/** JMX type name of the job statistics bean. */
private static final String JMX_JOBS_STATISTICS_TYPE = "JobsStatistics";

/** JMX bean exposing host statistics; created on activation. */
private HostsStatistics hostsStatistics;

/** JMX bean exposing service statistics; notified whenever a service state is persisted. */
private ServicesStatistics servicesStatistics;

/** JMX bean exposing job statistics; refreshed by updateStatisticsJobData. */
private JobsStatistics jobsStatistics;

/** Job being processed by the current thread; implicit parent for createJob overloads without an explicit parent. */
private static final ThreadLocal<Job> currentJob = new ThreadLocal<>();

/** Bundle context property overriding this node's maximum load (default: number of available processors). */
protected static final String OPT_MAXLOAD = "org.opencastproject.server.maxload";

/** Configuration key of the heartbeat interval in seconds; 0 disables the heartbeat. */
protected static final String OPT_HEARTBEATINTERVAL = "heartbeat.interval";

/** Configuration key enabling job statistics collection. */
protected static final String OPT_JOBSTATISTICS = "jobstats.collect";

/** Configuration key of the maximum job age for service statistics. NOTE(review): unit not visible here (presumably days). */
protected static final String OPT_SERVICE_STATISTICS_MAX_JOB_AGE = "org.opencastproject.statistics.services.max_job_age";

/** Configuration key holding a comma-separated list of encoding workers. */
protected static final String OPT_ENCODING_WORKERS = "org.opencastproject.encoding.workers";

/** Configuration key of the encoding worker threshold; only values between 0 and 1 are accepted. */
protected static final String OPT_ENCODING_THRESHOLD = "org.opencastproject.encoding.workers.threshold";

/** Trusted HTTP client. NOTE(review): not used in the visible part of this class. */
protected TrustedHttpClient client = null;

/** Default dispatch limit. NOTE(review): not referenced in the visible part of this class. */
static final int DEFAULT_DISPATCH_JOBS_LIMIT = 100;

/** Job statistics collection is off unless configured. */
static final boolean DEFAULT_JOB_STATISTICS = false;

/** Default for {@link #OPT_SERVICE_STATISTICS_MAX_JOB_AGE}. */
static final int DEFAULT_SERVICE_STATISTICS_MAX_JOB_AGE = 14;

/** Empty worker list used when no encoding workers are configured (note: mutable and shared). */
static final List<String> DEFAULT_ENCODING_WORKERS = new ArrayList<String>();

/** Default encoding worker threshold. */
static final double DEFAULT_ENCODING_THRESHOLD = 0.0;

/** Configuration key of the attempts before a service enters error state; a negative value disables error states. */
static final String MAX_ATTEMPTS_CONFIG_KEY = "max.attempts";

/** Configuration key holding a comma-separated list of service types without error state. */
static final String NO_ERROR_STATE_SERVICE_TYPES_CONFIG_KEY = "no.error.state.service.types";

/** Default for {@link #MAX_ATTEMPTS_CONFIG_KEY}. */
private static final int DEFAULT_MAX_ATTEMPTS_BEFORE_ERROR_STATE = 10;

/** Error states are enabled unless a negative max-attempts value is configured. */
private static final boolean DEFAULT_ERROR_STATES_ENABLED = true;

/** Effective error-state settings, refreshed by updated(). */
protected int maxAttemptsBeforeErrorState = DEFAULT_MAX_ATTEMPTS_BEFORE_ERROR_STATE;
private boolean errorStatesEnabled = DEFAULT_ERROR_STATES_ENABLED;

/** Service types without error state, refreshed by updated(). */
private List<String> noErrorStateServiceTypes = new ArrayList<>();

/** Default heartbeat interval in seconds. */
static final long DEFAULT_HEART_BEAT = 60;

/** Load assigned to jobs created without an explicit load. */
static final float DEFAULT_JOB_LOAD = 0.1f;

/** Base URL of this node: the server URL property, or UrlSupport.DEFAULT_BASE_URL when unset. */
protected String hostName;

/** Node name from the node name property; null when not configured. */
protected String nodeName;

/** Base URL used to build job URIs; defaults to {@link #hostName}. */
protected String jobHost;

/** Configured encoding workers; static, hence shared by all instances. */
protected static List<String> encodingWorkers = DEFAULT_ENCODING_WORKERS;

/** Configured encoding worker threshold; static, hence shared by all instances. */
protected static double encodingThreshold = DEFAULT_ENCODING_THRESHOLD;

/** Entity manager factory of the common persistence unit. */
protected EntityManagerFactory emf = null;

/** Factory used to open {@link #db} on activation. */
protected DBSessionFactory dbSessionFactory;

/** Database session used for all persistence operations. */
protected DBSession db;

/** Executor running the job producer heartbeat; created in updated(). */
protected ScheduledExecutorService scheduledExecutor = null;

/** Supplies the current user and organization for newly created jobs. */
protected SecurityService securityService = null;

/** NOTE(review): usage not visible in this part of the class. */
protected IncidentService incidentService = null;

/** NOTE(review): usage not visible in this part of the class. */
protected Incidents incidents;

/** Whether job statistics are collected. */
protected boolean collectJobstats = DEFAULT_JOB_STATISTICS;

/** Maximum job age considered by service statistics. */
protected int maxJobAge = DEFAULT_SERVICE_STATISTICS_MAX_JOB_AGE;

/** Job statuses whose job load is added to the local load cache; the static block below sets it to RUNNING only. */
protected static final List<Status> JOB_STATUSES_INFLUENCING_LOAD_BALANCING;

/** All job statuses for which {@code Status#isActive()} is true. */
private static final Status[] activeJobStatus =
    Arrays.stream(Status.values()).filter(Status::isActive).collect(Collectors.toList()).toArray(new Status[0]);

/**
 * Job id to load of the jobs currently counted in {@link #localSystemLoad}. Static and therefore shared by all
 * instances; in this class it is modified from synchronized instance methods.
 */
protected static HashMap<Long, Float> jobCache = new HashMap<>();

static {
  JOB_STATUSES_INFLUENCING_LOAD_BALANCING = new ArrayList<>();
  JOB_STATUSES_INFLUENCING_LOAD_BALANCING.add(Status.RUNNING);
}

/** Whether jobs exceeding the max load are accepted; read on activation (field name misspelling kept for compatibility). */
protected Boolean acceptJobLoadsExeedingMaxLoad = true;

/** Sum of the loads held in {@link #jobCache}; reset to 0 whenever the cache becomes empty. */
protected float localSystemLoad = 0.0f;
284
285
286 @Reference(target = "(osgi.unit.name=org.opencastproject.common)")
287 void setEntityManagerFactory(EntityManagerFactory emf) {
288 this.emf = emf;
289 }
290
291 @Reference
292 public void setDBSessionFactory(DBSessionFactory dbSessionFactory) {
293 this.dbSessionFactory = dbSessionFactory;
294 }
295
296 @Activate
297 public void activate(ComponentContext cc) {
298 logger.info("Activate service registry");
299
300 db = dbSessionFactory.createSession(emf);
301
302
303 if (cc == null || StringUtils.isBlank(cc.getBundleContext().getProperty(OpencastConstants.SERVER_URL_PROPERTY))) {
304 hostName = UrlSupport.DEFAULT_BASE_URL;
305 } else {
306 hostName = cc.getBundleContext().getProperty(OpencastConstants.SERVER_URL_PROPERTY);
307 }
308
309
310 if (hostName.endsWith("/")) {
311 logger.warn("The configured value of {} ends with '/'. This is very likely a configuration error which could "
312 + "lead to services not working properly. Note that this configuration should not contain any part of "
313 + "the service paths.", OpencastConstants.SERVER_URL_PROPERTY);
314 }
315
316
317 cleanUndispatchableJobs(hostName);
318
319
320 try {
321 List<ServiceStatistics> serviceStatistics = getServiceStatistics();
322 hostsStatistics = new HostsStatistics(serviceStatistics);
323 servicesStatistics = new ServicesStatistics(hostName, serviceStatistics);
324 jobsStatistics = new JobsStatistics(hostName);
325 jmxBeans.add(JmxUtil.registerMXBean(hostsStatistics, JMX_HOSTS_STATISTICS_TYPE));
326 jmxBeans.add(JmxUtil.registerMXBean(servicesStatistics, JMX_SERVICES_STATISTICS_TYPE));
327 jmxBeans.add(JmxUtil.registerMXBean(jobsStatistics, JMX_JOBS_STATISTICS_TYPE));
328 } catch (ServiceRegistryException e) {
329 logger.error("Error registering JMX statistic beans", e);
330 }
331
332
333 if (cc == null || StringUtils.isBlank(cc.getBundleContext().getProperty("org.opencastproject.jobs.url"))) {
334 jobHost = hostName;
335 } else {
336 jobHost = cc.getBundleContext().getProperty("org.opencastproject.jobs.url");
337 }
338
339
340 try {
341 if (cc == null || StringUtils.isBlank(cc.getBundleContext().getProperty(OpencastConstants.NODE_NAME_PROPERTY))) {
342 nodeName = null;
343 } else {
344 nodeName = cc.getBundleContext().getProperty(OpencastConstants.NODE_NAME_PROPERTY);
345 }
346
347 float maxLoad = Runtime.getRuntime().availableProcessors();
348 if (cc != null && StringUtils.isNotBlank(cc.getBundleContext().getProperty(OPT_MAXLOAD))) {
349 try {
350 maxLoad = Float.parseFloat(cc.getBundleContext().getProperty(OPT_MAXLOAD));
351 logger.info("Max load has been set manually to {}", maxLoad);
352 } catch (NumberFormatException e) {
353 logger.warn("Configuration key '{}' is not an integer. Falling back to the number of cores ({})",
354 OPT_MAXLOAD, maxLoad);
355 }
356 }
357
358 logger.info("Node maximum load set to {}", maxLoad);
359
360 String address = InetAddress.getByName(URI.create(hostName).getHost()).getHostAddress();
361 long maxMemory = Runtime.getRuntime().maxMemory();
362 int cores = Runtime.getRuntime().availableProcessors();
363
364 registerHost(hostName, address, nodeName, maxMemory, cores, maxLoad);
365 } catch (Exception e) {
366 throw new IllegalStateException("Unable to register host " + hostName + " in the service registry", e);
367 }
368
369
370 if (cc != null) {
371 acceptJobLoadsExeedingMaxLoad = getOptContextProperty(cc, ACCEPT_JOB_LOADS_EXCEEDING_PROPERTY).map(Strings.toBool)
372 .getOrElse(DEFAULT_ACCEPT_JOB_LOADS_EXCEEDING);
373 }
374
375 localSystemLoad = 0;
376 logger.info("Activated");
377 }
378
379 @Override
380 public float getOwnLoad() {
381 return localSystemLoad;
382 }
383
384 @Override
385 public String getRegistryHostname() {
386 return hostName;
387 }
388
389 @Deactivate
390 public void deactivate() {
391 logger.info("deactivate service registry");
392
393
394 if (scheduledExecutor != null) {
395 try {
396 scheduledExecutor.shutdownNow();
397 if (!scheduledExecutor.isShutdown()) {
398 logger.info("Waiting for Dispatcher to terminate");
399 scheduledExecutor.awaitTermination(10, TimeUnit.SECONDS);
400 }
401 } catch (InterruptedException e) {
402 logger.error("Error shutting down the Dispatcher", e);
403 }
404 }
405
406 for (ObjectInstance mbean : jmxBeans) {
407 JmxUtil.unregisterMXBean(mbean);
408 }
409
410 try {
411 unregisterHost(hostName);
412 } catch (ServiceRegistryException e) {
413 throw new IllegalStateException("Unable to unregister host " + hostName + " from the service registry", e);
414 }
415 }
416
417
418
419
420
421
422 @Override
423 public Job createJob(String type, String operation) throws ServiceRegistryException {
424 return createJob(this.hostName, type, operation, null, null, true, getCurrentJob(), DEFAULT_JOB_LOAD);
425 }
426
427
428
429
430
431
432
433 @Override
434 public Job createJob(String type, String operation, List<String> arguments) throws ServiceRegistryException {
435 return createJob(this.hostName, type, operation, arguments, null, true, getCurrentJob(), DEFAULT_JOB_LOAD);
436 }
437
438
439
440
441
442
443
444 @Override
445 public Job createJob(String type, String operation, List<String> arguments, Float jobLoad)
446 throws ServiceRegistryException {
447 return createJob(this.hostName, type, operation, arguments, null, true, getCurrentJob(), jobLoad);
448 }
449
450
451
452
453
454
455
456 @Override
457 public Job createJob(String type, String operation, List<String> arguments, String payload, boolean dispatchable)
458 throws ServiceRegistryException {
459 return createJob(this.hostName, type, operation, arguments, payload, dispatchable, getCurrentJob(),
460 DEFAULT_JOB_LOAD);
461 }
462
463
464
465
466
467
468
469 @Override
470 public Job createJob(String type, String operation, List<String> arguments, String payload, boolean dispatchable,
471 Float jobLoad) throws ServiceRegistryException {
472 return createJob(this.hostName, type, operation, arguments, payload, dispatchable, getCurrentJob(), jobLoad);
473 }
474
475 public Job createJob(String type, String operation, List<String> arguments, String payload, boolean dispatchable,
476 Job parentJob) throws ServiceRegistryException {
477 return createJob(this.hostName, type, operation, arguments, payload, dispatchable, parentJob, DEFAULT_JOB_LOAD);
478 }
479
480
481
482
483
484
485
486 @Override
487 public Job createJob(String type, String operation, List<String> arguments, String payload, boolean dispatchable,
488 Job parentJob, Float jobLoad) throws ServiceRegistryException {
489 return createJob(this.hostName, type, operation, arguments, payload, dispatchable, parentJob, jobLoad);
490 }
491
492
493
494
495 public Job createJob(String host, String serviceType, String operation, List<String> arguments, String payload,
496 boolean dispatchable, Job parentJob) throws ServiceRegistryException {
497 return createJob(host, serviceType, operation, arguments, payload, dispatchable, parentJob, DEFAULT_JOB_LOAD);
498 }
499
500
501
502
503 public Job createJob(String host, String serviceType, String operation, List<String> arguments, String payload,
504 boolean dispatchable, Job parentJob, float jobLoad) throws ServiceRegistryException {
505 if (StringUtils.isBlank(host)) {
506 throw new IllegalArgumentException("Host can't be null");
507 }
508 if (StringUtils.isBlank(serviceType)) {
509 throw new IllegalArgumentException("Service type can't be null");
510 }
511 if (StringUtils.isBlank(operation)) {
512 throw new IllegalArgumentException("Operation can't be null");
513 }
514
515 JpaJob jpaJob = db.execTxChecked(em -> {
516 ServiceRegistrationJpaImpl creatingService = getServiceRegistrationQuery(serviceType, host).apply(em)
517 .orElseThrow(() -> new ServiceRegistryException("No service registration exists for type '" + serviceType
518 + "' on host '" + host + "'"));
519
520 if (creatingService.getHostRegistration().isMaintenanceMode()) {
521 logger.warn("Creating a job from {}, which is currently in maintenance mode.", creatingService.getHost());
522 } else if (!creatingService.getHostRegistration().isActive()) {
523 logger.warn("Creating a job from {}, which is currently inactive.", creatingService.getHost());
524 }
525
526 User currentUser = securityService.getUser();
527 Organization currentOrganization = securityService.getOrganization();
528
529 JpaJob job = new JpaJob(currentUser, currentOrganization, creatingService, operation, arguments, payload,
530 dispatchable, jobLoad);
531
532
533 if (parentJob != null) {
534
535 JpaJob jpaParentJob = getJpaJobQuery(parentJob.getId()).apply(em).orElseThrow(() -> {
536 logger.error("job with id {} not found in the persistence context", parentJob);
537
538 removeFromLoadCache(parentJob.getId());
539 return new ServiceRegistryException(new NotFoundException());
540 });
541 job.setParentJob(jpaParentJob);
542
543
544 JpaJob jpaRootJob = jpaParentJob;
545 if (parentJob.getRootJobId() != null) {
546 jpaRootJob = getJpaJobQuery(parentJob.getRootJobId()).apply(em).orElseThrow(() -> {
547 logger.error("job with id {} not found in the persistence context", parentJob.getRootJobId());
548
549 removeFromLoadCache(parentJob.getRootJobId());
550 return new ServiceRegistryException(new NotFoundException());
551 });
552 }
553 job.setRootJob(jpaRootJob);
554 }
555
556
557 if (dispatchable) {
558 logger.trace("Queuing dispatchable '{}'", job);
559 job.setStatus(Status.QUEUED);
560 } else {
561 logger.trace("Giving new non-dispatchable '{}' its creating service as processor '{}'", job, creatingService);
562 job.setProcessorServiceRegistration(creatingService);
563 }
564
565 em.persist(job);
566 return job;
567 });
568
569 setJobUri(jpaJob);
570 return jpaJob.toJob();
571 }
572
573 @Override
574 public void removeJobs(List<Long> jobIds) throws NotFoundException, ServiceRegistryException {
575 for (long jobId: jobIds) {
576 if (jobId < 1) {
577 throw new NotFoundException("Job ID must be greater than zero (0)");
578 }
579 }
580
581 logger.debug("Start deleting jobs with IDs '{}'", jobIds);
582 try {
583 db.execTxChecked(em -> {
584 for (long jobId : jobIds) {
585 JpaJob job = em.find(JpaJob.class, jobId);
586 if (job == null) {
587 logger.error("Job with Id {} cannot be deleted: Not found.", jobId);
588 removeFromLoadCache(jobId);
589 throw new NotFoundException("Job with ID '" + jobId + "' not found");
590 }
591 deleteChildJobsQuery(jobId).accept(em);
592 em.remove(job);
593 removeFromLoadCache(jobId);
594 }
595 });
596 } catch (NotFoundException | ServiceRegistryException e) {
597 throw e;
598 } catch (Exception e) {
599 throw new ServiceRegistryException(e);
600 }
601
602 logger.info("Jobs with IDs '{}' deleted", jobIds);
603 }
604
605 private ThrowingConsumer<EntityManager, Exception> deleteChildJobsQuery(long jobId) {
606 return em -> {
607 List<Job> childJobs = getChildJobs(jobId);
608 if (childJobs.isEmpty()) {
609 logger.trace("No child jobs of job '{}' found to delete.", jobId);
610 return;
611 }
612
613 logger.debug("Start deleting child jobs of job '{}'", jobId);
614
615 try {
616 for (int i = childJobs.size() - 1; i >= 0; i--) {
617 Job job = childJobs.get(i);
618 JpaJob jobToDelete = em.find(JpaJob.class, job.getId());
619 em.remove(jobToDelete);
620 removeFromLoadCache(job.getId());
621 logger.debug("{} deleted", job);
622 }
623 logger.debug("Deleted all child jobs of job '{}'", jobId);
624 } catch (Exception e) {
625 throw new ServiceRegistryException("Unable to remove child jobs from " + jobId, e);
626 }
627 };
628 }
629
630 @Override
631 public void removeParentlessJobs(int lifetime) throws ServiceRegistryException {
632 int count = db.execTxChecked(em -> {
633 int c = 0;
634
635 List<Job> jobs = namedQuery.findAll("Job.withoutParent", JpaJob.class).apply(em).stream()
636 .map(JpaJob::toJob)
637 .filter(j -> j.getDateCreated().before(DateUtils.addDays(new Date(), -lifetime)))
638
639 .filter(j -> !START_OPERATION.equals(j.getOperation())
640 && !START_WORKFLOW.equals(j.getOperation())
641 && !RESUME.equals(j.getOperation()))
642 .filter(j -> j.getStatus().isTerminated())
643 .collect(Collectors.toList());
644
645 for (Job job : jobs) {
646 try {
647 removeJobs(Collections.singletonList(job.getId()));
648 logger.debug("Parentless '{}' removed", job);
649 c++;
650 } catch (NotFoundException e) {
651 logger.debug("Parentless '{} ' not found in database", job, e);
652 }
653 }
654
655 return c;
656 });
657
658
659 if (count > 0) {
660 logger.info("Successfully removed {} parentless jobs", count);
661 } else {
662 logger.trace("No parentless jobs found to remove");
663 }
664 }
665
666
667
668
669
670
671 @Override
672 public void updated(Dictionary properties) throws ConfigurationException {
673 logger.info("Updating service registry properties");
674
675 maxAttemptsBeforeErrorState = DEFAULT_MAX_ATTEMPTS_BEFORE_ERROR_STATE;
676 errorStatesEnabled = DEFAULT_ERROR_STATES_ENABLED;
677 String maxAttempts = StringUtils.trimToNull((String) properties.get(MAX_ATTEMPTS_CONFIG_KEY));
678 if (maxAttempts != null) {
679 try {
680 maxAttemptsBeforeErrorState = Integer.parseInt(maxAttempts);
681 if (maxAttemptsBeforeErrorState < 0) {
682 errorStatesEnabled = false;
683 logger.info("Error states of services disabled");
684 } else {
685 errorStatesEnabled = true;
686 logger.info("Set max attempts before error state to {}", maxAttempts);
687 }
688 } catch (NumberFormatException e) {
689 logger.warn("Can not set max attempts before error state to {}. {} must be an integer", maxAttempts,
690 MAX_ATTEMPTS_CONFIG_KEY);
691 }
692 }
693
694 noErrorStateServiceTypes = new ArrayList<>();
695 String noErrorStateServiceTypesStr = StringUtils.trimToNull((String) properties.get(
696 NO_ERROR_STATE_SERVICE_TYPES_CONFIG_KEY));
697 if (noErrorStateServiceTypesStr != null) {
698 noErrorStateServiceTypes = Arrays.asList(noErrorStateServiceTypesStr.split("\\s*,\\s*"));
699 if (!noErrorStateServiceTypes.isEmpty()) {
700 logger.info("Set service types without error state to {}", String.join(", ", noErrorStateServiceTypes));
701 }
702 }
703
704 long heartbeatInterval = DEFAULT_HEART_BEAT;
705 String heartbeatIntervalString = StringUtils.trimToNull((String) properties.get(OPT_HEARTBEATINTERVAL));
706 if (StringUtils.isNotBlank(heartbeatIntervalString)) {
707 try {
708 heartbeatInterval = Long.parseLong(heartbeatIntervalString);
709 } catch (Exception e) {
710 logger.warn("Heartbeat interval '{}' is malformed, setting to {}", heartbeatIntervalString, DEFAULT_HEART_BEAT);
711 heartbeatInterval = DEFAULT_HEART_BEAT;
712 }
713 if (heartbeatInterval == 0) {
714 logger.info("Heartbeat disabled");
715 } else if (heartbeatInterval < 0) {
716 logger.warn("Heartbeat interval {} seconds too low, adjusting to {}", heartbeatInterval, DEFAULT_HEART_BEAT);
717 heartbeatInterval = DEFAULT_HEART_BEAT;
718 } else {
719 logger.info("Heartbeat interval set to {} seconds", heartbeatInterval);
720 }
721 }
722
723 String jobStatsString = StringUtils.trimToNull((String) properties.get(OPT_JOBSTATISTICS));
724 if (StringUtils.isNotBlank(jobStatsString)) {
725 try {
726 collectJobstats = Boolean.parseBoolean(jobStatsString);
727 } catch (Exception e) {
728 logger.warn("Job statistics collection flag '{}' is malformed, setting to {}", jobStatsString,
729 DEFAULT_JOB_STATISTICS);
730 collectJobstats = DEFAULT_JOB_STATISTICS;
731 }
732 }
733
734
735 String encodingWorkersString = (String) properties.get(OPT_ENCODING_WORKERS);
736 if (StringUtils.isNotBlank(encodingWorkersString)) {
737 encodingWorkers = Arrays.asList(encodingWorkersString.split("\\s*,\\s*"));
738 } else
739 encodingWorkers = DEFAULT_ENCODING_WORKERS;
740
741
742 String encodingThersholdString = StringUtils.trimToNull((String) properties.get(OPT_ENCODING_THRESHOLD));
743 if (StringUtils.isNotBlank(encodingThersholdString) && encodingThersholdString != null) {
744 try {
745 double encodingThresholdTmp = Double.parseDouble(encodingThersholdString);
746 if (encodingThresholdTmp >= 0 && encodingThresholdTmp <= 1)
747 encodingThreshold = encodingThresholdTmp;
748 else {
749 encodingThreshold = DEFAULT_ENCODING_THRESHOLD;
750 logger.warn("org.opencastproject.encoding.workers.threshold is not between 0 and 1");
751 }
752 } catch (NumberFormatException e) {
753 logger.warn("Can not set encoding threshold to {}. {} must be an parsable double", encodingThersholdString,
754 OPT_ENCODING_THRESHOLD);
755 }
756 } else
757 encodingThreshold = DEFAULT_ENCODING_THRESHOLD;
758
759
760 String maxJobAgeString = StringUtils.trimToNull((String) properties.get(OPT_SERVICE_STATISTICS_MAX_JOB_AGE));
761 if (maxJobAgeString != null) {
762 try {
763 maxJobAge = Integer.parseInt(maxJobAgeString);
764 logger.info("Set service statistics max job age to {}", maxJobAgeString);
765 } catch (NumberFormatException e) {
766 logger.warn("Can not set service statistics max job age to {}. {} must be an integer", maxJobAgeString,
767 OPT_SERVICE_STATISTICS_MAX_JOB_AGE);
768 }
769 }
770
771 scheduledExecutor = Executors.newScheduledThreadPool(1);
772
773
774 if (heartbeatInterval > 0) {
775 logger.debug("Starting service heartbeat at a custom interval of {}s", heartbeatInterval);
776 scheduledExecutor.scheduleWithFixedDelay(new JobProducerHeartbeat(), heartbeatInterval, heartbeatInterval,
777 TimeUnit.SECONDS);
778 }
779 }
780
781
782
783
784
785
786 @Modified
787 public void modified(Map<String, Object> config) throws ConfigurationException {
788 logger.debug("Modified serviceregistry");
789 }
790
791 private Function<EntityManager, Optional<JpaJob>> getJpaJobQuery(long id) {
792 return em -> namedQuery.findByIdOpt(JpaJob.class, id)
793 .apply(em)
794 .map(jpaJob -> {
795
796
797 em.refresh(jpaJob);
798 setJobUri(jpaJob);
799 return jpaJob;
800 });
801 }
802
803 @Override
804 public Job getJob(long id) throws NotFoundException, ServiceRegistryException {
805 try {
806 return db.exec(getJpaJobQuery(id))
807 .map(JpaJob::toJob)
808 .orElseThrow(NotFoundException::new);
809 } catch (NotFoundException e) {
810 throw e;
811 } catch (Exception e) {
812 throw new ServiceRegistryException(e);
813 }
814 }
815
816
817
818
819
820
821 @Override
822 public Job getCurrentJob() {
823 return currentJob.get();
824 }
825
826
827
828
829
830
831 @Override
832 public void setCurrentJob(Job job) {
833 currentJob.set(job);
834 }
835
836 JpaJob updateJob(JpaJob job) throws ServiceRegistryException {
837 try {
838
839
840
841 return db.execChecked(em -> {
842 Job oldJob = getJob(job.getId());
843 JpaJob jpaJob = updateInternal(job);
844 if (!TYPE_WORKFLOW.equals(job.getJobType()) && job.getJobLoad() > 0.0f
845 && job.getProcessorServiceRegistration() != null
846 && job.getProcessorServiceRegistration().getHost().equals(getRegistryHostname())) {
847 processCachedLoadChange(job);
848 }
849
850
851 if (oldJob.getStatus() != job.getStatus() && !TYPE_WORKFLOW.equals(job.getJobType())) {
852 updateServiceForFailover(job);
853 }
854
855 return jpaJob;
856 });
857 } catch (ServiceRegistryException e) {
858 throw e;
859 } catch (NotFoundException e) {
860
861 removeFromLoadCache(job.getId());
862 throw new ServiceRegistryException(e);
863 } catch (Exception e) {
864 throw new ServiceRegistryException(e);
865 }
866 }
867
868 @Override
869 public Job updateJob(Job job) throws ServiceRegistryException {
870 JpaJob jpaJob = JpaJob.from(job);
871 jpaJob.setProcessorServiceRegistration(
872 (ServiceRegistrationJpaImpl) getServiceRegistration(job.getJobType(), job.getProcessingHost()));
873 return updateJob(jpaJob).toJob();
874 }
875
876
877
878
879
880
881
882 private synchronized void processCachedLoadChange(JpaJob job) {
883 if (JOB_STATUSES_INFLUENCING_LOAD_BALANCING.contains(job.getStatus()) && jobCache.get(job.getId()) == null) {
884 logger.debug("Adding to load cache: {}, type {}, load {}, status {}",
885 job, job.getJobType(), job.getJobLoad(), job.getStatus());
886 localSystemLoad += job.getJobLoad();
887 jobCache.put(job.getId(), job.getJobLoad());
888 } else if (jobCache.get(job.getId()) != null && Status.FINISHED.equals(job.getStatus())
889 || Status.FAILED.equals(job.getStatus()) || Status.WAITING.equals(job.getStatus())) {
890 logger.debug("Removing from load cache: {}, type {}, load {}, status {}",
891 job, job.getJobType(), job.getJobLoad(), job.getStatus());
892 localSystemLoad -= job.getJobLoad();
893 jobCache.remove(job.getId());
894 } else {
895 logger.debug("Ignoring for load cache: {}, type {}, status {}",
896 job, job.getJobType(), job.getStatus());
897 }
898 logger.debug("Current host load: {}, job load cache size: {}", format("%.1f", localSystemLoad), jobCache.size());
899
900 if (jobCache.isEmpty()) {
901 if (Math.abs(localSystemLoad) > 0.0000001F) {
902 logger.warn("No jobs in the job load cache, but load is {}: setting job load to 0", localSystemLoad);
903 }
904 localSystemLoad = 0.0F;
905 }
906 }
907
908 private synchronized void removeFromLoadCache(Long jobId) {
909 if (jobCache.get(jobId) != null) {
910 float jobLoad = jobCache.get(jobId);
911 logger.debug("Removing deleted job from load cache: Job {}, load {}", jobId, jobLoad);
912 localSystemLoad -= jobLoad;
913 jobCache.remove(jobId);
914 }
915 }
916
917 protected JpaJob setJobUri(JpaJob job) {
918 try {
919 job.setUri(new URI(jobHost + "/services/job/" + job.getId() + ".xml"));
920 } catch (URISyntaxException e) {
921 logger.warn("Can not set the job URI", e);
922 }
923 return job;
924 }
925
926
927
928
929
930
931
932
933 protected JpaJob updateInternal(JpaJob job) throws NotFoundException {
934 JpaJob fromDb = db.execTxChecked(em -> {
935 JpaJob j = em.find(JpaJob.class, job.getId());
936 if (j == null) {
937 throw new NotFoundException();
938 }
939
940 update(j, job);
941 em.merge(j);
942 return j;
943 });
944
945 job.setVersion(fromDb.toJob().getVersion());
946 setJobUri(job);
947 return job;
948 }
949
950 public void updateStatisticsJobData() {
951 jobsStatistics.updateAvg(db.exec(getAvgOperationsQuery()));
952 jobsStatistics.updateJobCount(db.exec(getCountPerHostServiceQuery()));
953 }
954
955
956
957
958
959
960
961
962 private ServiceRegistration updateServiceState(ServiceRegistrationJpaImpl registration) throws NotFoundException {
963 db.execTxChecked(em -> {
964 ServiceRegistrationJpaImpl fromDb = em.find(ServiceRegistrationJpaImpl.class, registration.getId());
965 if (fromDb == null) {
966 throw new NotFoundException();
967 }
968 fromDb.setServiceState(registration.getServiceState());
969 fromDb.setStateChanged(registration.getStateChanged());
970 fromDb.setWarningStateTrigger(registration.getWarningStateTrigger());
971 fromDb.setErrorStateTrigger(registration.getErrorStateTrigger());
972 });
973
974 servicesStatistics.updateService(registration);
975 return registration;
976 }
977
978
979
980
981
982
983
984
985
986
987 private void update(JpaJob fromDb, JpaJob jpaJob) {
988 final Job job = jpaJob.toJob();
989 final Date now = new Date();
990 final Status status = job.getStatus();
991 final Status fromDbStatus = fromDb.getStatus();
992
993 fromDb.setPayload(job.getPayload());
994 fromDb.setStatus(job.getStatus());
995 fromDb.setDispatchable(job.isDispatchable());
996 fromDb.setVersion(job.getVersion());
997 fromDb.setOperation(job.getOperation());
998 fromDb.setArguments(job.getArguments());
999
1000 if (job.getDateCreated() == null) {
1001 jpaJob.setDateCreated(now);
1002 fromDb.setDateCreated(now);
1003 job.setDateCreated(now);
1004 }
1005 if (job.getProcessingHost() != null) {
1006 ServiceRegistrationJpaImpl processingService = (ServiceRegistrationJpaImpl) getServiceRegistration(
1007 job.getJobType(), job.getProcessingHost());
1008 logger.debug("{} has host '{}': setting processor service to '{}'", job, job.getProcessingHost(), processingService);
1009 fromDb.setProcessorServiceRegistration(processingService);
1010 } else {
1011 logger.debug("Unsetting previous processor service registration for {}", job);
1012 fromDb.setProcessorServiceRegistration(null);
1013 }
1014 if (Status.RUNNING.equals(status) && !Status.WAITING.equals(fromDbStatus)) {
1015 if (job.getDateStarted() == null) {
1016 jpaJob.setDateStarted(now);
1017 jpaJob.setQueueTime(now.getTime() - job.getDateCreated().getTime());
1018 fromDb.setDateStarted(now);
1019 fromDb.setQueueTime(now.getTime() - job.getDateCreated().getTime());
1020 job.setDateStarted(now);
1021 job.setQueueTime(now.getTime() - job.getDateCreated().getTime());
1022 }
1023 } else if (Status.FAILED.equals(status)) {
1024
1025 if (job.getDateCompleted() == null) {
1026 fromDb.setDateCompleted(now);
1027 jpaJob.setDateCompleted(now);
1028 job.setDateCompleted(now);
1029 if (job.getDateStarted() != null) {
1030 jpaJob.setRunTime(now.getTime() - job.getDateStarted().getTime());
1031 fromDb.setRunTime(now.getTime() - job.getDateStarted().getTime());
1032 job.setRunTime(now.getTime() - job.getDateStarted().getTime());
1033 }
1034 }
1035 } else if (Status.FINISHED.equals(status)) {
1036 if (job.getDateStarted() == null) {
1037
1038
1039 jpaJob.setDateStarted(job.getDateCreated());
1040 job.setDateStarted(job.getDateCreated());
1041 }
1042 if (job.getDateCompleted() == null) {
1043 jpaJob.setDateCompleted(now);
1044 jpaJob.setRunTime(now.getTime() - job.getDateStarted().getTime());
1045 fromDb.setDateCompleted(now);
1046 fromDb.setRunTime(now.getTime() - job.getDateStarted().getTime());
1047 job.setDateCompleted(now);
1048 job.setRunTime(now.getTime() - job.getDateStarted().getTime());
1049 }
1050 }
1051 }
1052
1053
1054
1055
1056
1057
1058
1059
1060 protected Function<EntityManager, Optional<HostRegistrationJpaImpl>> fetchHostRegistrationQuery(String host) {
1061 return namedQuery.findOpt(
1062 "HostRegistration.byHostName",
1063 HostRegistrationJpaImpl.class,
1064 Pair.of("host", host)
1065 );
1066 }
1067
1068
1069
1070
1071
1072
1073 @Override
1074 public void registerHost(String host, String address, String nodeName, long memory, int cores, float maxLoad)
1075 throws ServiceRegistryException {
1076 try {
1077 HostRegistrationJpaImpl hostRegistration = db.execTxChecked(em -> {
1078
1079 Optional<HostRegistrationJpaImpl> hostRegistrationOpt = fetchHostRegistrationQuery(host).apply(em);
1080 HostRegistrationJpaImpl hr;
1081
1082 if (hostRegistrationOpt.isEmpty()) {
1083 hr = new HostRegistrationJpaImpl(host, address, nodeName, memory, cores, maxLoad, true, false);
1084 em.persist(hr);
1085 } else {
1086 hr = hostRegistrationOpt.get();
1087 hr.setIpAddress(address);
1088 hr.setNodeName(nodeName);
1089 hr.setMemory(memory);
1090 hr.setCores(cores);
1091 hr.setMaxLoad(maxLoad);
1092 hr.setOnline(true);
1093 em.merge(hr);
1094 }
1095 logger.info("Registering {} with a maximum load of {}", host, maxLoad);
1096 return hr;
1097 });
1098
1099 hostsStatistics.updateHost(hostRegistration);
1100 } catch (Exception e) {
1101 throw new ServiceRegistryException(e);
1102 }
1103 }
1104
1105
1106
1107
1108
1109
1110 @Override
1111 public void unregisterHost(String host) throws ServiceRegistryException {
1112 try {
1113 HostRegistrationJpaImpl existingHostRegistration = db.execTxChecked(em -> {
1114 HostRegistrationJpaImpl hr = fetchHostRegistrationQuery(host).apply(em).orElseThrow(
1115 () -> new IllegalArgumentException("Host '" + host + "' is not registered, so it can not be unregistered"));
1116
1117 hr.setOnline(false);
1118 for (ServiceRegistration serviceRegistration : getServiceRegistrationsByHost(host)) {
1119 unRegisterService(serviceRegistration.getServiceType(), serviceRegistration.getHost());
1120 }
1121 em.merge(hr);
1122
1123 logger.info("Unregistering {}", host);
1124 return hr;
1125 });
1126
1127 logger.info("Host {} unregistered", host);
1128 hostsStatistics.updateHost(existingHostRegistration);
1129 } catch (Exception e) {
1130 throw new ServiceRegistryException(e);
1131 }
1132 }
1133
1134
1135
1136
1137
1138
1139 @Override
1140 public void enableHost(String host) throws ServiceRegistryException, NotFoundException {
1141 try {
1142 HostRegistrationJpaImpl hostRegistration = db.execTxChecked(em -> {
1143
1144 HostRegistrationJpaImpl hr = fetchHostRegistrationQuery(host).apply(em).orElseThrow(
1145 () -> new NotFoundException("Host '" + host + "' is currently not registered, so it can not be enabled"));
1146 hr.setActive(true);
1147 em.merge(hr);
1148 logger.info("Enabling {}", host);
1149 return hr;
1150 });
1151
1152 db.execTxChecked(em -> {
1153 for (ServiceRegistration serviceRegistration : getServiceRegistrationsByHost(host)) {
1154 ((ServiceRegistrationJpaImpl) serviceRegistration).setActive(true);
1155 em.merge(serviceRegistration);
1156 servicesStatistics.updateService(serviceRegistration);
1157 }
1158 });
1159
1160 hostsStatistics.updateHost(hostRegistration);
1161 } catch (NotFoundException e) {
1162 throw e;
1163 } catch (Exception e) {
1164 throw new ServiceRegistryException(e);
1165 }
1166 }
1167
1168
1169
1170
1171
1172
1173 @Override
1174 public void disableHost(String host) throws ServiceRegistryException, NotFoundException {
1175 try {
1176 HostRegistrationJpaImpl hostRegistration = db.execTxChecked(em -> {
1177 HostRegistrationJpaImpl hr = fetchHostRegistrationQuery(host).apply(em).orElseThrow(
1178 () -> new NotFoundException("Host '" + host + "' is not currently registered, so it can not be disabled"));
1179
1180 hr.setActive(false);
1181 for (ServiceRegistration serviceRegistration : getServiceRegistrationsByHost(host)) {
1182 ((ServiceRegistrationJpaImpl) serviceRegistration).setActive(false);
1183 em.merge(serviceRegistration);
1184 servicesStatistics.updateService(serviceRegistration);
1185 }
1186 em.merge(hr);
1187
1188 logger.info("Disabling {}", host);
1189 return hr;
1190 });
1191
1192 hostsStatistics.updateHost(hostRegistration);
1193 } catch (NotFoundException e) {
1194 throw e;
1195 } catch (Exception e) {
1196 throw new ServiceRegistryException(e);
1197 }
1198 }
1199
1200
1201
1202
1203
1204
1205
1206 @Override
1207 public ServiceRegistration registerService(String serviceType, String baseUrl, String path)
1208 throws ServiceRegistryException {
1209 return registerService(serviceType, baseUrl, path, false);
1210 }
1211
1212
1213
1214
1215
1216
1217
1218 @Override
1219 public ServiceRegistration registerService(String serviceType, String baseUrl, String path, boolean jobProducer)
1220 throws ServiceRegistryException {
1221 cleanRunningJobs(serviceType, baseUrl);
1222 return setOnlineStatus(serviceType, baseUrl, path, true, jobProducer);
1223 }
1224
1225 protected Function<EntityManager, Optional<ServiceRegistrationJpaImpl>> getServiceRegistrationQuery(
1226 String serviceType, String host) {
1227 return namedQuery.findOpt(
1228 "ServiceRegistration.getRegistration",
1229 ServiceRegistrationJpaImpl.class,
1230 Pair.of("serviceType", serviceType),
1231 Pair.of("host", host)
1232 );
1233 }
1234
1235
1236
1237
1238
1239
1240
1241
1242
1243
1244
1245
1246
1247
1248 protected ServiceRegistration setOnlineStatus(String serviceType, String baseUrl, String path, boolean online,
1249 Boolean jobProducer) throws ServiceRegistryException {
1250 if (isBlank(serviceType) || isBlank(baseUrl)) {
1251 logger.info("Uninformed baseUrl '{}' or service '{}' (path '{}')", baseUrl, serviceType, path);
1252 throw new IllegalArgumentException("serviceType and baseUrl must not be blank");
1253 }
1254
1255 try {
1256 AtomicReference<HostRegistrationJpaImpl> hostRegistration = new AtomicReference<>();
1257 AtomicReference<ServiceRegistrationJpaImpl> registration = new AtomicReference<>();
1258
1259 db.execTxChecked(em -> {
1260 HostRegistrationJpaImpl hr = fetchHostRegistrationQuery(baseUrl).apply(em).orElseThrow(() -> {
1261 logger.info("No associated host registration for '{}' or service '{}' (path '{}')", baseUrl, serviceType,path);
1262 return new IllegalStateException(
1263 "A service registration can not be updated when it has no associated host registration");
1264 });
1265 hostRegistration.set(hr);
1266
1267 ServiceRegistrationJpaImpl sr;
1268 Optional<ServiceRegistrationJpaImpl> srOpt = getServiceRegistrationQuery(serviceType, baseUrl).apply(em);
1269 if (srOpt.isEmpty()) {
1270 if (isBlank(path)) {
1271
1272 throw new IllegalArgumentException("path must not be blank when registering new services");
1273 }
1274
1275
1276 sr = new ServiceRegistrationJpaImpl(hr, serviceType, path, Objects.requireNonNullElse(jobProducer, false));
1277 em.persist(sr);
1278 } else {
1279 sr = srOpt.get();
1280 if (StringUtils.isNotBlank(path)) {
1281 sr.setPath(path);
1282 }
1283 sr.setOnline(online);
1284 if (jobProducer != null) {
1285 sr.setJobProducer(jobProducer);
1286 }
1287 em.merge(sr);
1288 }
1289 registration.set(sr);
1290 });
1291
1292 hostsStatistics.updateHost(hostRegistration.get());
1293 servicesStatistics.updateService(registration.get());
1294 return registration.get();
1295 } catch (Exception e) {
1296 throw new ServiceRegistryException(e);
1297 }
1298 }
1299
1300
1301
1302
1303
1304
1305 @Override
1306 public void unRegisterService(String serviceType, String baseUrl) throws ServiceRegistryException {
1307 logger.info("Unregistering Service {}@{} and cleaning its running jobs", serviceType, baseUrl);
1308
1309
1310 setOnlineStatus(serviceType, baseUrl, null, false, null);
1311 cleanRunningJobs(serviceType, baseUrl);
1312 }
1313
1314
1315
1316
1317 private void cleanUndispatchableJobs(String hostName) {
1318 logger.debug("Starting check for undispatchable jobs for host {}", hostName);
1319
1320 try {
1321 db.execTxChecked(em -> {
1322 List<JpaJob> undispatchableJobs = namedQuery.findAll(
1323 "Job.undispatchable.status",
1324 JpaJob.class,
1325 Pair.of("statuses", List.of(
1326 Status.INSTANTIATED.ordinal(),
1327 Status.RUNNING.ordinal()
1328 ))
1329 ).apply(em);
1330
1331 if (undispatchableJobs.size() > 0) {
1332 logger.info("Found {} undispatchable jobs on host {}", undispatchableJobs.size(), hostName);
1333 }
1334
1335 for (JpaJob job : undispatchableJobs) {
1336
1337 String jobHost = "";
1338 if (job.getProcessorServiceRegistration() != null) {
1339 jobHost = job.getProcessorServiceRegistration().getHost();
1340 }
1341
1342 if (!jobHost.equals(hostName)) {
1343 logger.debug("Will not cancel undispatchable job {} for host {}, it is running on a different host ({})",
1344 job, hostName, jobHost);
1345 continue;
1346 }
1347
1348 logger.info("Cancelling the running undispatchable job {}, it was orphaned on this host ({})", job, hostName);
1349 job.setStatus(Status.CANCELLED);
1350 em.merge(job);
1351 }
1352 });
1353 } catch (Exception e) {
1354 logger.error("Unable to clean undispatchable jobs for host {}! {}", hostName, e.getMessage());
1355 }
1356 }
1357
1358
1359
1360
1361
1362
1363
1364
1365
1366
1367
1368 private void cleanRunningJobs(String serviceType, String baseUrl) throws ServiceRegistryException {
1369 try {
1370 db.execTxChecked(em -> {
1371 TypedQuery<JpaJob> query = em.createNamedQuery("Job.processinghost.status", JpaJob.class)
1372 .setLockMode(LockModeType.PESSIMISTIC_WRITE)
1373 .setParameter("statuses", List.of(
1374 Status.RUNNING.ordinal(),
1375 Status.DISPATCHING.ordinal(),
1376 Status.WAITING.ordinal()
1377 ))
1378 .setParameter("host", baseUrl)
1379 .setParameter("serviceType", serviceType);
1380
1381 List<JpaJob> unregisteredJobs = query.getResultList();
1382 if (unregisteredJobs.size() > 0) {
1383 logger.info("Found {} jobs to clean for {}@{}", unregisteredJobs.size(), serviceType, baseUrl);
1384 }
1385
1386 for (JpaJob job : unregisteredJobs) {
1387 if (job.isDispatchable()) {
1388 em.refresh(job);
1389
1390 if (Status.CANCELLED.equals(job.getStatus()) || Status.RESTART.equals(job.getStatus())) {
1391 continue;
1392 }
1393
1394 if (job.getRootJob() != null && Status.PAUSED.equals(job.getRootJob().getStatus())) {
1395 JpaJob rootJob = job.getRootJob();
1396 cancelAllChildrenQuery(rootJob).accept(em);
1397 rootJob.setStatus(Status.RESTART);
1398 rootJob.setOperation(START_OPERATION);
1399 em.merge(rootJob);
1400 continue;
1401 }
1402
1403 logger.info("Marking child jobs from {} as canceled", job);
1404 cancelAllChildrenQuery(job).accept(em);
1405
1406 logger.info("Rescheduling lost {}", job);
1407 job.setStatus(Status.RESTART);
1408 job.setProcessorServiceRegistration(null);
1409 } else {
1410 logger.info("Marking lost {} as failed", job);
1411 job.setStatus(Status.FAILED);
1412 }
1413
1414 em.merge(job);
1415 }
1416 });
1417 } catch (Exception e) {
1418 throw new ServiceRegistryException(e);
1419 }
1420 }
1421
1422
1423
1424
1425
1426
1427
1428 private Consumer<EntityManager> cancelAllChildrenQuery(JpaJob job) {
1429 return em -> job.getChildJobs().stream()
1430 .peek(em::refresh)
1431 .filter(child -> Status.CANCELLED.equals(child.getStatus()))
1432 .forEach(child -> {
1433 cancelAllChildrenQuery(child).accept(em);
1434 child.setStatus(Status.CANCELLED);
1435 em.merge(child);
1436 });
1437 }
1438
1439
1440
1441
1442
1443
1444 @Override
1445 public void setMaintenanceStatus(String baseUrl, boolean maintenance) throws NotFoundException {
1446 logger.info("Setting maintenance mode on host '{}'", baseUrl);
1447 HostRegistrationJpaImpl reg = db.execTxChecked(em -> {
1448 HostRegistrationJpaImpl hr = fetchHostRegistrationQuery(baseUrl).apply(em).orElseThrow(() -> {
1449 logger.warn("Can not set maintenance mode because host '{}' was not registered", baseUrl);
1450 return new NotFoundException("Can not set maintenance mode on a host that has not been registered");
1451 });
1452 hr.setMaintenanceMode(maintenance);
1453 em.merge(hr);
1454 return hr;
1455 });
1456
1457 hostsStatistics.updateHost(reg);
1458 logger.info("Finished setting maintenance mode on host '{}'", baseUrl);
1459 }
1460
1461
1462
1463
1464
1465
1466 @Override
1467 public List<ServiceRegistration> getServiceRegistrations() {
1468 return db.exec(getServiceRegistrationsQuery());
1469 }
1470
1471 @Override
1472 public Incidents incident() {
1473 return incidents;
1474 }
1475
1476 private List<ServiceRegistration> getOnlineServiceRegistrations() {
1477 return db.exec(namedQuery.findAll("ServiceRegistration.getAllOnline", ServiceRegistration.class));
1478 }
1479
1480
1481
1482
1483
1484
1485 protected Function<EntityManager, List<ServiceRegistration>> getServiceRegistrationsQuery() {
1486 return namedQuery.findAll("ServiceRegistration.getAll", ServiceRegistration.class);
1487 }
1488
1489
1490
1491
1492
1493
1494 @Override
1495 public List<HostRegistration> getHostRegistrations() {
1496 return db.exec(getHostRegistrationsQuery());
1497 }
1498
1499 @Override
1500 public HostStatistics getHostStatistics() {
1501 HostStatistics statistics = new HostStatistics();
1502
1503 db.exec(namedQuery.findAll(
1504 "HostRegistration.jobStatistics",
1505 Object[].class,
1506 Pair.of("status", List.of(Status.QUEUED.ordinal(), Status.RUNNING.ordinal()))
1507 )).forEach(row -> {
1508 final long host = ((Number) row[0]).longValue();
1509 final int status = ((Number) row[1]).intValue();
1510 final long count = ((Number) row[2]).longValue();
1511
1512 if (status == Status.RUNNING.ordinal()) {
1513 statistics.addRunning(host, count);
1514 } else {
1515 statistics.addQueued(host, count);
1516 }
1517 });
1518
1519 return statistics;
1520 }
1521
1522
1523
1524
1525
1526
1527 protected Function<EntityManager, List<HostRegistration>> getHostRegistrationsQuery() {
1528 return namedQuery.findAll("HostRegistration.getAll", HostRegistration.class);
1529 }
1530
1531 @Override
1532 public HostRegistration getHostRegistration(String hostname) throws ServiceRegistryException {
1533 return db.exec(getHostRegistrationQuery(hostname));
1534 }
1535
1536 protected Function<EntityManager, HostRegistration> getHostRegistrationQuery(String hostname) {
1537 return namedQuery.find(
1538 "HostRegistration.byHostName",
1539 HostRegistration.class,
1540 Pair.of("host", hostname)
1541 );
1542 }
1543
1544
1545
1546
1547
1548
1549 @Override
1550 public List<Job> getChildJobs(long id) throws ServiceRegistryException {
1551 try {
1552 List<JpaJob> jobs = db.exec(namedQuery.findAll(
1553 "Job.root.children",
1554 JpaJob.class,
1555 Pair.of("id", id)
1556 ));
1557
1558 if (jobs.size() == 0) {
1559 jobs = db.exec(getChildrenQuery(id));
1560 }
1561
1562 return jobs.stream()
1563 .map(this::setJobUri)
1564 .map(JpaJob::toJob)
1565 .collect(Collectors.toList());
1566 } catch (Exception e) {
1567 throw new ServiceRegistryException(e);
1568 }
1569 }
1570
1571 private Function<EntityManager, List<JpaJob>> getChildrenQuery(long id) {
1572 return em -> {
1573 TypedQuery<JpaJob> query = em
1574 .createNamedQuery("Job.children", JpaJob.class)
1575 .setParameter("id", id);
1576
1577 List<JpaJob> childJobs = query.getResultList();
1578
1579 List<JpaJob> result = new ArrayList<>(childJobs);
1580 childJobs.stream()
1581 .map(j -> getChildrenQuery(j.getId()).apply(em))
1582 .forEach(result::addAll);
1583
1584 return result;
1585 };
1586 }
1587
1588
1589
1590
1591
1592
1593 @Override
1594 public List<Job> getJobs(String type, Status status) throws ServiceRegistryException {
1595 logger.trace("Getting jobs '{}' and '{}'", type, status);
1596
1597 Function<EntityManager, List<JpaJob>> jobsQuery;
1598 if (type == null && status == null) {
1599 jobsQuery = namedQuery.findAll("Job.all", JpaJob.class);
1600 } else if (type == null) {
1601 jobsQuery = namedQuery.findAll(
1602 "Job.status",
1603 JpaJob.class,
1604 Pair.of("status", status.ordinal())
1605 );
1606 } else if (status == null) {
1607 jobsQuery = namedQuery.findAll(
1608 "Job.type",
1609 JpaJob.class,
1610 Pair.of("serviceType", type)
1611 );
1612 } else {
1613 jobsQuery = namedQuery.findAll(
1614 "Job",
1615 JpaJob.class,
1616 Pair.of("status", status.ordinal()),
1617 Pair.of("serviceType", type)
1618 );
1619 }
1620
1621 try {
1622 return db.exec(jobsQuery).stream()
1623 .peek(this::setJobUri)
1624 .map(JpaJob::toJob)
1625 .collect(Collectors.toList());
1626 } catch (Exception e) {
1627 throw new ServiceRegistryException(e);
1628 }
1629 }
1630
1631 @Override
1632 public List<String> getJobPayloads(String operation) throws ServiceRegistryException {
1633 try {
1634 return db.exec(namedQuery.findAll(
1635 "Job.payload",
1636 String.class,
1637 Pair.of("operation", operation)
1638 ));
1639 } catch (Exception e) {
1640 throw new ServiceRegistryException(e);
1641 }
1642 }
1643
1644 @Override
1645 public List<String> getJobPayloads(String operation, int limit, int offset) throws ServiceRegistryException {
1646 try {
1647 return db.exec(em -> {
1648 return em.createNamedQuery("Job.payload", String.class)
1649 .setParameter("operation", operation)
1650 .setMaxResults(limit)
1651 .setFirstResult(offset)
1652 .getResultList();
1653 });
1654 } catch (Exception e) {
1655 throw new ServiceRegistryException(e);
1656 }
1657 }
1658
1659 @Override
1660 public int getJobCount(final String operation) throws ServiceRegistryException {
1661 try {
1662 return db.exec(namedQuery.find(
1663 "Job.countByOperationOnly",
1664 Number.class,
1665 Pair.of("operation", operation)
1666 )).intValue();
1667 } catch (Exception e) {
1668 throw new ServiceRegistryException(e);
1669 }
1670 }
1671
1672
1673
1674
1675
1676
1677 @Override
1678 public List<Job> getActiveJobs() throws ServiceRegistryException {
1679 try {
1680 return db.exec(getJobsByStatusQuery(activeJobStatus)).stream()
1681 .map(JpaJob::toJob)
1682 .collect(Collectors.toList());
1683 } catch (Exception e) {
1684 throw new ServiceRegistryException(e);
1685 }
1686 }
1687
1688
1689
1690
1691
1692
1693
1694
1695 public Function<EntityManager, List<JpaJob>> getJobsByStatusQuery(Status... statuses) {
1696 if (statuses == null || statuses.length < 1) {
1697 throw new IllegalArgumentException("At least one job status must be given.");
1698 }
1699
1700 return namedQuery.findAll(
1701 "Job.statuses",
1702 JpaJob.class,
1703 Pair.of("statuses", Arrays.stream(statuses).map(Enum::ordinal).collect(Collectors.toList()))
1704 ).andThen(jobs -> jobs.stream()
1705 .peek(this::setJobUri)
1706 .collect(Collectors.toList()));
1707 }
1708
1709 @Override
1710 public Map<String, Map<String, Long>> countActiveByOrganizationAndHost() {
1711 var rows = db.exec(namedQuery.findAll(
1712 "Job.countByOrganizationAndHost",
1713 Object[].class,
1714 Pair.of("statuses", Arrays.stream(activeJobStatus).map(Enum::ordinal).collect(Collectors.toList()))
1715 )).stream().collect(Collectors.toList());
1716 var orgMap = new HashMap<String, Map<String, Long>>();
1717 for (Object[] row: rows) {
1718 var org = (String) row[0];
1719 var host = (String) row[1];
1720 var count = (Long) row[2];
1721 orgMap.computeIfAbsent(org, o -> new HashMap<>()).put(host, count);
1722 }
1723 return orgMap;
1724 }
1725
1726 @Override
1727 public Map<String, Long> countActiveTypeByOrganization(final String operation) {
1728 return db.exec(namedQuery.findAll(
1729 "Job.countTypeByOrganization",
1730 Object[].class,
1731 Pair.of("statuses", Arrays.stream(activeJobStatus).map(Enum::ordinal).collect(Collectors.toList())),
1732 Pair.of("operation", operation)
1733 )).stream().collect(Collectors.toMap(
1734 row -> (String) row[0],
1735 row -> (Long) row[1]
1736 ));
1737 }
1738
1739
1740
1741
1742
1743
1744
1745
1746
1747 protected Function<EntityManager, List<JpaJob>> getDispatchableJobsWithStatusQuery(int offset, int limit,
1748 Status... statuses) {
1749 return em -> {
1750 if (statuses == null) {
1751 return Collections.emptyList();
1752 }
1753
1754 TypedQuery<JpaJob> query = em
1755 .createNamedQuery("Job.dispatchable.status", JpaJob.class)
1756 .setParameter("statuses", Arrays.stream(statuses).map(Enum::ordinal).collect(Collectors.toList()));
1757 if (offset > 0) {
1758 query.setFirstResult(offset);
1759 }
1760 if (limit > 0) {
1761 query.setMaxResults(limit);
1762 }
1763 return query.getResultList();
1764 };
1765 }
1766
1767 Function<EntityManager, List<Object[]>> getAvgOperationsQuery() {
1768 return namedQuery.findAll("Job.avgOperation", Object[].class);
1769 }
1770
1771 Function<EntityManager, List<Object[]>> getCountPerHostServiceQuery() {
1772 return namedQuery.findAll("Job.countPerHostService", Object[].class);
1773 }
1774
1775
1776
1777
1778
1779
1780 @Override
1781 public long count(String serviceType, Status status) throws ServiceRegistryException {
1782 Function<EntityManager, Number> countQuery;
1783 if (serviceType == null && status == null) {
1784 countQuery = namedQuery.find(
1785 "Job.count.all",
1786 Number.class
1787 );
1788 } else if (serviceType == null) {
1789 countQuery = namedQuery.find(
1790 "Job.count.nullType",
1791 Number.class,
1792 Pair.of("status", status.ordinal())
1793 );
1794 } else if (status == null) {
1795 countQuery = namedQuery.find(
1796 "Job.count.nullStatus",
1797 Number.class,
1798 Pair.of("serviceType", serviceType)
1799 );
1800 } else {
1801 countQuery = namedQuery.find(
1802 "Job.count",
1803 Number.class,
1804 Pair.of("status", status.ordinal()),
1805 Pair.of("serviceType", serviceType)
1806 );
1807 }
1808
1809 try {
1810 return db.exec(countQuery).longValue();
1811 } catch (Exception e) {
1812 throw new ServiceRegistryException(e);
1813 }
1814 }
1815
1816
1817
1818
1819
1820
1821
1822 @Override
1823 public long countByHost(String serviceType, String host, Status status) throws ServiceRegistryException {
1824 Function<EntityManager, Number> countQuery;
1825 if (serviceType != null && !serviceType.isEmpty()) {
1826 countQuery = namedQuery.find(
1827 "Job.countByHost",
1828 Number.class,
1829 Pair.of("serviceType", serviceType),
1830 Pair.of("status", status.ordinal()),
1831 Pair.of("host", host)
1832 );
1833 } else {
1834 countQuery = namedQuery.find(
1835 "Job.countByHost.nullType",
1836 Number.class,
1837 Pair.of("status", status.ordinal()),
1838 Pair.of("host", host)
1839 );
1840 }
1841
1842 try {
1843 return db.exec(countQuery).longValue();
1844 } catch (Exception e) {
1845 throw new ServiceRegistryException(e);
1846 }
1847 }
1848
1849
1850
1851
1852
1853
1854
1855
1856 @Override
1857 public long countByOperation(String serviceType, String operation, Status status) throws ServiceRegistryException {
1858 try {
1859 return db.exec(namedQuery.find(
1860 "Job.countByOperation",
1861 Number.class,
1862 Pair.of("status", status.ordinal()),
1863 Pair.of("serviceType", serviceType),
1864 Pair.of("operation", operation)
1865 )).longValue();
1866 } catch (Exception e) {
1867 throw new ServiceRegistryException(e);
1868 }
1869 }
1870
1871
1872
1873
1874
1875
1876
1877 @Override
1878 public long count(String serviceType, String host, String operation, Status status) throws ServiceRegistryException {
1879 if (StringUtils.isBlank(serviceType) || StringUtils.isBlank(host) || StringUtils.isBlank(operation)
1880 || status == null) {
1881 throw new IllegalArgumentException("service type, host, operation, and status must be provided");
1882 }
1883
1884 try {
1885 return db.exec(namedQuery.find(
1886 "Job.fullMonty",
1887 Number.class,
1888 Pair.of("status", status.ordinal()),
1889 Pair.of("serviceType", serviceType),
1890 Pair.of("operation", operation)
1891 )).longValue();
1892 } catch (Exception e) {
1893 throw new ServiceRegistryException(e);
1894 }
1895 }
1896
1897
1898
1899
1900
1901
1902 @Override
1903 public List<ServiceStatistics> getServiceStatistics() throws ServiceRegistryException {
1904 Date now = new Date();
1905 try {
1906 return db.exec(getServiceStatisticsQuery(
1907 DateUtils.addDays(now, -maxJobAge),
1908 DateUtils.addDays(now, 1)
1909 ));
1910 } catch (Exception e) {
1911 throw new ServiceRegistryException(e);
1912 }
1913 }
1914
1915
1916
1917
1918
1919
1920
1921
1922
1923
1924
1925 private Function<EntityManager, List<ServiceStatistics>> getServiceStatisticsQuery(Date startDate, Date endDate) {
1926 return em -> {
1927 Map<Long, JaxbServiceStatistics> statsMap = new HashMap<>();
1928
1929
1930 namedQuery.findAll("ServiceRegistration.getAll", ServiceRegistrationJpaImpl.class).apply(em).forEach(s ->
1931 statsMap.put(s.getId(), new JaxbServiceStatistics(s))
1932 );
1933
1934 if (collectJobstats) {
1935
1936 namedQuery.findAll(
1937 "ServiceRegistration.statistics",
1938 Object[].class,
1939 Pair.of("minDateCreated", startDate),
1940 Pair.of("maxDateCreated", endDate)
1941 ).apply(em).forEach(row -> {
1942 Number serviceRegistrationId = (Number) row[0];
1943 if (serviceRegistrationId == null || serviceRegistrationId.longValue() == 0) {
1944 return;
1945 }
1946 Status status = Status.values()[((Number) row[1]).intValue()];
1947 Number count = (Number) row[2];
1948 Number meanQueueTime = (Number) row[3];
1949 Number meanRunTime = (Number) row[4];
1950
1951
1952 JaxbServiceStatistics stats = statsMap.get(serviceRegistrationId.longValue());
1953 if (stats == null) {
1954 return;
1955 }
1956
1957
1958 if (status != null) {
1959 switch (status) {
1960 case RUNNING:
1961 stats.setRunningJobs(count.intValue());
1962 break;
1963 case QUEUED:
1964 case DISPATCHING:
1965 stats.setQueuedJobs(count.intValue());
1966 break;
1967 case FINISHED:
1968 stats.setMeanRunTime(meanRunTime.longValue());
1969 stats.setMeanQueueTime(meanQueueTime.longValue());
1970 stats.setFinishedJobs(count.intValue());
1971 break;
1972 default:
1973 break;
1974 }
1975 }
1976 });
1977 }
1978
1979 List<ServiceStatistics> stats = new ArrayList<>(statsMap.values());
1980 stats.sort((o1, o2) -> {
1981 ServiceRegistration reg1 = o1.getServiceRegistration();
1982 ServiceRegistration reg2 = o2.getServiceRegistration();
1983 int typeComparison = reg1.getServiceType().compareTo(reg2.getServiceType());
1984 return typeComparison == 0
1985 ? reg1.getHost().compareTo(reg2.getHost())
1986 : typeComparison;
1987 });
1988
1989 return stats;
1990 };
1991 }
1992
1993
1994
1995
1996
1997
1998
1999
2000
2001 @Override
2002 public List<ServiceRegistration> getServiceRegistrationsByLoad(String serviceType) throws ServiceRegistryException {
2003 SystemLoad loadByHost = getCurrentHostLoads();
2004 List<HostRegistration> hostRegistrations = getHostRegistrations();
2005 List<ServiceRegistration> serviceRegistrations = getServiceRegistrationsByType(serviceType);
2006 return getServiceRegistrationsByLoad(serviceType, serviceRegistrations, hostRegistrations, loadByHost);
2007 }
2008
2009
2010
2011
2012
2013
2014 @Override
2015 public SystemLoad getCurrentHostLoads() {
2016 return db.exec(getHostLoadsQuery());
2017 }
2018
2019
2020
2021
2022
2023
2024 Function<EntityManager, SystemLoad> getHostLoadsQuery() {
2025 return em -> {
2026 final SystemLoad systemLoad = new SystemLoad();
2027
2028
2029 List<Integer> statuses = JOB_STATUSES_INFLUENCING_LOAD_BALANCING.stream()
2030 .map(Enum::ordinal)
2031 .collect(Collectors.toList());
2032 List<Object[]> rows = namedQuery.findAll(
2033 "ServiceRegistration.hostloads",
2034 Object[].class,
2035 Pair.of("statuses", statuses),
2036
2037
2038 Pair.of("workflow_type", TYPE_WORKFLOW)
2039 ).apply(em);
2040
2041
2042 for (Object[] row : rows) {
2043 String host = String.valueOf(row[0]);
2044 Status status = Status.values()[(int) row[1]];
2045 float currentLoad = ((Number) row[2]).floatValue();
2046 float maxLoad = ((Number) row[3]).floatValue();
2047
2048
2049 if (status == null || !JOB_STATUSES_INFLUENCING_LOAD_BALANCING.contains(status)) {
2050 currentLoad = 0.0f;
2051 }
2052
2053 NodeLoad serviceLoad = new NodeLoad(host, currentLoad, maxLoad);
2054 systemLoad.addNodeLoad(serviceLoad);
2055 }
2056
2057
2058 getHostRegistrationsQuery().apply(em).stream()
2059 .filter(h -> !systemLoad.containsHost(h.getBaseUrl()))
2060 .forEach(h -> systemLoad.addNodeLoad(new NodeLoad(h.getBaseUrl(), 0.0f, h.getMaxLoad())));
2061 return systemLoad;
2062 };
2063 }
2064
2065
2066
2067
2068
2069
2070 @Override
2071 public List<ServiceRegistration> getServiceRegistrationsByType(String serviceType) throws ServiceRegistryException {
2072 return db.exec(namedQuery.findAll(
2073 "ServiceRegistration.getByType",
2074 ServiceRegistration.class,
2075 Pair.of("serviceType", serviceType)
2076 ));
2077 }
2078
2079
2080
2081
2082
2083
2084 @Override
2085 public List<ServiceRegistration> getServiceRegistrationsByHost(String host) throws ServiceRegistryException {
2086 return db.exec(getServiceRegistrationsByHostQuery(host));
2087 }
2088
2089 private Function<EntityManager, List<ServiceRegistration>> getServiceRegistrationsByHostQuery(String host) {
2090 return namedQuery.findAll(
2091 "ServiceRegistration.getByHost",
2092 ServiceRegistration.class,
2093 Pair.of("host", host)
2094 );
2095 }
2096
2097
2098
2099
2100
2101
2102
2103 @Override
2104 public ServiceRegistration getServiceRegistration(String serviceType, String host) {
2105 return db.exec(getServiceRegistrationQuery(serviceType, host))
2106 .orElse(null);
2107 }
2108
2109
2110
2111
2112
2113
2114
2115 @Reference
2116 void setTrustedHttpClient(TrustedHttpClient client) {
2117 this.client = client;
2118 }
2119
2120
2121
2122
2123
2124
2125
2126 @Reference
2127 public void setSecurityService(SecurityService securityService) {
2128 this.securityService = securityService;
2129 }
2130
2131
2132 @Reference(cardinality = ReferenceCardinality.OPTIONAL, policy = ReferencePolicy.DYNAMIC, unbind = "unsetIncidentService")
2133 public void setIncidentService(IncidentService incidentService) {
2134 this.incidentService = incidentService;
2135
2136 ((OsgiIncidentService) incidentService).setServiceRegistry(this);
2137 this.incidents = new Incidents(this, incidentService);
2138 }
2139
2140 public void unsetIncidentService(IncidentService incidentService) {
2141 if (this.incidentService == incidentService) {
2142 this.incidentService = null;
2143 this.incidents = null;
2144 }
2145 }
2146
2147
2148
2149
2150
2151
2152
2153
2154
2155
2156 private void updateServiceForFailover(JpaJob job) throws ServiceRegistryException, NotFoundException {
2157 if (job.getStatus() != Status.FAILED && job.getStatus() != Status.FINISHED) {
2158 return;
2159 }
2160
2161 job.setStatus(job.getStatus(), job.getFailureReason());
2162
2163
2164
2165 ServiceRegistrationJpaImpl currentService = job.getProcessorServiceRegistration();
2166 if (currentService == null) {
2167 return;
2168 }
2169
2170
2171 if (job.getStatus() == FAILED && !DATA.equals(job.getFailureReason())) {
2172
2173
2174 List<ServiceRegistrationJpaImpl> relatedWarningOrErrorServices = getRelatedWarningErrorServices(job);
2175
2176
2177 if (relatedWarningOrErrorServices.size() > 0) {
2178 for (ServiceRegistrationJpaImpl relatedService : relatedWarningOrErrorServices) {
2179
2180 if (currentService.equals(relatedService)) {
2181 continue;
2182 }
2183
2184
2185
2186 if (relatedService.getServiceState() == WARNING) {
2187 logger.info("State reset to NORMAL for related service {} on host {}", relatedService.getServiceType(),
2188 relatedService.getHost());
2189 relatedService.setServiceState(NORMAL, job.toJob().getSignature());
2190 }
2191
2192
2193 else if (relatedService.getServiceState() == ERROR) {
2194 logger.info("State reset to WARNING for related service {} on host {}", relatedService.getServiceType(),
2195 relatedService.getHost());
2196 relatedService.setServiceState(WARNING, relatedService.getWarningStateTrigger());
2197 }
2198
2199 updateServiceState(relatedService);
2200 }
2201 }
2202
2203
2204 else {
2205
2206 if (currentService.getServiceState() == NORMAL) {
2207 logger.info("State set to WARNING for current service {} on host {}", currentService.getServiceType(),
2208 currentService.getHost());
2209 currentService.setServiceState(WARNING, job.toJob().getSignature());
2210 updateServiceState(currentService);
2211 }
2212
2213
2214 else if (errorStatesEnabled && !noErrorStateServiceTypes.contains(currentService.getServiceType())
2215 && getHistorySize(currentService) >= maxAttemptsBeforeErrorState) {
2216 logger.info("State set to ERROR for current service {} on host {}", currentService.getServiceType(),
2217 currentService.getHost());
2218 currentService.setServiceState(ERROR, job.toJob().getSignature());
2219 updateServiceState(currentService);
2220 }
2221 }
2222 }
2223
2224
2225 else if (job.getStatus() == Status.FINISHED) {
2226
2227 if (currentService.getServiceState() == WARNING) {
2228 logger.info("State reset to NORMAL for current service {} on host {}", currentService.getServiceType(),
2229 currentService.getHost());
2230 currentService.setServiceState(NORMAL);
2231 updateServiceState(currentService);
2232 }
2233 }
2234 }
2235
2236
2237
2238
2239
2240
2241 @Override
2242 public void sanitize(String serviceType, String host) throws NotFoundException {
2243 db.execChecked(em -> {
2244 ServiceRegistrationJpaImpl service = getServiceRegistrationQuery(serviceType, host).apply(em)
2245 .orElseThrow(NotFoundException::new);
2246
2247 logger.info("State reset to NORMAL for service {} on host {} through sanitize method", service.getServiceType(),
2248 service.getHost());
2249 service.setServiceState(NORMAL);
2250 updateServiceState(service);
2251 });
2252 }
2253
2254
2255
2256
2257
2258
2259
2260
2261
2262
2263 private int getHistorySize(ServiceRegistration serviceRegistration) throws ServiceRegistryException {
2264 if (serviceRegistration == null) {
2265 throw new IllegalArgumentException("serviceRegistration must not be null!");
2266 }
2267
2268 logger.debug("Calculating count of jobs who failed due to service {}", serviceRegistration);
2269
2270 try {
2271 return db.exec(namedQuery.find(
2272 "Job.count.history.failed",
2273 Number.class,
2274 Pair.of("serviceType", serviceRegistration.getServiceType()),
2275 Pair.of("host", serviceRegistration.getHost())
2276 )).intValue();
2277 } catch (Exception e) {
2278 throw new ServiceRegistryException(e);
2279 }
2280 }
2281
2282
2283
2284
2285
2286
2287
2288
2289
2290
2291
2292
2293 private List<ServiceRegistrationJpaImpl> getRelatedWarningErrorServices(JpaJob job) throws ServiceRegistryException {
2294 if (job == null) {
2295 throw new IllegalArgumentException("job must not be null!");
2296 }
2297
2298 logger.debug("Try to get the services in WARNING or ERROR state triggered by {} failed", job);
2299
2300 try {
2301 return db.exec(namedQuery.findAll(
2302 "ServiceRegistration.relatedservices.warning_error",
2303 ServiceRegistrationJpaImpl.class,
2304 Pair.of("serviceType", job.getJobType())
2305 )).stream()
2306
2307 .filter(rs ->
2308 (rs.getServiceState() == WARNING && rs.getWarningStateTrigger() == job.toJob().getSignature())
2309 || (rs.getServiceState() == ERROR && rs.getErrorStateTrigger() == job.toJob().getSignature())
2310 ).collect(Collectors.toList());
2311 } catch (Exception e) {
2312 throw new ServiceRegistryException(e);
2313 }
2314 }
2315
2316
2317
2318
2319
2320
2321
2322
2323
2324
2325
2326
2327
2328
2329 protected List<ServiceRegistration> getServiceRegistrationsWithCapacity(String jobType,
2330 List<ServiceRegistration> serviceRegistrations, List<HostRegistration> hostRegistrations,
2331 final SystemLoad systemLoad) {
2332 final List<String> hostBaseUrls = hostRegistrations.stream()
2333 .map(HostRegistration::getBaseUrl)
2334 .collect(Collectors.toUnmodifiableList());
2335 final List<ServiceRegistration> filteredList = new ArrayList<>();
2336
2337 for (ServiceRegistration service : serviceRegistrations) {
2338
2339 if (!hostBaseUrls.contains(service.getHost())) {
2340 logger.trace("Not considering {} because it's host {} is not available for dispatching", service,
2341 service.getHost());
2342 continue;
2343 }
2344
2345
2346 if (!jobType.equals(service.getServiceType())) {
2347 logger.trace("Not considering {} because it is of the wrong job type", service);
2348 continue;
2349 }
2350
2351
2352 if (service.getServiceState() == ERROR) {
2353 logger.trace("Not considering {} because it is in error state", service);
2354 continue;
2355 }
2356
2357
2358 if (service.isInMaintenanceMode()) {
2359 logger.trace("Not considering {} because it is in maintenance mode", service);
2360 continue;
2361 }
2362
2363
2364 if (!service.isOnline()) {
2365 logger.trace("Not considering {} because it is currently offline", service);
2366 continue;
2367 }
2368
2369
2370 Float hostLoadMax = null;
2371 for (HostRegistration host : hostRegistrations) {
2372 if (host.getBaseUrl().equals(service.getHost())) {
2373 hostLoadMax = host.getMaxLoad();
2374 break;
2375 }
2376 }
2377 if (hostLoadMax == null) {
2378 logger.warn("Unable to determine max load for host {}", service.getHost());
2379 }
2380
2381
2382 Float hostLoad = systemLoad.get(service.getHost()).getLoadFactor();
2383 if (hostLoad == null) {
2384 logger.warn("Unable to determine current load for host {}", service.getHost());
2385 }
2386
2387
2388 if (hostLoad == null || hostLoadMax == null || hostLoad < hostLoadMax) {
2389 logger.debug("Adding candidate service {} for processing of jobs of type '{}' (host load is {} of max {})",
2390 service, jobType, hostLoad, hostLoadMax);
2391 filteredList.add(service);
2392 }
2393 }
2394
2395
2396 filteredList.sort(new LoadComparator(systemLoad));
2397
2398 return filteredList;
2399 }
2400
2401
2402
2403
2404
2405
2406
2407
2408
2409
2410
2411
2412
2413
2414 protected List<ServiceRegistration> getServiceRegistrationsByLoad(String jobType,
2415 List<ServiceRegistration> serviceRegistrations, List<HostRegistration> hostRegistrations,
2416 final SystemLoad systemLoad) {
2417 final List<String> hostBaseUrls = hostRegistrations.stream()
2418 .map(HostRegistration::getBaseUrl)
2419 .collect(Collectors.toUnmodifiableList());
2420 final List<ServiceRegistration> filteredList = new ArrayList<>();
2421
2422 logger.debug("Finding services to dispatch job of type {}", jobType);
2423
2424 for (ServiceRegistration service : serviceRegistrations) {
2425
2426 if (!hostBaseUrls.contains(service.getHost())) {
2427 logger.trace("Not considering {} because it's host {} is not available for dispatching", service,
2428 service.getHost());
2429 continue;
2430 }
2431
2432
2433 if (!jobType.equals(service.getServiceType())) {
2434 logger.trace("Not considering {} because it is of the wrong job type", service);
2435 continue;
2436 }
2437
2438
2439 if (service.getServiceState() == ERROR) {
2440 logger.trace("Not considering {} because it is in error state", service);
2441 continue;
2442 }
2443
2444
2445 if (service.isInMaintenanceMode()) {
2446 logger.trace("Not considering {} because it is in maintenance mode", service);
2447 continue;
2448 }
2449
2450
2451 if (!service.isOnline()) {
2452 logger.trace("Not considering {} because it is currently offline", service);
2453 continue;
2454 }
2455
2456
2457 logger.debug("Adding candidate service {} for processing of job of type '{}'", service, jobType);
2458 filteredList.add(service);
2459 }
2460
2461
2462 if ("org.opencastproject.composer".equals(jobType))
2463 Collections.sort(filteredList, new LoadComparatorEncoding(systemLoad));
2464 else
2465 Collections.sort(filteredList, new LoadComparator(systemLoad));
2466
2467 return filteredList;
2468 }
2469
2470
2471
2472
2473
2474
2475 @Override
2476 public SystemLoad getMaxLoads() throws ServiceRegistryException {
2477 final SystemLoad loads = new SystemLoad();
2478 getHostRegistrations().stream()
2479 .map(host -> new NodeLoad(host.getBaseUrl(), 0.0f, host.getMaxLoad()))
2480 .forEach(loads::addNodeLoad);
2481 return loads;
2482 }
2483
2484
2485
2486
2487
2488
2489 @Override
2490 public NodeLoad getMaxLoadOnNode(String host) throws ServiceRegistryException, NotFoundException {
2491 try {
2492 float maxLoad = db.exec(namedQuery.find(
2493 "HostRegistration.getMaxLoadByHostName",
2494 Number.class,
2495 Pair.of("host", host)
2496 )).floatValue();
2497 return new NodeLoad(host, 0.0f, maxLoad);
2498 } catch (NoResultException e) {
2499 throw new NotFoundException(e);
2500 } catch (Exception e) {
2501 throw new ServiceRegistryException(e);
2502 }
2503 }
2504
2505
2506 class JobProducerHeartbeat implements Runnable {
2507
2508
2509 private final List<ServiceRegistration> unresponsive = new ArrayList<>();
2510
2511
2512
2513
2514
2515
2516 @Override
2517 public void run() {
2518 logger.debug("Checking for unresponsive services");
2519
2520 try {
2521 List<ServiceRegistration> serviceRegistrations = getOnlineServiceRegistrations();
2522
2523 for (ServiceRegistration service : serviceRegistrations) {
2524 hostsStatistics.updateHost(((ServiceRegistrationJpaImpl) service).getHostRegistration());
2525 servicesStatistics.updateService(service);
2526 if (!service.isJobProducer()) {
2527 continue;
2528 }
2529 if (service.isInMaintenanceMode()) {
2530 continue;
2531 }
2532
2533
2534 String serviceUrl = UrlSupport.concat(service.getHost(), service.getPath(), "dispatch");
2535
2536 HttpHead options = new HttpHead(serviceUrl);
2537 HttpResponse response = null;
2538 try {
2539 try {
2540 response = client.execute(options);
2541 if (response != null) {
2542 switch (response.getStatusLine().getStatusCode()) {
2543 case HttpStatus.SC_OK:
2544
2545 logger.trace("Service " + service + " is responsive: " + response.getStatusLine());
2546 if (unresponsive.remove(service)) {
2547 logger.info("Service {} is still online", service);
2548 } else if (!service.isOnline()) {
2549 try {
2550 setOnlineStatus(service.getServiceType(), service.getHost(), service.getPath(), true, true);
2551 logger.info("Service {} is back online", service);
2552 } catch (ServiceRegistryException e) {
2553 logger.warn("Error setting online status for {}", service);
2554 }
2555 }
2556 continue;
2557 default:
2558 if (!service.isOnline()) {
2559 continue;
2560 }
2561 logger.warn("Service {} is not working as expected: {}", service, response.getStatusLine());
2562 }
2563 } else {
2564 logger.warn("Service {} does not respond", service);
2565 }
2566 } catch (TrustedHttpClientException e) {
2567 if (!service.isOnline()) {
2568 continue;
2569 }
2570 logger.warn("Unable to reach {}", service, e);
2571 }
2572
2573
2574 try {
2575 if (unresponsive.contains(service)) {
2576 unRegisterService(service.getServiceType(), service.getHost());
2577 unresponsive.remove(service);
2578 logger.warn("Marking {} as offline", service);
2579 } else {
2580 unresponsive.add(service);
2581 logger.warn("Added {} to the watch list", service);
2582 }
2583 } catch (ServiceRegistryException e) {
2584 logger.warn("Unable to unregister unreachable service: {}", service, e);
2585 }
2586 } finally {
2587 client.close(response);
2588 }
2589 }
2590 } catch (Throwable t) {
2591 logger.warn("Error while checking for unresponsive services", t);
2592 }
2593
2594 logger.debug("Finished checking for unresponsive services");
2595 }
2596 }
2597
2598
2599
2600
2601
2602 private class LoadComparator implements Comparator<ServiceRegistration> {
2603
2604 protected SystemLoad loadByHost = null;
2605
2606
2607
2608
2609
2610
2611
2612 LoadComparator(SystemLoad loadByHost) {
2613 this.loadByHost = loadByHost;
2614 }
2615
2616 @Override
2617 public int compare(ServiceRegistration serviceA, ServiceRegistration serviceB) {
2618 String hostA = serviceA.getHost();
2619 String hostB = serviceB.getHost();
2620 NodeLoad nodeA = loadByHost.get(hostA);
2621 NodeLoad nodeB = loadByHost.get(hostB);
2622
2623 if (Math.abs(nodeA.getLoadFactor() - nodeB.getLoadFactor()) <= 0.01) {
2624
2625
2626
2627 return Float.compare(nodeB.getMaxLoad(), nodeA.getMaxLoad());
2628 }
2629 return Float.compare(nodeA.getLoadFactor(), nodeB.getLoadFactor());
2630 }
2631 }
2632
2633
2634
2635
2636
2637
2638 private class LoadComparatorEncoding extends LoadComparator implements Comparator<ServiceRegistration> {
2639
2640
2641
2642
2643
2644
2645 LoadComparatorEncoding(SystemLoad loadByHost) {
2646 super(loadByHost);
2647 }
2648
2649 @Override
2650 public int compare(ServiceRegistration serviceA, ServiceRegistration serviceB) {
2651 String hostA = serviceA.getHost();
2652 String hostB = serviceB.getHost();
2653 NodeLoad nodeA = loadByHost.get(hostA);
2654 NodeLoad nodeB = loadByHost.get(hostB);
2655
2656 if (encodingWorkers != null) {
2657 if (encodingWorkers.contains(hostA) && !encodingWorkers.contains(hostB)) {
2658 if (nodeA.getLoadFactor() <= encodingThreshold) {
2659 return -1;
2660 }
2661 return Float.compare(nodeA.getLoadFactor(), nodeB.getLoadFactor());
2662 }
2663 if (encodingWorkers.contains(hostB) && !encodingWorkers.contains(hostA)) {
2664 if (nodeB.getLoadFactor() <= encodingThreshold) {
2665 return 1;
2666 }
2667 return Float.compare(nodeA.getLoadFactor(), nodeB.getLoadFactor());
2668 }
2669 }
2670 return super.compare(serviceA, serviceB);
2671 }
2672 }
2673 }