1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 package org.opencastproject.serviceregistry.impl;
23
24 import static java.lang.String.format;
25 import static org.apache.commons.lang3.StringUtils.isBlank;
26 import static org.opencastproject.db.Queries.namedQuery;
27 import static org.opencastproject.job.api.AbstractJobProducer.ACCEPT_JOB_LOADS_EXCEEDING_PROPERTY;
28 import static org.opencastproject.job.api.AbstractJobProducer.DEFAULT_ACCEPT_JOB_LOADS_EXCEEDING;
29 import static org.opencastproject.job.api.Job.FailureReason.DATA;
30 import static org.opencastproject.job.api.Job.Status.FAILED;
31 import static org.opencastproject.serviceregistry.api.ServiceState.ERROR;
32 import static org.opencastproject.serviceregistry.api.ServiceState.NORMAL;
33 import static org.opencastproject.serviceregistry.api.ServiceState.WARNING;
34 import static org.opencastproject.util.OsgiUtil.getOptContextProperty;
35
36 import org.opencastproject.db.DBSession;
37 import org.opencastproject.db.DBSessionFactory;
38 import org.opencastproject.job.api.Job;
39 import org.opencastproject.job.api.Job.Status;
40 import org.opencastproject.job.jpa.JpaJob;
41 import org.opencastproject.security.api.Organization;
42 import org.opencastproject.security.api.SecurityService;
43 import org.opencastproject.security.api.TrustedHttpClient;
44 import org.opencastproject.security.api.TrustedHttpClientException;
45 import org.opencastproject.security.api.User;
46 import org.opencastproject.serviceregistry.api.HostRegistration;
47 import org.opencastproject.serviceregistry.api.HostStatistics;
48 import org.opencastproject.serviceregistry.api.IncidentService;
49 import org.opencastproject.serviceregistry.api.Incidents;
50 import org.opencastproject.serviceregistry.api.JaxbServiceStatistics;
51 import org.opencastproject.serviceregistry.api.ServiceRegistration;
52 import org.opencastproject.serviceregistry.api.ServiceRegistry;
53 import org.opencastproject.serviceregistry.api.ServiceRegistryException;
54 import org.opencastproject.serviceregistry.api.ServiceStatistics;
55 import org.opencastproject.serviceregistry.api.SystemLoad;
56 import org.opencastproject.serviceregistry.api.SystemLoad.NodeLoad;
57 import org.opencastproject.serviceregistry.impl.jpa.HostRegistrationJpaImpl;
58 import org.opencastproject.serviceregistry.impl.jpa.ServiceRegistrationJpaImpl;
59 import org.opencastproject.systems.OpencastConstants;
60 import org.opencastproject.util.NotFoundException;
61 import org.opencastproject.util.UrlSupport;
62 import org.opencastproject.util.function.ThrowingConsumer;
63
64 import org.apache.commons.lang3.StringUtils;
65 import org.apache.commons.lang3.time.DateUtils;
66 import org.apache.commons.lang3.tuple.Pair;
67 import org.apache.http.HttpResponse;
68 import org.apache.http.HttpStatus;
69 import org.apache.http.client.methods.HttpHead;
70 import org.osgi.service.cm.ConfigurationException;
71 import org.osgi.service.cm.ManagedService;
72 import org.osgi.service.component.ComponentContext;
73 import org.osgi.service.component.annotations.Activate;
74 import org.osgi.service.component.annotations.Component;
75 import org.osgi.service.component.annotations.Deactivate;
76 import org.osgi.service.component.annotations.Modified;
77 import org.osgi.service.component.annotations.Reference;
78 import org.osgi.service.component.annotations.ReferenceCardinality;
79 import org.osgi.service.component.annotations.ReferencePolicy;
80 import org.slf4j.Logger;
81 import org.slf4j.LoggerFactory;
82
83 import java.net.InetAddress;
84 import java.net.URI;
85 import java.net.URISyntaxException;
86 import java.util.ArrayList;
87 import java.util.Arrays;
88 import java.util.Collections;
89 import java.util.Comparator;
90 import java.util.Date;
91 import java.util.Dictionary;
92 import java.util.HashMap;
93 import java.util.List;
94 import java.util.Map;
95 import java.util.Objects;
96 import java.util.Optional;
97 import java.util.concurrent.Executors;
98 import java.util.concurrent.ScheduledFuture;
99 import java.util.concurrent.ScheduledThreadPoolExecutor;
100 import java.util.concurrent.TimeUnit;
101 import java.util.concurrent.atomic.AtomicReference;
102 import java.util.function.Consumer;
103 import java.util.function.Function;
104 import java.util.stream.Collectors;
105
106 import javax.persistence.EntityManager;
107 import javax.persistence.EntityManagerFactory;
108 import javax.persistence.LockModeType;
109 import javax.persistence.NoResultException;
110 import javax.persistence.TypedQuery;
111
112
113 @Component(
114 property = {
115 "service.description=Service registry"
116 },
117 immediate = true,
118 service = { ManagedService.class, ServiceRegistry.class, ServiceRegistryJpaImpl.class }
119 )
120 public class ServiceRegistryJpaImpl implements ServiceRegistry, ManagedService {
121
122
/** Name of the JPA persistence unit; equals the unit name targeted by the entity manager factory reference. */
public static final String PERSISTENCE_UNIT = "org.opencastproject.common";

/** Job operation name; parentless jobs with this operation are skipped by {@code removeParentlessJobs}. */
public static final String START_OPERATION = "START_OPERATION";

/** Job operation name; parentless jobs with this operation are skipped by {@code removeParentlessJobs}. */
public static final String START_WORKFLOW = "START_WORKFLOW";

/** Job operation name; parentless jobs with this operation are skipped by {@code removeParentlessJobs}. */
public static final String RESUME = "RESUME";

/** Job type of workflow jobs; these are excluded from load caching and failover handling when a job is updated. */
public static final String TYPE_WORKFLOW = "org.opencastproject.workflow";

static final Logger logger = LoggerFactory.getLogger(ServiceRegistryJpaImpl.class);

/** The job bound to the current thread, see {@code getCurrentJob()} and {@code setCurrentJob(Job)}. */
private static final ThreadLocal<Job> currentJob = new ThreadLocal<>();

/** Bundle context property that overrides this node's maximum load (defaults to the number of cores). */
protected static final String OPT_MAXLOAD = "org.opencastproject.server.maxload";

/** Configuration key that toggles the collection of job statistics. */
protected static final String OPT_JOBSTATISTICS = "jobstats.collect";

/** Configuration key for the maximum job age considered by the service statistics (unit not visible here). */
protected static final String OPT_SERVICE_STATISTICS_MAX_JOB_AGE =
    "org.opencastproject.statistics.services.max_job_age";

/** Configuration key holding a comma separated list of encoding workers. */
protected static final String OPT_ENCODING_WORKERS = "org.opencastproject.encoding.workers";

/** Configuration key for the encoding threshold; accepted values lie between 0 and 1 (inclusive). */
protected static final String OPT_ENCODING_THRESHOLD = "org.opencastproject.encoding.workers.threshold";

/** The trusted http client; it is not assigned in this part of the file. */
protected TrustedHttpClient client = null;

/** Default limit of jobs to dispatch (NOTE(review): not referenced in this part of the file). */
static final int DEFAULT_DISPATCH_JOBS_LIMIT = 100;

/** Default for {@link #collectJobstats}. */
static final boolean DEFAULT_JOB_STATISTICS = false;

/** Default for {@link #maxJobAge}. */
static final int DEFAULT_SERVICE_STATISTICS_MAX_JOB_AGE = 14;

/** Default (empty) list of encoding workers. */
static final List<String> DEFAULT_ENCODING_WORKERS = new ArrayList<>();

/** Default encoding threshold. */
static final double DEFAULT_ENCODING_THRESHOLD = 0.0;

/** Configuration key for the number of attempts before a service is put into error state. */
static final String MAX_ATTEMPTS_CONFIG_KEY = "max.attempts";

/** Configuration key holding a comma separated list of service types that never enter the error state. */
static final String NO_ERROR_STATE_SERVICE_TYPES_CONFIG_KEY = "no.error.state.service.types";

/** Default for {@link #maxAttemptsBeforeErrorState}. */
private static final int DEFAULT_MAX_ATTEMPTS_BEFORE_ERROR_STATE = -1;

/** Default for {@link #errorStatesEnabled}. */
private static final boolean DEFAULT_ERROR_STATES_ENABLED = true;

/** Configured number of attempts before error state; a negative configured value disables error states. */
protected int maxAttemptsBeforeErrorState = DEFAULT_MAX_ATTEMPTS_BEFORE_ERROR_STATE;
private boolean errorStatesEnabled = DEFAULT_ERROR_STATES_ENABLED;

/** Service types configured to never enter the error state. */
private List<String> noErrorStateServiceTypes = new ArrayList<>();

/** Job load used by the {@code createJob} variants that do not take an explicit load. */
static final float DEFAULT_JOB_LOAD = 0.1f;

/** This node's base URL as resolved during activation. */
protected String hostName;

/** This node's name as resolved during activation; {@code null} if not configured. */
protected String nodeName;

/** Base URL used to build job URIs; falls back to {@link #hostName}. */
protected String jobHost;

/** The configured encoding workers. */
protected static List<String> encodingWorkers = DEFAULT_ENCODING_WORKERS;

/** The configured encoding threshold. */
protected static double encodingThreshold = DEFAULT_ENCODING_THRESHOLD;

/** The factory used to create the database session. */
protected EntityManagerFactory emf = null;

protected DBSessionFactory dbSessionFactory;

/** The database session, created during activation. */
protected DBSession db;

/** Executor running the scheduled tasks; created lazily by {@code setupScheduledExecutor()}. */
protected ScheduledThreadPoolExecutor scheduledExecutor = null;
/** Handle of the currently scheduled heartbeat, if any. */
private ScheduledFuture<?> hbfuture = null;

/** The security service; consulted for the current user and organization when creating jobs. */
protected SecurityService securityService = null;

protected IncidentService incidentService = null;

protected Incidents incidents;

/** Whether job statistics are collected. */
protected boolean collectJobstats = DEFAULT_JOB_STATISTICS;

/** Maximum job age used for service statistics (unit not visible here). */
protected int maxJobAge = DEFAULT_SERVICE_STATISTICS_MAX_JOB_AGE;

/** Job statuses that add a job's load to the local load cache. */
protected static final List<Status> JOB_STATUSES_INFLUENCING_LOAD_BALANCING;

/** All job statuses for which {@code Status.isActive()} holds. */
private static final Status[] activeJobStatus =
    Arrays.stream(Status.values()).filter(Status::isActive).toArray(Status[]::new);

/** Maps the ids of jobs counted in {@link #localSystemLoad} to the load they contributed. */
protected static HashMap<Long, Float> jobCache = new HashMap<>();

static {
  JOB_STATUSES_INFLUENCING_LOAD_BALANCING = new ArrayList<>();
  JOB_STATUSES_INFLUENCING_LOAD_BALANCING.add(Status.RUNNING);
}

/** Whether jobs whose load exceeds the maximum load are accepted; read from the component context on activation. */
protected Boolean acceptJobLoadsExeedingMaxLoad = true;

/** Sum of the loads of the jobs currently held in {@link #jobCache}. */
protected float localSystemLoad = 0.0f;
257
258
259 @Reference(target = "(osgi.unit.name=org.opencastproject.common)")
260 void setEntityManagerFactory(EntityManagerFactory emf) {
261 this.emf = emf;
262 }
263
264 @Reference
265 public void setDBSessionFactory(DBSessionFactory dbSessionFactory) {
266 this.dbSessionFactory = dbSessionFactory;
267 }
268
269 @Activate
270 public void activate(ComponentContext cc) {
271 logger.info("Activate service registry");
272
273 db = dbSessionFactory.createSession(emf);
274
275
276 if (cc == null || StringUtils.isBlank(cc.getBundleContext().getProperty(OpencastConstants.SERVER_URL_PROPERTY))) {
277 hostName = UrlSupport.DEFAULT_BASE_URL;
278 } else {
279 hostName = cc.getBundleContext().getProperty(OpencastConstants.SERVER_URL_PROPERTY);
280 }
281
282
283 if (hostName.endsWith("/")) {
284 logger.warn("The configured value of {} ends with '/'. This is very likely a configuration error which could "
285 + "lead to services not working properly. Note that this configuration should not contain any part of "
286 + "the service paths.", OpencastConstants.SERVER_URL_PROPERTY);
287 }
288
289
290 cleanUndispatchableJobs(hostName);
291
292
293 if (cc == null || StringUtils.isBlank(cc.getBundleContext().getProperty("org.opencastproject.jobs.url"))) {
294 jobHost = hostName;
295 } else {
296 jobHost = cc.getBundleContext().getProperty("org.opencastproject.jobs.url");
297 }
298
299
300 try {
301 if (cc == null || StringUtils.isBlank(cc.getBundleContext().getProperty(OpencastConstants.NODE_NAME_PROPERTY))) {
302 nodeName = null;
303 } else {
304 nodeName = cc.getBundleContext().getProperty(OpencastConstants.NODE_NAME_PROPERTY);
305 }
306
307 float maxLoad = Runtime.getRuntime().availableProcessors();
308 if (cc != null && StringUtils.isNotBlank(cc.getBundleContext().getProperty(OPT_MAXLOAD))) {
309 try {
310 maxLoad = Float.parseFloat(cc.getBundleContext().getProperty(OPT_MAXLOAD));
311 logger.info("Max load has been set manually to {}", maxLoad);
312 } catch (NumberFormatException e) {
313 logger.warn("Configuration key '{}' is not an integer. Falling back to the number of cores ({})",
314 OPT_MAXLOAD, maxLoad);
315 }
316 }
317
318 logger.info("Node maximum load set to {}", maxLoad);
319
320 String address = InetAddress.getByName(URI.create(hostName).getHost()).getHostAddress();
321 long maxMemory = Runtime.getRuntime().maxMemory();
322 int cores = Runtime.getRuntime().availableProcessors();
323
324 registerHost(hostName, address, nodeName, maxMemory, cores, maxLoad);
325 } catch (Exception e) {
326 throw new IllegalStateException("Unable to register host " + hostName + " in the service registry", e);
327 }
328
329
330 if (cc != null) {
331 acceptJobLoadsExeedingMaxLoad = getOptContextProperty(cc, ACCEPT_JOB_LOADS_EXCEEDING_PROPERTY)
332 .map(Boolean::valueOf)
333 .orElse(DEFAULT_ACCEPT_JOB_LOADS_EXCEEDING);
334 }
335
336 localSystemLoad = 0;
337 logger.info("Activated");
338 }
339
340 @Override
341 public float getOwnLoad() {
342 return localSystemLoad;
343 }
344
345 @Override
346 public String getRegistryHostname() {
347 return hostName;
348 }
349
350 @Deactivate
351 public void deactivate() {
352 logger.info("deactivate service registry");
353
354
355 if (scheduledExecutor != null) {
356 try {
357 scheduledExecutor.shutdownNow();
358 if (!scheduledExecutor.isShutdown()) {
359 logger.info("Waiting for Dispatcher to terminate");
360 scheduledExecutor.awaitTermination(10, TimeUnit.SECONDS);
361 }
362 } catch (InterruptedException e) {
363 logger.error("Error shutting down the Dispatcher", e);
364 }
365 }
366
367 try {
368 unregisterHost(hostName);
369 } catch (ServiceRegistryException e) {
370 throw new IllegalStateException("Unable to unregister host " + hostName + " from the service registry", e);
371 }
372 }
373
374
375
376
377
378
379 @Override
380 public Job createJob(String type, String operation) throws ServiceRegistryException {
381 return createJob(this.hostName, type, operation, null, null, true, getCurrentJob(), DEFAULT_JOB_LOAD);
382 }
383
384
385
386
387
388
389
390 @Override
391 public Job createJob(String type, String operation, List<String> arguments) throws ServiceRegistryException {
392 return createJob(this.hostName, type, operation, arguments, null, true, getCurrentJob(), DEFAULT_JOB_LOAD);
393 }
394
395
396
397
398
399
400
401 @Override
402 public Job createJob(String type, String operation, List<String> arguments, Float jobLoad)
403 throws ServiceRegistryException {
404 return createJob(this.hostName, type, operation, arguments, null, true, getCurrentJob(), jobLoad);
405 }
406
407
408
409
410
411
412
413 @Override
414 public Job createJob(String type, String operation, List<String> arguments, String payload, boolean dispatchable)
415 throws ServiceRegistryException {
416 return createJob(this.hostName, type, operation, arguments, payload, dispatchable, getCurrentJob(),
417 DEFAULT_JOB_LOAD);
418 }
419
420
421
422
423
424
425
426 @Override
427 public Job createJob(String type, String operation, List<String> arguments, String payload, boolean dispatchable,
428 Float jobLoad) throws ServiceRegistryException {
429 return createJob(this.hostName, type, operation, arguments, payload, dispatchable, getCurrentJob(), jobLoad);
430 }
431
432 public Job createJob(String type, String operation, List<String> arguments, String payload, boolean dispatchable,
433 Job parentJob) throws ServiceRegistryException {
434 return createJob(this.hostName, type, operation, arguments, payload, dispatchable, parentJob, DEFAULT_JOB_LOAD);
435 }
436
437
438
439
440
441
442
443 @Override
444 public Job createJob(String type, String operation, List<String> arguments, String payload, boolean dispatchable,
445 Job parentJob, Float jobLoad) throws ServiceRegistryException {
446 return createJob(this.hostName, type, operation, arguments, payload, dispatchable, parentJob, jobLoad);
447 }
448
449
450
451
452 public Job createJob(String host, String serviceType, String operation, List<String> arguments, String payload,
453 boolean dispatchable, Job parentJob) throws ServiceRegistryException {
454 return createJob(host, serviceType, operation, arguments, payload, dispatchable, parentJob, DEFAULT_JOB_LOAD);
455 }
456
457
458
459
460 public Job createJob(String host, String serviceType, String operation, List<String> arguments, String payload,
461 boolean dispatchable, Job parentJob, float jobLoad) throws ServiceRegistryException {
462 if (StringUtils.isBlank(host)) {
463 throw new IllegalArgumentException("Host can't be null");
464 }
465 if (StringUtils.isBlank(serviceType)) {
466 throw new IllegalArgumentException("Service type can't be null");
467 }
468 if (StringUtils.isBlank(operation)) {
469 throw new IllegalArgumentException("Operation can't be null");
470 }
471
472 JpaJob jpaJob = db.execTxChecked(em -> {
473 ServiceRegistrationJpaImpl creatingService = getServiceRegistrationQuery(serviceType, host).apply(em)
474 .orElseThrow(() -> new ServiceRegistryException("No service registration exists for type '" + serviceType
475 + "' on host '" + host + "'"));
476
477 if (creatingService.getHostRegistration().isMaintenanceMode()) {
478 logger.warn("Creating a job from {}, which is currently in maintenance mode.", creatingService.getHost());
479 } else if (!creatingService.getHostRegistration().isActive()) {
480 logger.warn("Creating a job from {}, which is currently inactive.", creatingService.getHost());
481 }
482
483 User currentUser = securityService.getUser();
484 Organization currentOrganization = securityService.getOrganization();
485
486 JpaJob job = new JpaJob(currentUser, currentOrganization, creatingService, operation, arguments, payload,
487 dispatchable, jobLoad);
488
489
490 if (parentJob != null) {
491
492 JpaJob jpaParentJob = getJpaJobQuery(parentJob.getId()).apply(em).orElseThrow(() -> {
493 logger.error("job with id {} not found in the persistence context", parentJob);
494
495 removeFromLoadCache(parentJob.getId());
496 return new ServiceRegistryException(new NotFoundException());
497 });
498 job.setParentJob(jpaParentJob);
499
500
501 JpaJob jpaRootJob = jpaParentJob;
502 if (parentJob.getRootJobId() != null) {
503 jpaRootJob = getJpaJobQuery(parentJob.getRootJobId()).apply(em).orElseThrow(() -> {
504 logger.error("job with id {} not found in the persistence context", parentJob.getRootJobId());
505
506 removeFromLoadCache(parentJob.getRootJobId());
507 return new ServiceRegistryException(new NotFoundException());
508 });
509 }
510 job.setRootJob(jpaRootJob);
511 }
512
513
514 if (dispatchable) {
515 logger.trace("Queuing dispatchable '{}'", job);
516 job.setStatus(Status.QUEUED);
517 } else {
518 logger.trace("Giving new non-dispatchable '{}' its creating service as processor '{}'", job, creatingService);
519 job.setProcessorServiceRegistration(creatingService);
520 }
521
522 em.persist(job);
523 return job;
524 });
525
526 setJobUri(jpaJob);
527 return jpaJob.toJob();
528 }
529
530 @Override
531 public void removeJobs(List<Long> jobIds) throws NotFoundException, ServiceRegistryException {
532 for (long jobId: jobIds) {
533 if (jobId < 1) {
534 throw new NotFoundException("Job ID must be greater than zero (0)");
535 }
536 }
537
538 logger.debug("Start deleting jobs with IDs '{}'", jobIds);
539 try {
540 db.execTxChecked(em -> {
541 for (long jobId : jobIds) {
542 JpaJob job = em.find(JpaJob.class, jobId);
543 if (job == null) {
544 logger.error("Job with Id {} cannot be deleted: Not found.", jobId);
545 removeFromLoadCache(jobId);
546 throw new NotFoundException("Job with ID '" + jobId + "' not found");
547 }
548 deleteChildJobsQuery(jobId).accept(em);
549 em.remove(job);
550 removeFromLoadCache(jobId);
551 }
552 });
553 } catch (NotFoundException | ServiceRegistryException e) {
554 throw e;
555 } catch (Exception e) {
556 throw new ServiceRegistryException(e);
557 }
558
559 logger.info("Jobs with IDs '{}' deleted", jobIds);
560 }
561
562 private ThrowingConsumer<EntityManager, Exception> deleteChildJobsQuery(long jobId) {
563 return em -> {
564 List<Job> childJobs = getChildJobs(jobId);
565 if (childJobs.isEmpty()) {
566 logger.trace("No child jobs of job '{}' found to delete.", jobId);
567 return;
568 }
569
570 logger.debug("Start deleting child jobs of job '{}'", jobId);
571
572 try {
573 for (int i = childJobs.size() - 1; i >= 0; i--) {
574 Job job = childJobs.get(i);
575 JpaJob jobToDelete = em.find(JpaJob.class, job.getId());
576 em.remove(jobToDelete);
577 removeFromLoadCache(job.getId());
578 logger.debug("{} deleted", job);
579 }
580 logger.debug("Deleted all child jobs of job '{}'", jobId);
581 } catch (Exception e) {
582 throw new ServiceRegistryException("Unable to remove child jobs from " + jobId, e);
583 }
584 };
585 }
586
587 @Override
588 public void removeParentlessJobs(int lifetime) throws ServiceRegistryException {
589 int count = db.execTxChecked(em -> {
590 int c = 0;
591
592 List<Job> jobs = namedQuery.findAll("Job.withoutParent", JpaJob.class).apply(em).stream()
593 .map(JpaJob::toJob)
594 .filter(j -> j.getDateCreated().before(DateUtils.addDays(new Date(), -lifetime)))
595
596 .filter(j -> !START_OPERATION.equals(j.getOperation())
597 && !START_WORKFLOW.equals(j.getOperation())
598 && !RESUME.equals(j.getOperation()))
599 .filter(j -> j.getStatus().isTerminated())
600 .collect(Collectors.toList());
601
602 for (Job job : jobs) {
603 try {
604 removeJobs(Collections.singletonList(job.getId()));
605 logger.debug("Parentless '{}' removed", job);
606 c++;
607 } catch (NotFoundException e) {
608 logger.debug("Parentless '{} ' not found in database", job, e);
609 }
610 }
611
612 return c;
613 });
614
615
616 if (count > 0) {
617 logger.info("Successfully removed {} parentless jobs", count);
618 } else {
619 logger.trace("No parentless jobs found to remove");
620 }
621 }
622
623
624
625
626
627
628 @Override
629 public void updated(Dictionary properties) throws ConfigurationException {
630 logger.info("Updating service registry properties");
631
632 maxAttemptsBeforeErrorState = DEFAULT_MAX_ATTEMPTS_BEFORE_ERROR_STATE;
633 errorStatesEnabled = DEFAULT_ERROR_STATES_ENABLED;
634 String maxAttempts = StringUtils.trimToNull((String) properties.get(MAX_ATTEMPTS_CONFIG_KEY));
635 if (maxAttempts != null) {
636 try {
637 maxAttemptsBeforeErrorState = Integer.parseInt(maxAttempts);
638 if (maxAttemptsBeforeErrorState < 0) {
639 errorStatesEnabled = false;
640 logger.info("Error states of services disabled");
641 } else {
642 errorStatesEnabled = true;
643 logger.info("Set max attempts before error state to {}", maxAttempts);
644 }
645 } catch (NumberFormatException e) {
646 logger.warn("Can not set max attempts before error state to {}. {} must be an integer", maxAttempts,
647 MAX_ATTEMPTS_CONFIG_KEY);
648 }
649 }
650
651 noErrorStateServiceTypes = new ArrayList<>();
652 String noErrorStateServiceTypesStr = StringUtils.trimToNull((String) properties.get(
653 NO_ERROR_STATE_SERVICE_TYPES_CONFIG_KEY));
654 if (noErrorStateServiceTypesStr != null) {
655 noErrorStateServiceTypes = Arrays.asList(noErrorStateServiceTypesStr.split("\\s*,\\s*"));
656 if (!noErrorStateServiceTypes.isEmpty()) {
657 logger.info("Set service types without error state to {}", String.join(", ", noErrorStateServiceTypes));
658 }
659 }
660
661 String jobStatsString = StringUtils.trimToNull((String) properties.get(OPT_JOBSTATISTICS));
662 if (StringUtils.isNotBlank(jobStatsString)) {
663 try {
664 collectJobstats = Boolean.parseBoolean(jobStatsString);
665 } catch (Exception e) {
666 logger.warn("Job statistics collection flag '{}' is malformed, setting to {}", jobStatsString,
667 DEFAULT_JOB_STATISTICS);
668 collectJobstats = DEFAULT_JOB_STATISTICS;
669 }
670 }
671
672
673 String encodingWorkersString = (String) properties.get(OPT_ENCODING_WORKERS);
674 if (StringUtils.isNotBlank(encodingWorkersString)) {
675 encodingWorkers = Arrays.asList(encodingWorkersString.split("\\s*,\\s*"));
676 } else {
677 encodingWorkers = DEFAULT_ENCODING_WORKERS;
678 }
679
680
681 String encodingThersholdString = StringUtils.trimToNull((String) properties.get(OPT_ENCODING_THRESHOLD));
682 if (StringUtils.isNotBlank(encodingThersholdString) && encodingThersholdString != null) {
683 try {
684 double encodingThresholdTmp = Double.parseDouble(encodingThersholdString);
685 if (encodingThresholdTmp >= 0 && encodingThresholdTmp <= 1) {
686 encodingThreshold = encodingThresholdTmp;
687 } else {
688 encodingThreshold = DEFAULT_ENCODING_THRESHOLD;
689 logger.warn("org.opencastproject.encoding.workers.threshold is not between 0 and 1");
690 }
691 } catch (NumberFormatException e) {
692 logger.warn("Can not set encoding threshold to {}. {} must be an parsable double", encodingThersholdString,
693 OPT_ENCODING_THRESHOLD);
694 }
695 } else {
696 encodingThreshold = DEFAULT_ENCODING_THRESHOLD;
697 }
698
699 String maxJobAgeString = StringUtils.trimToNull((String) properties.get(OPT_SERVICE_STATISTICS_MAX_JOB_AGE));
700 if (maxJobAgeString != null) {
701 try {
702 maxJobAge = Integer.parseInt(maxJobAgeString);
703 logger.info("Set service statistics max job age to {}", maxJobAgeString);
704 } catch (NumberFormatException e) {
705 logger.warn("Can not set service statistics max job age to {}. {} must be an integer", maxJobAgeString,
706 OPT_SERVICE_STATISTICS_MAX_JOB_AGE);
707 }
708 }
709 }
710
711 private void setupScheduledExecutor() {
712 if (scheduledExecutor == null) {
713 scheduledExecutor = (ScheduledThreadPoolExecutor) Executors.newScheduledThreadPool(1);
714 scheduledExecutor.setRemoveOnCancelPolicy(true);
715 }
716 }
717
718 protected void startHeartbeat(long heartbeatInterval) {
719 setupScheduledExecutor();
720
721
722 if (heartbeatInterval > 0) {
723
724 if (hbfuture != null) {
725 hbfuture.cancel(true);
726 }
727
728 logger.debug("Starting service heartbeat at a custom interval of {}s", heartbeatInterval);
729 hbfuture = scheduledExecutor.scheduleWithFixedDelay(new JobProducerHeartbeat(), heartbeatInterval,
730 heartbeatInterval, TimeUnit.SECONDS);
731 }
732 }
733
734
735
736
737
738
739 @Modified
740 public void modified(Map<String, Object> config) throws ConfigurationException {
741 logger.debug("Modified serviceregistry");
742 }
743
744 private Function<EntityManager, Optional<JpaJob>> getJpaJobQuery(long id) {
745 return em -> namedQuery.findByIdOpt(JpaJob.class, id)
746 .apply(em)
747 .map(jpaJob -> {
748
749
750 em.refresh(jpaJob);
751 setJobUri(jpaJob);
752 return jpaJob;
753 });
754 }
755
756 @Override
757 public Job getJob(long id) throws NotFoundException, ServiceRegistryException {
758 try {
759 return db.exec(getJpaJobQuery(id))
760 .map(JpaJob::toJob)
761 .orElseThrow(NotFoundException::new);
762 } catch (NotFoundException e) {
763 throw e;
764 } catch (Exception e) {
765 throw new ServiceRegistryException(e);
766 }
767 }
768
769
770
771
772
773
774 @Override
775 public Job getCurrentJob() {
776 return currentJob.get();
777 }
778
779
780
781
782
783
784 @Override
785 public void setCurrentJob(Job job) {
786 currentJob.set(job);
787 }
788
789 JpaJob updateJob(JpaJob job) throws ServiceRegistryException {
790 try {
791
792
793
794 return db.execChecked(em -> {
795 Job oldJob = getJob(job.getId());
796 JpaJob jpaJob = updateInternal(job);
797 if (!TYPE_WORKFLOW.equals(job.getJobType()) && job.getJobLoad() > 0.0f
798 && job.getProcessorServiceRegistration() != null
799 && job.getProcessorServiceRegistration().getHost().equals(getRegistryHostname())) {
800 processCachedLoadChange(job);
801 }
802
803
804 if (oldJob.getStatus() != job.getStatus() && !TYPE_WORKFLOW.equals(job.getJobType())) {
805 updateServiceForFailover(job);
806 }
807
808 return jpaJob;
809 });
810 } catch (ServiceRegistryException e) {
811 throw e;
812 } catch (NotFoundException e) {
813
814 removeFromLoadCache(job.getId());
815 throw new ServiceRegistryException(e);
816 } catch (Exception e) {
817 throw new ServiceRegistryException(e);
818 }
819 }
820
821 @Override
822 public Job updateJob(Job job) throws ServiceRegistryException {
823 JpaJob jpaJob = JpaJob.from(job);
824 jpaJob.setProcessorServiceRegistration(
825 (ServiceRegistrationJpaImpl) getServiceRegistration(job.getJobType(), job.getProcessingHost()));
826 return updateJob(jpaJob).toJob();
827 }
828
829
830
831
832
833
834
835 private synchronized void processCachedLoadChange(JpaJob job) {
836 if (JOB_STATUSES_INFLUENCING_LOAD_BALANCING.contains(job.getStatus()) && jobCache.get(job.getId()) == null) {
837 logger.debug("Adding to load cache: {}, type {}, load {}, status {}",
838 job, job.getJobType(), job.getJobLoad(), job.getStatus());
839 localSystemLoad += job.getJobLoad();
840 jobCache.put(job.getId(), job.getJobLoad());
841 } else if (jobCache.get(job.getId()) != null && Status.FINISHED.equals(job.getStatus())
842 || Status.FAILED.equals(job.getStatus()) || Status.WAITING.equals(job.getStatus())) {
843 logger.debug("Removing from load cache: {}, type {}, load {}, status {}",
844 job, job.getJobType(), job.getJobLoad(), job.getStatus());
845 localSystemLoad -= job.getJobLoad();
846 jobCache.remove(job.getId());
847 } else {
848 logger.debug("Ignoring for load cache: {}, type {}, status {}",
849 job, job.getJobType(), job.getStatus());
850 }
851 logger.debug("Current host load: {}, job load cache size: {}", format("%.1f", localSystemLoad), jobCache.size());
852
853 if (jobCache.isEmpty()) {
854 if (Math.abs(localSystemLoad) > 0.0000001F) {
855 logger.warn("No jobs in the job load cache, but load is {}: setting job load to 0", localSystemLoad);
856 }
857 localSystemLoad = 0.0F;
858 }
859 }
860
861 private synchronized void removeFromLoadCache(Long jobId) {
862 if (jobCache.get(jobId) != null) {
863 float jobLoad = jobCache.get(jobId);
864 logger.debug("Removing deleted job from load cache: Job {}, load {}", jobId, jobLoad);
865 localSystemLoad -= jobLoad;
866 jobCache.remove(jobId);
867 }
868 }
869
870 protected JpaJob setJobUri(JpaJob job) {
871 try {
872 job.setUri(new URI(jobHost + "/services/job/" + job.getId() + ".xml"));
873 } catch (URISyntaxException e) {
874 logger.warn("Can not set the job URI", e);
875 }
876 return job;
877 }
878
879
880
881
882
883
884
885
886 protected JpaJob updateInternal(JpaJob job) throws NotFoundException {
887 JpaJob fromDb = db.execTxChecked(em -> {
888 JpaJob j = em.find(JpaJob.class, job.getId());
889 if (j == null) {
890 throw new NotFoundException();
891 }
892
893 update(j, job);
894 em.merge(j);
895 return j;
896 });
897
898 job.setVersion(fromDb.toJob().getVersion());
899 setJobUri(job);
900 return job;
901 }
902
903
904
905
906
907
908
909
910 private ServiceRegistration updateServiceState(ServiceRegistrationJpaImpl registration) throws NotFoundException {
911 db.execTxChecked(em -> {
912 ServiceRegistrationJpaImpl fromDb = em.find(ServiceRegistrationJpaImpl.class, registration.getId());
913 if (fromDb == null) {
914 throw new NotFoundException();
915 }
916 fromDb.setServiceState(registration.getServiceState());
917 fromDb.setStateChanged(registration.getStateChanged());
918 fromDb.setWarningStateTrigger(registration.getWarningStateTrigger());
919 fromDb.setErrorStateTrigger(registration.getErrorStateTrigger());
920 });
921
922 return registration;
923 }
924
925
926
927
928
929
930
931
932
933
934 private void update(JpaJob fromDb, JpaJob jpaJob) {
935 final Job job = jpaJob.toJob();
936 final Date now = new Date();
937 final Status status = job.getStatus();
938 final Status fromDbStatus = fromDb.getStatus();
939
940 fromDb.setPayload(job.getPayload());
941 fromDb.setStatus(job.getStatus());
942 fromDb.setDispatchable(job.isDispatchable());
943 fromDb.setVersion(job.getVersion());
944 fromDb.setOperation(job.getOperation());
945 fromDb.setArguments(job.getArguments());
946
947 if (job.getDateCreated() == null) {
948 jpaJob.setDateCreated(now);
949 fromDb.setDateCreated(now);
950 job.setDateCreated(now);
951 }
952 if (job.getProcessingHost() != null) {
953 ServiceRegistrationJpaImpl processingService = (ServiceRegistrationJpaImpl) getServiceRegistration(
954 job.getJobType(), job.getProcessingHost());
955 logger.debug("{} has host '{}': setting processor service to '{}'", job, job.getProcessingHost(),
956 processingService);
957 fromDb.setProcessorServiceRegistration(processingService);
958 } else {
959 logger.debug("Unsetting previous processor service registration for {}", job);
960 fromDb.setProcessorServiceRegistration(null);
961 }
962 if (Status.RUNNING.equals(status) && !Status.WAITING.equals(fromDbStatus)) {
963 if (job.getDateStarted() == null) {
964 jpaJob.setDateStarted(now);
965 jpaJob.setQueueTime(now.getTime() - job.getDateCreated().getTime());
966 fromDb.setDateStarted(now);
967 fromDb.setQueueTime(now.getTime() - job.getDateCreated().getTime());
968 job.setDateStarted(now);
969 job.setQueueTime(now.getTime() - job.getDateCreated().getTime());
970 }
971 } else if (Status.FAILED.equals(status)) {
972
973 if (job.getDateCompleted() == null) {
974 fromDb.setDateCompleted(now);
975 jpaJob.setDateCompleted(now);
976 job.setDateCompleted(now);
977 if (job.getDateStarted() != null) {
978 jpaJob.setRunTime(now.getTime() - job.getDateStarted().getTime());
979 fromDb.setRunTime(now.getTime() - job.getDateStarted().getTime());
980 job.setRunTime(now.getTime() - job.getDateStarted().getTime());
981 }
982 }
983 } else if (Status.FINISHED.equals(status)) {
984 if (job.getDateStarted() == null) {
985
986
987 jpaJob.setDateStarted(job.getDateCreated());
988 job.setDateStarted(job.getDateCreated());
989 }
990 if (job.getDateCompleted() == null) {
991 jpaJob.setDateCompleted(now);
992 jpaJob.setRunTime(now.getTime() - job.getDateStarted().getTime());
993 fromDb.setDateCompleted(now);
994 fromDb.setRunTime(now.getTime() - job.getDateStarted().getTime());
995 job.setDateCompleted(now);
996 job.setRunTime(now.getTime() - job.getDateStarted().getTime());
997 }
998 }
999 }
1000
1001
1002
1003
1004
1005
1006
1007
1008 protected Function<EntityManager, Optional<HostRegistrationJpaImpl>> fetchHostRegistrationQuery(String host) {
1009 return namedQuery.findOpt(
1010 "HostRegistration.byHostName",
1011 HostRegistrationJpaImpl.class,
1012 Pair.of("host", host)
1013 );
1014 }
1015
1016
1017
1018
1019
1020
1021 @Override
1022 public void registerHost(String host, String address, String nodeName, long memory, int cores, float maxLoad)
1023 throws ServiceRegistryException {
1024 try {
1025 HostRegistrationJpaImpl hostRegistration = db.execTxChecked(em -> {
1026
1027 Optional<HostRegistrationJpaImpl> hostRegistrationOpt = fetchHostRegistrationQuery(host).apply(em);
1028 HostRegistrationJpaImpl hr;
1029
1030 if (hostRegistrationOpt.isEmpty()) {
1031 hr = new HostRegistrationJpaImpl(host, address, nodeName, memory, cores, maxLoad, true, false);
1032 em.persist(hr);
1033 } else {
1034 hr = hostRegistrationOpt.get();
1035 hr.setIpAddress(address);
1036 hr.setNodeName(nodeName);
1037 hr.setMemory(memory);
1038 hr.setCores(cores);
1039 hr.setMaxLoad(maxLoad);
1040 hr.setOnline(true);
1041 em.merge(hr);
1042 }
1043 logger.info("Registering {} with a maximum load of {}", host, maxLoad);
1044 return hr;
1045 });
1046 } catch (Exception e) {
1047 throw new ServiceRegistryException(e);
1048 }
1049 }
1050
1051
1052
1053
1054
1055
1056 @Override
1057 public void unregisterHost(String host) throws ServiceRegistryException {
1058 try {
1059 HostRegistrationJpaImpl existingHostRegistration = db.execTxChecked(em -> {
1060 HostRegistrationJpaImpl hr = fetchHostRegistrationQuery(host).apply(em).orElseThrow(
1061 () -> new IllegalArgumentException("Host '" + host + "' is not registered, so it can not be unregistered"));
1062
1063 hr.setOnline(false);
1064 for (ServiceRegistration serviceRegistration : getServiceRegistrationsByHost(host)) {
1065 unRegisterService(serviceRegistration.getServiceType(), serviceRegistration.getHost());
1066 }
1067 em.merge(hr);
1068
1069 logger.info("Unregistering {}", host);
1070 return hr;
1071 });
1072
1073 logger.info("Host {} unregistered", host);
1074 } catch (Exception e) {
1075 throw new ServiceRegistryException(e);
1076 }
1077 }
1078
1079
1080
1081
1082
1083
1084 @Override
1085 public void enableHost(String host) throws ServiceRegistryException, NotFoundException {
1086 try {
1087 HostRegistrationJpaImpl hostRegistration = db.execTxChecked(em -> {
1088
1089 HostRegistrationJpaImpl hr = fetchHostRegistrationQuery(host).apply(em).orElseThrow(
1090 () -> new NotFoundException("Host '" + host + "' is currently not registered, so it can not be enabled"));
1091 hr.setActive(true);
1092 em.merge(hr);
1093 logger.info("Enabling {}", host);
1094 return hr;
1095 });
1096
1097 db.execTxChecked(em -> {
1098 for (ServiceRegistration serviceRegistration : getServiceRegistrationsByHost(host)) {
1099 ((ServiceRegistrationJpaImpl) serviceRegistration).setActive(true);
1100 em.merge(serviceRegistration);
1101 }
1102 });
1103 } catch (NotFoundException e) {
1104 throw e;
1105 } catch (Exception e) {
1106 throw new ServiceRegistryException(e);
1107 }
1108 }
1109
1110
1111
1112
1113
1114
1115 @Override
1116 public void disableHost(String host) throws ServiceRegistryException, NotFoundException {
1117 try {
1118 HostRegistrationJpaImpl hostRegistration = db.execTxChecked(em -> {
1119 HostRegistrationJpaImpl hr = fetchHostRegistrationQuery(host).apply(em).orElseThrow(
1120 () -> new NotFoundException("Host '" + host + "' is not currently registered, so it can not be disabled"));
1121
1122 hr.setActive(false);
1123 for (ServiceRegistration serviceRegistration : getServiceRegistrationsByHost(host)) {
1124 ((ServiceRegistrationJpaImpl) serviceRegistration).setActive(false);
1125 em.merge(serviceRegistration);
1126 }
1127 em.merge(hr);
1128
1129 logger.info("Disabling {}", host);
1130 return hr;
1131 });
1132 } catch (NotFoundException e) {
1133 throw e;
1134 } catch (Exception e) {
1135 throw new ServiceRegistryException(e);
1136 }
1137 }
1138
1139
1140
1141
1142
1143
1144
1145 @Override
1146 public ServiceRegistration registerService(String serviceType, String baseUrl, String path)
1147 throws ServiceRegistryException {
1148 return registerService(serviceType, baseUrl, path, false);
1149 }
1150
1151
1152
1153
1154
1155
1156
1157 @Override
1158 public ServiceRegistration registerService(String serviceType, String baseUrl, String path, boolean jobProducer)
1159 throws ServiceRegistryException {
1160 cleanRunningJobs(serviceType, baseUrl);
1161 return setOnlineStatus(serviceType, baseUrl, path, true, jobProducer);
1162 }
1163
1164 protected Function<EntityManager, Optional<ServiceRegistrationJpaImpl>> getServiceRegistrationQuery(
1165 String serviceType, String host) {
1166 return namedQuery.findOpt(
1167 "ServiceRegistration.getRegistration",
1168 ServiceRegistrationJpaImpl.class,
1169 Pair.of("serviceType", serviceType),
1170 Pair.of("host", host)
1171 );
1172 }
1173
1174
1175
1176
1177
1178
1179
1180
1181
1182
1183
1184
1185
1186
1187 protected ServiceRegistration setOnlineStatus(String serviceType, String baseUrl, String path, boolean online,
1188 Boolean jobProducer) throws ServiceRegistryException {
1189 if (isBlank(serviceType) || isBlank(baseUrl)) {
1190 logger.info("Uninformed baseUrl '{}' or service '{}' (path '{}')", baseUrl, serviceType, path);
1191 throw new IllegalArgumentException("serviceType and baseUrl must not be blank");
1192 }
1193
1194 try {
1195 AtomicReference<HostRegistrationJpaImpl> hostRegistration = new AtomicReference<>();
1196 AtomicReference<ServiceRegistrationJpaImpl> registration = new AtomicReference<>();
1197
1198 db.execTxChecked(em -> {
1199 HostRegistrationJpaImpl hr = fetchHostRegistrationQuery(baseUrl).apply(em).orElseThrow(() -> {
1200 logger.info("No associated host registration for '{}' or service '{}' (path '{}')", baseUrl, serviceType,
1201 path);
1202 return new IllegalStateException(
1203 "A service registration can not be updated when it has no associated host registration");
1204 });
1205 hostRegistration.set(hr);
1206
1207 ServiceRegistrationJpaImpl sr;
1208 Optional<ServiceRegistrationJpaImpl> srOpt = getServiceRegistrationQuery(serviceType, baseUrl).apply(em);
1209 if (srOpt.isEmpty()) {
1210 if (isBlank(path)) {
1211
1212 throw new IllegalArgumentException("path must not be blank when registering new services");
1213 }
1214
1215
1216 sr = new ServiceRegistrationJpaImpl(hr, serviceType, path, Objects.requireNonNullElse(jobProducer, false));
1217 em.persist(sr);
1218 } else {
1219 sr = srOpt.get();
1220 if (StringUtils.isNotBlank(path)) {
1221 sr.setPath(path);
1222 }
1223 sr.setOnline(online);
1224 if (jobProducer != null) {
1225 sr.setJobProducer(jobProducer);
1226 }
1227 em.merge(sr);
1228 }
1229 registration.set(sr);
1230 });
1231 return registration.get();
1232 } catch (Exception e) {
1233 throw new ServiceRegistryException(e);
1234 }
1235 }
1236
1237
1238
1239
1240
1241
1242 @Override
1243 public void unRegisterService(String serviceType, String baseUrl) throws ServiceRegistryException {
1244 logger.info("Unregistering Service {}@{} and cleaning its running jobs", serviceType, baseUrl);
1245
1246
1247 setOnlineStatus(serviceType, baseUrl, null, false, null);
1248 cleanRunningJobs(serviceType, baseUrl);
1249 }
1250
1251
1252
1253
1254 private void cleanUndispatchableJobs(String hostName) {
1255 logger.debug("Starting check for undispatchable jobs for host {}", hostName);
1256
1257 try {
1258 db.execTxChecked(em -> {
1259 List<JpaJob> undispatchableJobs = namedQuery.findAll(
1260 "Job.undispatchable.status",
1261 JpaJob.class,
1262 Pair.of("statuses", List.of(
1263 Status.INSTANTIATED.ordinal(),
1264 Status.RUNNING.ordinal()
1265 ))
1266 ).apply(em);
1267
1268 if (undispatchableJobs.size() > 0) {
1269 logger.info("Found {} undispatchable jobs on host {}", undispatchableJobs.size(), hostName);
1270 }
1271
1272 for (JpaJob job : undispatchableJobs) {
1273
1274 String jobHost = "";
1275 if (job.getProcessorServiceRegistration() != null) {
1276 jobHost = job.getProcessorServiceRegistration().getHost();
1277 }
1278
1279 if (!jobHost.equals(hostName)) {
1280 logger.debug("Will not cancel undispatchable job {} for host {}, it is running on a different host ({})",
1281 job, hostName, jobHost);
1282 continue;
1283 }
1284
1285 logger.info("Cancelling the running undispatchable job {}, it was orphaned on this host ({})", job, hostName);
1286 job.setStatus(Status.CANCELLED);
1287 em.merge(job);
1288 }
1289 });
1290 } catch (Exception e) {
1291 logger.error("Unable to clean undispatchable jobs for host {}! {}", hostName, e.getMessage());
1292 }
1293 }
1294
1295
1296
1297
1298
1299
1300
1301
1302
1303
1304
/**
 * Cleans up jobs that are still recorded as {@code RUNNING}, {@code DISPATCHING} or {@code WAITING} for the given
 * service type on the given host.
 * <p>
 * Dispatchable jobs are set to {@code RESTART} (after their child jobs have been handed to
 * {@code cancelAllChildrenQuery}); if the job's root job is {@code PAUSED}, the root job is restarted instead.
 * Jobs that are not dispatchable are set to {@code FAILED}.
 *
 * @param serviceType
 *          value bound to the {@code serviceType} parameter of the {@code Job.processinghost.status} query
 * @param baseUrl
 *          value bound to the {@code host} parameter of the {@code Job.processinghost.status} query
 * @throws ServiceRegistryException
 *           wrapping any exception raised during the database call
 */
private void cleanRunningJobs(String serviceType, String baseUrl) throws ServiceRegistryException {
  try {
    db.execTxChecked(em -> {
      // The query is executed with a pessimistic write lock
      TypedQuery<JpaJob> query = em.createNamedQuery("Job.processinghost.status", JpaJob.class)
          .setLockMode(LockModeType.PESSIMISTIC_WRITE)
          .setParameter("statuses", List.of(
              Status.RUNNING.ordinal(),
              Status.DISPATCHING.ordinal(),
              Status.WAITING.ordinal()
          ))
          .setParameter("host", baseUrl)
          .setParameter("serviceType", serviceType);

      List<JpaJob> unregisteredJobs = query.getResultList();
      if (unregisteredJobs.size() > 0) {
        logger.info("Found {} jobs to clean for {}@{}", unregisteredJobs.size(), serviceType, baseUrl);
      }

      for (JpaJob job : unregisteredJobs) {
        if (job.isDispatchable()) {
          // Re-read the job so the status checks below see its current state
          em.refresh(job);

          // Skip jobs that are already cancelled or scheduled for restart
          if (Status.CANCELLED.equals(job.getStatus()) || Status.RESTART.equals(job.getStatus())) {
            continue;
          }

          // A paused root job is restarted as a whole instead of restarting this single job
          if (job.getRootJob() != null && Status.PAUSED.equals(job.getRootJob().getStatus())) {
            JpaJob rootJob = job.getRootJob();
            cancelAllChildrenQuery(rootJob).accept(em);
            rootJob.setStatus(Status.RESTART);
            rootJob.setOperation(START_OPERATION);
            em.merge(rootJob);
            continue;
          }

          logger.info("Marking child jobs from {} as canceled", job);
          cancelAllChildrenQuery(job).accept(em);

          // Detach the job from its processor service and mark it for restart
          logger.info("Rescheduling lost {}", job);
          job.setStatus(Status.RESTART);
          job.setProcessorServiceRegistration(null);
        } else {
          // Jobs that are not dispatchable are marked as failed
          logger.info("Marking lost {} as failed", job);
          job.setStatus(Status.FAILED);
        }

        em.merge(job);
      }
    });
  } catch (Exception e) {
    throw new ServiceRegistryException(e);
  }
}
1358
1359
1360
1361
1362
1363
1364
1365 private Consumer<EntityManager> cancelAllChildrenQuery(JpaJob job) {
1366 return em -> job.getChildJobs().stream()
1367 .peek(em::refresh)
1368 .filter(child -> Status.CANCELLED.equals(child.getStatus()))
1369 .forEach(child -> {
1370 cancelAllChildrenQuery(child).accept(em);
1371 child.setStatus(Status.CANCELLED);
1372 em.merge(child);
1373 });
1374 }
1375
1376
1377
1378
1379
1380
1381 @Override
1382 public void setMaintenanceStatus(String baseUrl, boolean maintenance) throws NotFoundException {
1383 logger.info("Setting maintenance mode on host '{}'", baseUrl);
1384 HostRegistrationJpaImpl reg = db.execTxChecked(em -> {
1385 HostRegistrationJpaImpl hr = fetchHostRegistrationQuery(baseUrl).apply(em).orElseThrow(() -> {
1386 logger.warn("Can not set maintenance mode because host '{}' was not registered", baseUrl);
1387 return new NotFoundException("Can not set maintenance mode on a host that has not been registered");
1388 });
1389 hr.setMaintenanceMode(maintenance);
1390 em.merge(hr);
1391 return hr;
1392 });
1393
1394 logger.info("Finished setting maintenance mode on host '{}'", baseUrl);
1395 }
1396
1397
1398
1399
1400
1401
1402 @Override
1403 public List<ServiceRegistration> getServiceRegistrations() {
1404 return db.exec(getServiceRegistrationsQuery());
1405 }
1406
/**
 * Returns the incidents helper currently held by this registry.
 * <p>
 * The field is assigned in {@code setIncidentService} and reset to {@code null} in
 * {@code unsetIncidentService}, so the returned value may be {@code null}.
 *
 * @return the current {@link Incidents} instance, possibly {@code null}
 */
@Override
public Incidents incident() {
  return incidents;
}
1411
1412 private List<ServiceRegistration> getOnlineServiceRegistrations() {
1413 return db.exec(namedQuery.findAll("ServiceRegistration.getAllOnline", ServiceRegistration.class));
1414 }
1415
1416
1417
1418
1419
1420
1421 protected Function<EntityManager, List<ServiceRegistration>> getServiceRegistrationsQuery() {
1422 return namedQuery.findAll("ServiceRegistration.getAll", ServiceRegistration.class);
1423 }
1424
1425
1426
1427
1428
1429
1430 @Override
1431 public List<HostRegistration> getHostRegistrations() {
1432 return db.exec(getHostRegistrationsQuery());
1433 }
1434
1435 @Override
1436 public HostStatistics getHostStatistics() {
1437 HostStatistics statistics = new HostStatistics();
1438
1439 db.exec(namedQuery.findAll(
1440 "HostRegistration.jobStatistics",
1441 Object[].class,
1442 Pair.of("status", List.of(Status.QUEUED.ordinal(), Status.RUNNING.ordinal()))
1443 )).forEach(row -> {
1444 final long host = ((Number) row[0]).longValue();
1445 final int status = ((Number) row[1]).intValue();
1446 final long count = ((Number) row[2]).longValue();
1447
1448 if (status == Status.RUNNING.ordinal()) {
1449 statistics.addRunning(host, count);
1450 } else {
1451 statistics.addQueued(host, count);
1452 }
1453 });
1454
1455 return statistics;
1456 }
1457
1458
1459
1460
1461
1462
1463 protected Function<EntityManager, List<HostRegistration>> getHostRegistrationsQuery() {
1464 return namedQuery.findAll("HostRegistration.getAll", HostRegistration.class);
1465 }
1466
1467 @Override
1468 public HostRegistration getHostRegistration(String hostname) throws ServiceRegistryException {
1469 return db.exec(getHostRegistrationQuery(hostname));
1470 }
1471
1472 protected Function<EntityManager, HostRegistration> getHostRegistrationQuery(String hostname) {
1473 return namedQuery.find(
1474 "HostRegistration.byHostName",
1475 HostRegistration.class,
1476 Pair.of("host", hostname)
1477 );
1478 }
1479
1480
1481
1482
1483
1484
1485 @Override
1486 public List<Job> getChildJobs(long id) throws ServiceRegistryException {
1487 try {
1488 List<JpaJob> jobs = db.exec(namedQuery.findAll(
1489 "Job.root.children",
1490 JpaJob.class,
1491 Pair.of("id", id)
1492 ));
1493
1494 if (jobs.size() == 0) {
1495 jobs = db.exec(getChildrenQuery(id));
1496 }
1497
1498 return jobs.stream()
1499 .map(this::setJobUri)
1500 .map(JpaJob::toJob)
1501 .collect(Collectors.toList());
1502 } catch (Exception e) {
1503 throw new ServiceRegistryException(e);
1504 }
1505 }
1506
1507 private Function<EntityManager, List<JpaJob>> getChildrenQuery(long id) {
1508 return em -> {
1509 TypedQuery<JpaJob> query = em
1510 .createNamedQuery("Job.children", JpaJob.class)
1511 .setParameter("id", id);
1512
1513 List<JpaJob> childJobs = query.getResultList();
1514
1515 List<JpaJob> result = new ArrayList<>(childJobs);
1516 childJobs.stream()
1517 .map(j -> getChildrenQuery(j.getId()).apply(em))
1518 .forEach(result::addAll);
1519
1520 return result;
1521 };
1522 }
1523
1524
1525
1526
1527
1528
1529 @Override
1530 public List<Job> getJobs(String type, Status status) throws ServiceRegistryException {
1531 logger.trace("Getting jobs '{}' and '{}'", type, status);
1532
1533 Function<EntityManager, List<JpaJob>> jobsQuery;
1534 if (type == null && status == null) {
1535 jobsQuery = namedQuery.findAll("Job.all", JpaJob.class);
1536 } else if (type == null) {
1537 jobsQuery = namedQuery.findAll(
1538 "Job.status",
1539 JpaJob.class,
1540 Pair.of("status", status.ordinal())
1541 );
1542 } else if (status == null) {
1543 jobsQuery = namedQuery.findAll(
1544 "Job.type",
1545 JpaJob.class,
1546 Pair.of("serviceType", type)
1547 );
1548 } else {
1549 jobsQuery = namedQuery.findAll(
1550 "Job",
1551 JpaJob.class,
1552 Pair.of("status", status.ordinal()),
1553 Pair.of("serviceType", type)
1554 );
1555 }
1556
1557 try {
1558 return db.exec(jobsQuery).stream()
1559 .peek(this::setJobUri)
1560 .map(JpaJob::toJob)
1561 .collect(Collectors.toList());
1562 } catch (Exception e) {
1563 throw new ServiceRegistryException(e);
1564 }
1565 }
1566
1567 @Override
1568 public List<String> getJobPayloads(String operation) throws ServiceRegistryException {
1569 try {
1570 return db.exec(namedQuery.findAll(
1571 "Job.payload",
1572 String.class,
1573 Pair.of("operation", operation)
1574 ));
1575 } catch (Exception e) {
1576 throw new ServiceRegistryException(e);
1577 }
1578 }
1579
1580 @Override
1581 public List<String> getJobPayloads(String operation, int limit, int offset) throws ServiceRegistryException {
1582 try {
1583 return db.exec(em -> {
1584 return em.createNamedQuery("Job.payload", String.class)
1585 .setParameter("operation", operation)
1586 .setMaxResults(limit)
1587 .setFirstResult(offset)
1588 .getResultList();
1589 });
1590 } catch (Exception e) {
1591 throw new ServiceRegistryException(e);
1592 }
1593 }
1594
1595 @Override
1596 public int getJobCount(final String operation) throws ServiceRegistryException {
1597 try {
1598 return db.exec(namedQuery.find(
1599 "Job.countByOperationOnly",
1600 Number.class,
1601 Pair.of("operation", operation)
1602 )).intValue();
1603 } catch (Exception e) {
1604 throw new ServiceRegistryException(e);
1605 }
1606 }
1607
1608
1609
1610
1611
1612
1613 @Override
1614 public List<Job> getActiveJobs() throws ServiceRegistryException {
1615 try {
1616 return db.exec(getJobsByStatusQuery(activeJobStatus)).stream()
1617 .map(JpaJob::toJob)
1618 .collect(Collectors.toList());
1619 } catch (Exception e) {
1620 throw new ServiceRegistryException(e);
1621 }
1622 }
1623
1624
1625
1626
1627
1628
1629
1630
1631 public Function<EntityManager, List<JpaJob>> getJobsByStatusQuery(Status... statuses) {
1632 if (statuses == null || statuses.length < 1) {
1633 throw new IllegalArgumentException("At least one job status must be given.");
1634 }
1635
1636 return namedQuery.findAll(
1637 "Job.statuses",
1638 JpaJob.class,
1639 Pair.of("statuses", Arrays.stream(statuses).map(Enum::ordinal).collect(Collectors.toList()))
1640 ).andThen(jobs -> jobs.stream()
1641 .peek(this::setJobUri)
1642 .collect(Collectors.toList()));
1643 }
1644
1645 @Override
1646 public Map<String, Map<String, Long>> countActiveByOrganizationAndHost() {
1647 var rows = db.exec(namedQuery.findAll(
1648 "Job.countByOrganizationAndHost",
1649 Object[].class,
1650 Pair.of("statuses", Arrays.stream(activeJobStatus).map(Enum::ordinal).collect(Collectors.toList()))
1651 )).stream().collect(Collectors.toList());
1652 var orgMap = new HashMap<String, Map<String, Long>>();
1653 for (Object[] row: rows) {
1654 var org = (String) row[0];
1655 var host = (String) row[1];
1656 var count = (Long) row[2];
1657 orgMap.computeIfAbsent(org, o -> new HashMap<>()).put(host, count);
1658 }
1659 return orgMap;
1660 }
1661
1662 @Override
1663 public Map<String, Long> countActiveTypeByOrganization(final String operation) {
1664 return db.exec(namedQuery.findAll(
1665 "Job.countTypeByOrganization",
1666 Object[].class,
1667 Pair.of("statuses", Arrays.stream(activeJobStatus).map(Enum::ordinal).collect(Collectors.toList())),
1668 Pair.of("operation", operation)
1669 )).stream().collect(Collectors.toMap(
1670 row -> (String) row[0],
1671 row -> (Long) row[1]
1672 ));
1673 }
1674
1675
1676
1677
1678
1679
1680
1681
1682
1683 protected Function<EntityManager, List<JpaJob>> getDispatchableJobsWithStatusQuery(int offset, int limit,
1684 Status... statuses) {
1685 return em -> {
1686 if (statuses == null) {
1687 return Collections.emptyList();
1688 }
1689
1690 TypedQuery<JpaJob> query = em
1691 .createNamedQuery("Job.dispatchable.status", JpaJob.class)
1692 .setParameter("statuses", Arrays.stream(statuses).map(Enum::ordinal).collect(Collectors.toList()));
1693 if (offset > 0) {
1694 query.setFirstResult(offset);
1695 }
1696 if (limit > 0) {
1697 query.setMaxResults(limit);
1698 }
1699 return query.getResultList();
1700 };
1701 }
1702
1703 Function<EntityManager, List<Object[]>> getAvgOperationsQuery() {
1704 return namedQuery.findAll("Job.avgOperation", Object[].class);
1705 }
1706
1707 Function<EntityManager, List<Object[]>> getCountPerHostServiceQuery() {
1708 return namedQuery.findAll("Job.countPerHostService", Object[].class);
1709 }
1710
1711
1712
1713
1714
1715
1716 @Override
1717 public long count(String serviceType, Status status) throws ServiceRegistryException {
1718 Function<EntityManager, Number> countQuery;
1719 if (serviceType == null && status == null) {
1720 countQuery = namedQuery.find(
1721 "Job.count.all",
1722 Number.class
1723 );
1724 } else if (serviceType == null) {
1725 countQuery = namedQuery.find(
1726 "Job.count.nullType",
1727 Number.class,
1728 Pair.of("status", status.ordinal())
1729 );
1730 } else if (status == null) {
1731 countQuery = namedQuery.find(
1732 "Job.count.nullStatus",
1733 Number.class,
1734 Pair.of("serviceType", serviceType)
1735 );
1736 } else {
1737 countQuery = namedQuery.find(
1738 "Job.count",
1739 Number.class,
1740 Pair.of("status", status.ordinal()),
1741 Pair.of("serviceType", serviceType)
1742 );
1743 }
1744
1745 try {
1746 return db.exec(countQuery).longValue();
1747 } catch (Exception e) {
1748 throw new ServiceRegistryException(e);
1749 }
1750 }
1751
1752
1753
1754
1755
1756
1757
1758 @Override
1759 public long countByHost(String serviceType, String host, Status status) throws ServiceRegistryException {
1760 Function<EntityManager, Number> countQuery;
1761 if (serviceType != null && !serviceType.isEmpty()) {
1762 countQuery = namedQuery.find(
1763 "Job.countByHost",
1764 Number.class,
1765 Pair.of("serviceType", serviceType),
1766 Pair.of("status", status.ordinal()),
1767 Pair.of("host", host)
1768 );
1769 } else {
1770 countQuery = namedQuery.find(
1771 "Job.countByHost.nullType",
1772 Number.class,
1773 Pair.of("status", status.ordinal()),
1774 Pair.of("host", host)
1775 );
1776 }
1777
1778 try {
1779 return db.exec(countQuery).longValue();
1780 } catch (Exception e) {
1781 throw new ServiceRegistryException(e);
1782 }
1783 }
1784
1785
1786
1787
1788
1789
1790
1791
1792 @Override
1793 public long countByOperation(String serviceType, String operation, Status status) throws ServiceRegistryException {
1794 try {
1795 return db.exec(namedQuery.find(
1796 "Job.countByOperation",
1797 Number.class,
1798 Pair.of("status", status.ordinal()),
1799 Pair.of("serviceType", serviceType),
1800 Pair.of("operation", operation)
1801 )).longValue();
1802 } catch (Exception e) {
1803 throw new ServiceRegistryException(e);
1804 }
1805 }
1806
1807
1808
1809
1810
1811
1812
1813 @Override
1814 public long count(String serviceType, String host, String operation, Status status) throws ServiceRegistryException {
1815 if (StringUtils.isBlank(serviceType) || StringUtils.isBlank(host) || StringUtils.isBlank(operation)
1816 || status == null) {
1817 throw new IllegalArgumentException("service type, host, operation, and status must be provided");
1818 }
1819
1820 try {
1821 return db.exec(namedQuery.find(
1822 "Job.fullMonty",
1823 Number.class,
1824 Pair.of("status", status.ordinal()),
1825 Pair.of("serviceType", serviceType),
1826 Pair.of("operation", operation)
1827 )).longValue();
1828 } catch (Exception e) {
1829 throw new ServiceRegistryException(e);
1830 }
1831 }
1832
1833
1834
1835
1836
1837
1838 @Override
1839 public List<ServiceStatistics> getServiceStatistics() throws ServiceRegistryException {
1840 Date now = new Date();
1841 try {
1842 return db.exec(getServiceStatisticsQuery(
1843 DateUtils.addDays(now, -maxJobAge),
1844 DateUtils.addDays(now, 1)
1845 ));
1846 } catch (Exception e) {
1847 throw new ServiceRegistryException(e);
1848 }
1849 }
1850
1851
1852
1853
1854
1855
1856
1857
1858
1859
1860
1861 private Function<EntityManager, List<ServiceStatistics>> getServiceStatisticsQuery(Date startDate, Date endDate) {
1862 return em -> {
1863 Map<Long, JaxbServiceStatistics> statsMap = new HashMap<>();
1864
1865
1866 namedQuery.findAll("ServiceRegistration.getAll", ServiceRegistrationJpaImpl.class).apply(em).forEach(s ->
1867 statsMap.put(s.getId(), new JaxbServiceStatistics(s))
1868 );
1869
1870 if (collectJobstats) {
1871
1872 namedQuery.findAll(
1873 "ServiceRegistration.statistics",
1874 Object[].class,
1875 Pair.of("minDateCreated", startDate),
1876 Pair.of("maxDateCreated", endDate)
1877 ).apply(em).forEach(row -> {
1878 Number serviceRegistrationId = (Number) row[0];
1879 if (serviceRegistrationId == null || serviceRegistrationId.longValue() == 0) {
1880 return;
1881 }
1882 Status status = Status.values()[((Number) row[1]).intValue()];
1883 Number count = (Number) row[2];
1884 Number meanQueueTime = (Number) row[3];
1885 Number meanRunTime = (Number) row[4];
1886
1887
1888 JaxbServiceStatistics stats = statsMap.get(serviceRegistrationId.longValue());
1889 if (stats == null) {
1890 return;
1891 }
1892
1893
1894 if (status != null) {
1895 switch (status) {
1896 case RUNNING:
1897 stats.setRunningJobs(count.intValue());
1898 break;
1899 case QUEUED:
1900 case DISPATCHING:
1901 stats.setQueuedJobs(count.intValue());
1902 break;
1903 case FINISHED:
1904 stats.setMeanRunTime(meanRunTime.longValue());
1905 stats.setMeanQueueTime(meanQueueTime.longValue());
1906 stats.setFinishedJobs(count.intValue());
1907 break;
1908 default:
1909 break;
1910 }
1911 }
1912 });
1913 }
1914
1915 List<ServiceStatistics> stats = new ArrayList<>(statsMap.values());
1916 stats.sort((o1, o2) -> {
1917 ServiceRegistration reg1 = o1.getServiceRegistration();
1918 ServiceRegistration reg2 = o2.getServiceRegistration();
1919 int typeComparison = reg1.getServiceType().compareTo(reg2.getServiceType());
1920 return typeComparison == 0
1921 ? reg1.getHost().compareTo(reg2.getHost())
1922 : typeComparison;
1923 });
1924
1925 return stats;
1926 };
1927 }
1928
1929
1930
1931
1932
1933
1934
1935
1936
1937 @Override
1938 public List<ServiceRegistration> getServiceRegistrationsByLoad(String serviceType) throws ServiceRegistryException {
1939 SystemLoad loadByHost = getCurrentHostLoads();
1940 List<HostRegistration> hostRegistrations = getHostRegistrations();
1941 List<ServiceRegistration> serviceRegistrations = getServiceRegistrationsByType(serviceType);
1942 return getServiceRegistrationsByLoad(serviceType, serviceRegistrations, hostRegistrations, loadByHost);
1943 }
1944
1945
1946
1947
1948
1949
1950 @Override
1951 public SystemLoad getCurrentHostLoads() {
1952 return db.exec(getHostLoadsQuery());
1953 }
1954
1955
1956
1957
1958
1959
1960 Function<EntityManager, SystemLoad> getHostLoadsQuery() {
1961 return em -> {
1962 final SystemLoad systemLoad = new SystemLoad();
1963
1964
1965 List<Integer> statuses = JOB_STATUSES_INFLUENCING_LOAD_BALANCING.stream()
1966 .map(Enum::ordinal)
1967 .collect(Collectors.toList());
1968 List<Object[]> rows = namedQuery.findAll(
1969 "ServiceRegistration.hostloads",
1970 Object[].class,
1971 Pair.of("statuses", statuses),
1972
1973
1974 Pair.of("workflow_type", TYPE_WORKFLOW)
1975 ).apply(em);
1976
1977
1978 for (Object[] row : rows) {
1979 String host = String.valueOf(row[0]);
1980 Status status = Status.values()[(int) row[1]];
1981 float currentLoad = ((Number) row[2]).floatValue();
1982 float maxLoad = ((Number) row[3]).floatValue();
1983
1984
1985 if (status == null || !JOB_STATUSES_INFLUENCING_LOAD_BALANCING.contains(status)) {
1986 currentLoad = 0.0f;
1987 }
1988
1989 NodeLoad serviceLoad = new NodeLoad(host, currentLoad, maxLoad);
1990 systemLoad.addNodeLoad(serviceLoad);
1991 }
1992
1993
1994 getHostRegistrationsQuery().apply(em).stream()
1995 .filter(h -> !systemLoad.containsHost(h.getBaseUrl()))
1996 .forEach(h -> systemLoad.addNodeLoad(new NodeLoad(h.getBaseUrl(), 0.0f, h.getMaxLoad())));
1997 return systemLoad;
1998 };
1999 }
2000
2001
2002
2003
2004
2005
2006 @Override
2007 public List<ServiceRegistration> getServiceRegistrationsByType(String serviceType) throws ServiceRegistryException {
2008 return db.exec(namedQuery.findAll(
2009 "ServiceRegistration.getByType",
2010 ServiceRegistration.class,
2011 Pair.of("serviceType", serviceType)
2012 ));
2013 }
2014
2015
2016
2017
2018
2019
2020 @Override
2021 public List<ServiceRegistration> getServiceRegistrationsByHost(String host) throws ServiceRegistryException {
2022 return db.exec(getServiceRegistrationsByHostQuery(host));
2023 }
2024
2025 private Function<EntityManager, List<ServiceRegistration>> getServiceRegistrationsByHostQuery(String host) {
2026 return namedQuery.findAll(
2027 "ServiceRegistration.getByHost",
2028 ServiceRegistration.class,
2029 Pair.of("host", host)
2030 );
2031 }
2032
2033
2034
2035
2036
2037
2038
2039 @Override
2040 public ServiceRegistration getServiceRegistration(String serviceType, String host) {
2041 return db.exec(getServiceRegistrationQuery(serviceType, host))
2042 .orElse(null);
2043 }
2044
2045
2046
2047
2048
2049
2050
/**
 * Stores the trusted HTTP client in the {@code client} field.
 * <p>
 * Annotated with {@code @Reference}, so presumably invoked by the OSGi component runtime (confirm against the
 * component configuration).
 *
 * @param client
 *          the trusted HTTP client to use
 */
@Reference
void setTrustedHttpClient(TrustedHttpClient client) {
  this.client = client;
}
2055
2056
2057
2058
2059
2060
2061
/**
 * Stores the security service in the {@code securityService} field.
 * <p>
 * Annotated with {@code @Reference}, so presumably invoked by the OSGi component runtime (confirm against the
 * component configuration).
 *
 * @param securityService
 *          the security service to use
 */
@Reference
public void setSecurityService(SecurityService securityService) {
  this.securityService = securityService;
}
2066
2067
2068 @Reference(
2069 cardinality = ReferenceCardinality.OPTIONAL,
2070 policy = ReferencePolicy.DYNAMIC,
2071 unbind = "unsetIncidentService"
2072 )
2073 public void setIncidentService(IncidentService incidentService) {
2074 this.incidentService = incidentService;
2075
2076 ((OsgiIncidentService) incidentService).setServiceRegistry(this);
2077 this.incidents = new Incidents(this, incidentService);
2078 }
2079
2080 public void unsetIncidentService(IncidentService incidentService) {
2081 if (this.incidentService == incidentService) {
2082 this.incidentService = null;
2083 this.incidents = null;
2084 }
2085 }
2086
2087
2088
2089
2090
2091
2092
2093
2094
2095
/**
 * Adjusts the state of the services involved in a job once that job has reached {@code FAILED} or
 * {@code FINISHED}; jobs in any other status are ignored.
 * <p>
 * For a failure whose reason is not {@code DATA}: if other services are already in WARNING or ERROR state
 * because of this job, those services are de-escalated by one level; otherwise the processing service is
 * escalated to WARNING, or to ERROR once the configured number of failed attempts is reached. For a finished
 * job, a processing service in WARNING state is reset to NORMAL.
 *
 * @param job
 *          the job that has just been updated
 * @throws ServiceRegistryException
 *           if looking up related services or updating a service state fails
 * @throws NotFoundException
 *           NOTE(review): presumably raised by {@code updateServiceState}; confirm against its declaration
 */
private void updateServiceForFailover(JpaJob job) throws ServiceRegistryException, NotFoundException {
  // Only jobs that have failed or finished influence the service state
  if (job.getStatus() != Status.FAILED && job.getStatus() != Status.FINISHED) {
    return;
  }

  job.setStatus(job.getStatus(), job.getFailureReason());

  // Without a processing service there is no service state to adjust
  ServiceRegistrationJpaImpl currentService = job.getProcessorServiceRegistration();
  if (currentService == null) {
    return;
  }

  // The job failed, and not because of its data
  if (job.getStatus() == FAILED && !DATA.equals(job.getFailureReason())) {

    // Services in WARNING or ERROR state related to the current job
    List<ServiceRegistrationJpaImpl> relatedWarningOrErrorServices = getRelatedWarningErrorServices(job);

    // Other services are already affected by this job: de-escalate them
    if (relatedWarningOrErrorServices.size() > 0) {
      for (ServiceRegistrationJpaImpl relatedService : relatedWarningOrErrorServices) {
        // The current service itself is left untouched
        if (currentService.equals(relatedService)) {
          continue;
        }

        // WARNING is reset to NORMAL
        if (relatedService.getServiceState() == WARNING) {
          logger.info("State reset to NORMAL for related service {} on host {}", relatedService.getServiceType(),
              relatedService.getHost());
          relatedService.setServiceState(NORMAL, job.toJob().getSignature());
        }

        // ERROR is reset to WARNING, keeping the existing warning state trigger
        else if (relatedService.getServiceState() == ERROR) {
          logger.info("State reset to WARNING for related service {} on host {}", relatedService.getServiceType(),
              relatedService.getHost());
          relatedService.setServiceState(WARNING, relatedService.getWarningStateTrigger());
        }

        updateServiceState(relatedService);
      }
    }

    // No related service is affected yet: escalate the current service
    else {
      // NORMAL is escalated to WARNING
      if (currentService.getServiceState() == NORMAL) {
        logger.info("State set to WARNING for current service {} on host {}", currentService.getServiceType(),
            currentService.getHost());
        currentService.setServiceState(WARNING, job.toJob().getSignature());
        updateServiceState(currentService);
      }

      // Otherwise escalate to ERROR, if error states are enabled for this service type and the number of
      // failed jobs has reached the configured maximum
      else if (errorStatesEnabled && !noErrorStateServiceTypes.contains(currentService.getServiceType())
              && getHistorySize(currentService) >= maxAttemptsBeforeErrorState) {
        logger.info("State set to ERROR for current service {} on host {}", currentService.getServiceType(),
            currentService.getHost());
        currentService.setServiceState(ERROR, job.toJob().getSignature());
        updateServiceState(currentService);
      }
    }
  }

  // The job finished successfully
  else if (job.getStatus() == Status.FINISHED) {
    // A service in WARNING state is reset to NORMAL
    if (currentService.getServiceState() == WARNING) {
      logger.info("State reset to NORMAL for current service {} on host {}", currentService.getServiceType(),
          currentService.getHost());
      currentService.setServiceState(NORMAL);
      updateServiceState(currentService);
    }
  }
}
2175
2176
2177
2178
2179
2180
2181 @Override
2182 public void sanitize(String serviceType, String host) throws NotFoundException {
2183 db.execChecked(em -> {
2184 ServiceRegistrationJpaImpl service = getServiceRegistrationQuery(serviceType, host).apply(em)
2185 .orElseThrow(NotFoundException::new);
2186
2187 logger.info("State reset to NORMAL for service {} on host {} through sanitize method", service.getServiceType(),
2188 service.getHost());
2189 service.setServiceState(NORMAL);
2190 updateServiceState(service);
2191 });
2192 }
2193
2194
2195
2196
2197
2198
2199
2200
2201
2202
2203 private int getHistorySize(ServiceRegistration serviceRegistration) throws ServiceRegistryException {
2204 if (serviceRegistration == null) {
2205 throw new IllegalArgumentException("serviceRegistration must not be null!");
2206 }
2207
2208 logger.debug("Calculating count of jobs who failed due to service {}", serviceRegistration);
2209
2210 try {
2211 return db.exec(namedQuery.find(
2212 "Job.count.history.failed",
2213 Number.class,
2214 Pair.of("serviceType", serviceRegistration.getServiceType()),
2215 Pair.of("host", serviceRegistration.getHost())
2216 )).intValue();
2217 } catch (Exception e) {
2218 throw new ServiceRegistryException(e);
2219 }
2220 }
2221
2222
2223
2224
2225
2226
2227
2228
2229
2230
2231
2232
2233 private List<ServiceRegistrationJpaImpl> getRelatedWarningErrorServices(JpaJob job) throws ServiceRegistryException {
2234 if (job == null) {
2235 throw new IllegalArgumentException("job must not be null!");
2236 }
2237
2238 logger.debug("Try to get the services in WARNING or ERROR state triggered by {} failed", job);
2239
2240 try {
2241 return db.exec(namedQuery.findAll(
2242 "ServiceRegistration.relatedservices.warning_error",
2243 ServiceRegistrationJpaImpl.class,
2244 Pair.of("serviceType", job.getJobType())
2245 )).stream()
2246
2247 .filter(rs ->
2248 (rs.getServiceState() == WARNING && rs.getWarningStateTrigger() == job.toJob().getSignature())
2249 || (rs.getServiceState() == ERROR && rs.getErrorStateTrigger() == job.toJob().getSignature())
2250 ).collect(Collectors.toList());
2251 } catch (Exception e) {
2252 throw new ServiceRegistryException(e);
2253 }
2254 }
2255
2256
2257
2258
2259
2260
2261
2262
2263
2264
2265
2266
2267
2268
2269 protected List<ServiceRegistration> getServiceRegistrationsWithCapacity(String jobType,
2270 List<ServiceRegistration> serviceRegistrations, List<HostRegistration> hostRegistrations,
2271 final SystemLoad systemLoad) {
2272 final List<String> hostBaseUrls = hostRegistrations.stream()
2273 .map(HostRegistration::getBaseUrl)
2274 .collect(Collectors.toUnmodifiableList());
2275 final List<ServiceRegistration> filteredList = new ArrayList<>();
2276
2277 for (ServiceRegistration service : serviceRegistrations) {
2278
2279 if (!hostBaseUrls.contains(service.getHost())) {
2280 logger.trace("Not considering {} because it's host {} is not available for dispatching", service,
2281 service.getHost());
2282 continue;
2283 }
2284
2285
2286 if (!jobType.equals(service.getServiceType())) {
2287 logger.trace("Not considering {} because it is of the wrong job type", service);
2288 continue;
2289 }
2290
2291
2292 if (service.getServiceState() == ERROR) {
2293 logger.trace("Not considering {} because it is in error state", service);
2294 continue;
2295 }
2296
2297
2298 if (service.isInMaintenanceMode()) {
2299 logger.trace("Not considering {} because it is in maintenance mode", service);
2300 continue;
2301 }
2302
2303
2304 if (!service.isOnline()) {
2305 logger.trace("Not considering {} because it is currently offline", service);
2306 continue;
2307 }
2308
2309
2310 Float hostLoadMax = null;
2311 for (HostRegistration host : hostRegistrations) {
2312 if (host.getBaseUrl().equals(service.getHost())) {
2313 hostLoadMax = host.getMaxLoad();
2314 break;
2315 }
2316 }
2317 if (hostLoadMax == null) {
2318 logger.warn("Unable to determine max load for host {}", service.getHost());
2319 }
2320
2321
2322 Float hostLoad = systemLoad.get(service.getHost()).getLoadFactor();
2323 if (hostLoad == null) {
2324 logger.warn("Unable to determine current load for host {}", service.getHost());
2325 }
2326
2327
2328 if (hostLoad == null || hostLoadMax == null || hostLoad < hostLoadMax) {
2329 logger.debug("Adding candidate service {} for processing of jobs of type '{}' (host load is {} of max {})",
2330 service, jobType, hostLoad, hostLoadMax);
2331 filteredList.add(service);
2332 }
2333 }
2334
2335
2336 filteredList.sort(new LoadComparator(systemLoad));
2337
2338 return filteredList;
2339 }
2340
2341
2342
2343
2344
2345
2346
2347
2348
2349
2350
2351
2352
2353
2354 protected List<ServiceRegistration> getServiceRegistrationsByLoad(String jobType,
2355 List<ServiceRegistration> serviceRegistrations, List<HostRegistration> hostRegistrations,
2356 final SystemLoad systemLoad) {
2357 final List<String> hostBaseUrls = hostRegistrations.stream()
2358 .map(HostRegistration::getBaseUrl)
2359 .collect(Collectors.toUnmodifiableList());
2360 final List<ServiceRegistration> filteredList = new ArrayList<>();
2361
2362 logger.debug("Finding services to dispatch job of type {}", jobType);
2363
2364 for (ServiceRegistration service : serviceRegistrations) {
2365
2366 if (!hostBaseUrls.contains(service.getHost())) {
2367 logger.trace("Not considering {} because it's host {} is not available for dispatching", service,
2368 service.getHost());
2369 continue;
2370 }
2371
2372
2373 if (!jobType.equals(service.getServiceType())) {
2374 logger.trace("Not considering {} because it is of the wrong job type", service);
2375 continue;
2376 }
2377
2378
2379 if (service.getServiceState() == ERROR) {
2380 logger.trace("Not considering {} because it is in error state", service);
2381 continue;
2382 }
2383
2384
2385 if (service.isInMaintenanceMode()) {
2386 logger.trace("Not considering {} because it is in maintenance mode", service);
2387 continue;
2388 }
2389
2390
2391 if (!service.isOnline()) {
2392 logger.trace("Not considering {} because it is currently offline", service);
2393 continue;
2394 }
2395
2396
2397 logger.debug("Adding candidate service {} for processing of job of type '{}'", service, jobType);
2398 filteredList.add(service);
2399 }
2400
2401
2402 if ("org.opencastproject.composer".equals(jobType)) {
2403 Collections.sort(filteredList, new LoadComparatorEncoding(systemLoad));
2404 } else {
2405 Collections.sort(filteredList, new LoadComparator(systemLoad));
2406 }
2407
2408 return filteredList;
2409 }
2410
2411
2412
2413
2414
2415
2416 @Override
2417 public SystemLoad getMaxLoads() throws ServiceRegistryException {
2418 final SystemLoad loads = new SystemLoad();
2419 getHostRegistrations().stream()
2420 .map(host -> new NodeLoad(host.getBaseUrl(), 0.0f, host.getMaxLoad()))
2421 .forEach(loads::addNodeLoad);
2422 return loads;
2423 }
2424
2425
2426
2427
2428
2429
2430 @Override
2431 public NodeLoad getMaxLoadOnNode(String host) throws ServiceRegistryException, NotFoundException {
2432 try {
2433 float maxLoad = db.exec(namedQuery.find(
2434 "HostRegistration.getMaxLoadByHostName",
2435 Number.class,
2436 Pair.of("host", host)
2437 )).floatValue();
2438 return new NodeLoad(host, 0.0f, maxLoad);
2439 } catch (NoResultException e) {
2440 throw new NotFoundException(e);
2441 } catch (Exception e) {
2442 throw new ServiceRegistryException(e);
2443 }
2444 }
2445
2446
2447 class JobProducerHeartbeat implements Runnable {
2448
2449
2450 private final List<ServiceRegistration> unresponsive = new ArrayList<>();
2451
2452
2453
2454
2455
2456
2457 @Override
2458 public void run() {
2459 logger.debug("Checking for unresponsive services");
2460
2461 try {
2462 List<ServiceRegistration> serviceRegistrations = getOnlineServiceRegistrations();
2463
2464 for (ServiceRegistration service : serviceRegistrations) {
2465 if (!service.isJobProducer()) {
2466 continue;
2467 }
2468 if (service.isInMaintenanceMode()) {
2469 continue;
2470 }
2471
2472
2473 String serviceUrl = UrlSupport.concat(service.getHost(), service.getPath(), "dispatch");
2474
2475 HttpHead options = new HttpHead(serviceUrl);
2476 HttpResponse response = null;
2477 try {
2478 try {
2479 response = client.execute(options);
2480 if (response != null) {
2481 switch (response.getStatusLine().getStatusCode()) {
2482 case HttpStatus.SC_OK:
2483
2484 logger.trace("Service " + service + " is responsive: " + response.getStatusLine());
2485 if (unresponsive.remove(service)) {
2486 logger.info("Service {} is still online", service);
2487 } else if (!service.isOnline()) {
2488 try {
2489 setOnlineStatus(service.getServiceType(), service.getHost(), service.getPath(), true, true);
2490 logger.info("Service {} is back online", service);
2491 } catch (ServiceRegistryException e) {
2492 logger.warn("Error setting online status for {}", service);
2493 }
2494 }
2495 continue;
2496 default:
2497 if (!service.isOnline()) {
2498 continue;
2499 }
2500 logger.warn("Service {} is not working as expected: {}", service, response.getStatusLine());
2501 }
2502 } else {
2503 logger.warn("Service {} does not respond", service);
2504 }
2505 } catch (TrustedHttpClientException e) {
2506 if (!service.isOnline()) {
2507 continue;
2508 }
2509 logger.warn("Unable to reach {}", service, e);
2510 }
2511
2512
2513 try {
2514 if (unresponsive.contains(service)) {
2515 unRegisterService(service.getServiceType(), service.getHost());
2516 unresponsive.remove(service);
2517 logger.warn("Marking {} as offline", service);
2518 } else {
2519 unresponsive.add(service);
2520 logger.warn("Added {} to the watch list", service);
2521 }
2522 } catch (ServiceRegistryException e) {
2523 logger.warn("Unable to unregister unreachable service: {}", service, e);
2524 }
2525 } finally {
2526 client.close(response);
2527 }
2528 }
2529 } catch (Throwable t) {
2530 logger.warn("Error while checking for unresponsive services", t);
2531 }
2532
2533 logger.debug("Finished checking for unresponsive services");
2534 }
2535 }
2536
2537
2538
2539
2540
2541
2542 private class LoadComparator implements Comparator<ServiceRegistration> {
2543
2544 protected SystemLoad loadByHost = null;
2545
2546
2547
2548
2549
2550
2551
2552 LoadComparator(SystemLoad loadByHost) {
2553 this.loadByHost = loadByHost;
2554 }
2555
2556 @Override
2557 public int compare(ServiceRegistration serviceA, ServiceRegistration serviceB) {
2558 String hostA = serviceA.getHost();
2559 String hostB = serviceB.getHost();
2560 NodeLoad nodeA = loadByHost.get(hostA);
2561 NodeLoad nodeB = loadByHost.get(hostB);
2562
2563 if (Math.abs(nodeA.getLoadFactor() - nodeB.getLoadFactor()) <= 0.01) {
2564
2565
2566
2567 return Float.compare(nodeB.getMaxLoad(), nodeA.getMaxLoad());
2568 }
2569 return Float.compare(nodeA.getLoadFactor(), nodeB.getLoadFactor());
2570 }
2571 }
2572
2573
2574
2575
2576
2577
2578
2579
2580 private class LoadComparatorEncoding extends LoadComparator implements Comparator<ServiceRegistration> {
2581
2582
2583
2584
2585
2586
2587 LoadComparatorEncoding(SystemLoad loadByHost) {
2588 super(loadByHost);
2589 }
2590
2591 @Override
2592 public int compare(ServiceRegistration serviceA, ServiceRegistration serviceB) {
2593 String hostA = serviceA.getHost();
2594 String hostB = serviceB.getHost();
2595 NodeLoad nodeA = loadByHost.get(hostA);
2596 NodeLoad nodeB = loadByHost.get(hostB);
2597
2598 if (encodingWorkers != null) {
2599 if (encodingWorkers.contains(hostA) && !encodingWorkers.contains(hostB)) {
2600 if (nodeA.getLoadFactor() <= encodingThreshold) {
2601 return -1;
2602 }
2603 return Float.compare(nodeA.getLoadFactor(), nodeB.getLoadFactor());
2604 }
2605 if (encodingWorkers.contains(hostB) && !encodingWorkers.contains(hostA)) {
2606 if (nodeB.getLoadFactor() <= encodingThreshold) {
2607 return 1;
2608 }
2609 return Float.compare(nodeA.getLoadFactor(), nodeB.getLoadFactor());
2610 }
2611 }
2612 return super.compare(serviceA, serviceB);
2613 }
2614 }
2615 }