1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 package org.opencastproject.serviceregistry.api;
23
24 import org.opencastproject.job.api.JaxbJob;
25 import org.opencastproject.job.api.Job;
26 import org.opencastproject.job.api.Job.Status;
27 import org.opencastproject.job.api.JobImpl;
28 import org.opencastproject.job.api.JobParser;
29 import org.opencastproject.job.api.JobProducer;
30 import org.opencastproject.security.api.Organization;
31 import org.opencastproject.security.api.OrganizationDirectoryService;
32 import org.opencastproject.security.api.SecurityService;
33 import org.opencastproject.security.api.User;
34 import org.opencastproject.security.api.UserDirectoryService;
35 import org.opencastproject.serviceregistry.api.SystemLoad.NodeLoad;
36 import org.opencastproject.util.NotFoundException;
37
38 import org.apache.commons.lang3.NotImplementedException;
39 import org.slf4j.Logger;
40 import org.slf4j.LoggerFactory;
41
42 import java.io.IOException;
43 import java.util.ArrayList;
44 import java.util.Collections;
45 import java.util.Comparator;
46 import java.util.Date;
47 import java.util.HashMap;
48 import java.util.HashSet;
49 import java.util.Iterator;
50 import java.util.LinkedHashSet;
51 import java.util.LinkedList;
52 import java.util.List;
53 import java.util.Map;
54 import java.util.Map.Entry;
55 import java.util.Set;
56 import java.util.concurrent.Executors;
57 import java.util.concurrent.ScheduledExecutorService;
58 import java.util.concurrent.TimeUnit;
59 import java.util.concurrent.atomic.AtomicInteger;
60 import java.util.concurrent.atomic.AtomicLong;
61
62
63 public class ServiceRegistryInMemoryImpl implements ServiceRegistry {
64
65
66 private static final Logger logger = LoggerFactory.getLogger(ServiceRegistryInMemoryImpl.class);
67
68
69 public static final long DEFAULT_DISPATCHER_TIMEOUT = 100;
70
71
72 private static final String LOCALHOST = "localhost";
73
74
75 protected Map<String, HostRegistrationInMemory> hosts = new HashMap<String, HostRegistrationInMemory>();
76
77
78 protected Map<String, List<ServiceRegistrationInMemoryImpl>> services = new HashMap<String, List<ServiceRegistrationInMemoryImpl>>();
79
80
81 protected Map<Long, String> jobs = new HashMap<Long, String>();
82
83
84 protected Map<ServiceRegistrationInMemoryImpl, Set<Job>> jobHosts = new HashMap<ServiceRegistrationInMemoryImpl, Set<Job>>();
85
86
87 protected ScheduledExecutorService dispatcher = Executors.newScheduledThreadPool(1);
88
89
90 protected AtomicLong idCounter = new AtomicLong();
91
92
93 protected Job currentJob = null;
94
95
96
97
98
99 protected SecurityService securityService = null;
100
101
102 protected UserDirectoryService userDirectoryService = null;
103
104
105 protected OrganizationDirectoryService organizationDirectoryService = null;
106
107 protected Incidents incidents;
108
109
110
111
112 protected static final List<Status> JOB_STATUSES_INFLUENCING_LOAD_BALANCING;
113
114 static {
115 JOB_STATUSES_INFLUENCING_LOAD_BALANCING = new ArrayList<Status>();
116 JOB_STATUSES_INFLUENCING_LOAD_BALANCING.add(Status.QUEUED);
117 JOB_STATUSES_INFLUENCING_LOAD_BALANCING.add(Status.RUNNING);
118 }
119
120 public ServiceRegistryInMemoryImpl(JobProducer service, float maxLoad, SecurityService securityService,
121 UserDirectoryService userDirectoryService, OrganizationDirectoryService organizationDirectoryService,
122 IncidentService incidentService) throws ServiceRegistryException {
123
124 registerHost(LOCALHOST, LOCALHOST, "Admin", Runtime.getRuntime().totalMemory(), Runtime.getRuntime().availableProcessors(), maxLoad);
125 if (service != null)
126 registerService(service, maxLoad);
127 this.securityService = securityService;
128 this.userDirectoryService = userDirectoryService;
129 this.organizationDirectoryService = organizationDirectoryService;
130 this.incidents = new Incidents(this, incidentService);
131 this.dispatcher.scheduleWithFixedDelay(new JobDispatcher(), DEFAULT_DISPATCHER_TIMEOUT, DEFAULT_DISPATCHER_TIMEOUT,
132 TimeUnit.MILLISECONDS);
133 }
134
/**
 * Creates the registry using the number of available processors as the maximum load.
 *
 * @param service
 *          the local job producer to register, may be <code>null</code>
 * @param securityService
 *          the security service
 * @param userDirectoryService
 *          the user directory service
 * @param organizationDirectoryService
 *          the organization directory service
 * @param incidentService
 *          the incident service
 * @throws ServiceRegistryException
 *           if registering the host or the service fails
 */
public ServiceRegistryInMemoryImpl(
        JobProducer service,
        SecurityService securityService,
        UserDirectoryService userDirectoryService,
        OrganizationDirectoryService organizationDirectoryService,
        IncidentService incidentService) throws ServiceRegistryException {
  this(
          service,
          Runtime.getRuntime().availableProcessors(),
          securityService,
          userDirectoryService,
          organizationDirectoryService,
          incidentService);
}
141
142
143
144
145 public void dispose() {
146 if (dispatcher != null) {
147 try {
148 dispatcher.shutdownNow();
149 if (!dispatcher.isShutdown()) {
150 logger.info("Waiting for Dispatcher to terminate");
151 dispatcher.awaitTermination(10, TimeUnit.SECONDS);
152 }
153 } catch (InterruptedException e) {
154 logger.error("Error shutting down the Dispatcher", e);
155 }
156 }
157 }
158
159
160
161
162
163
164 @Override
165 public void enableHost(String host) throws ServiceRegistryException, NotFoundException {
166 if (hosts.containsKey(host)) {
167 hosts.get(host).setActive(true);
168 } else {
169 throw new NotFoundException("The host named " + host + " was not found");
170 }
171 }
172
173
174
175
176
177
178 @Override
179 public void disableHost(String host) throws ServiceRegistryException, NotFoundException {
180 if (hosts.containsKey(host)) {
181 hosts.get(host).setActive(false);
182 } else {
183 throw new NotFoundException("The host named " + host + " was not found");
184 }
185 }
186
187
188
189
190
191
192 @Override
193 public void registerHost(String host, String address, String nodeName, long memory, int cores, float maxLoad)
194 throws ServiceRegistryException {
195 HostRegistrationInMemory hrim = new HostRegistrationInMemory(address, address, nodeName, maxLoad, cores, memory);
196 hosts.put(host, hrim);
197 }
198
199
200
201
202
203
204 @Override
205 public void unregisterHost(String host) throws ServiceRegistryException {
206 hosts.remove(host);
207 services.remove(host);
208 }
209
210
211
212
213
214
215
216
217
218 public ServiceRegistration registerService(JobProducer localService) throws ServiceRegistryException {
219 return registerService(localService, Runtime.getRuntime().availableProcessors());
220 }
221
222
223
224
225
226
227
228
229
230
231
232 public ServiceRegistration registerService(JobProducer localService, float maxLoad) throws ServiceRegistryException {
233 HostRegistrationInMemory hrim = hosts.get(LOCALHOST);
234
235 List<ServiceRegistrationInMemoryImpl> servicesOnHost = services.get(LOCALHOST);
236 if (servicesOnHost == null) {
237 servicesOnHost = new ArrayList<ServiceRegistrationInMemoryImpl>();
238 services.put(LOCALHOST, servicesOnHost);
239 }
240
241 ServiceRegistrationInMemoryImpl registration = new ServiceRegistrationInMemoryImpl(localService, hrim.getBaseUrl());
242 registration.setMaintenance(false);
243 servicesOnHost.add(registration);
244 return registration;
245 }
246
247
248
249
250
251
252
253
254
255 public void unregisterService(JobProducer localService) throws ServiceRegistryException {
256 List<ServiceRegistrationInMemoryImpl> servicesOnHost = services.get(LOCALHOST);
257 if (servicesOnHost != null) {
258 ServiceRegistrationInMemoryImpl s = (ServiceRegistrationInMemoryImpl) localService;
259 servicesOnHost.remove(s);
260 }
261 }
262
263
264
265
266
267
268
269 @Override
270 public ServiceRegistration registerService(String serviceType, String host, String path)
271 throws ServiceRegistryException {
272 return registerService(serviceType, host, path, false);
273 }
274
275
276
277
278
279
280
281 @Override
282 public ServiceRegistration registerService(String serviceType, String host, String path, boolean jobProducer)
283 throws ServiceRegistryException {
284
285 HostRegistrationInMemory hostRegistration = hosts.get(host);
286 if (hostRegistration == null) {
287 throw new ServiceRegistryException(new NotFoundException("Host " + host + " was not found"));
288 }
289
290 List<ServiceRegistrationInMemoryImpl> servicesOnHost = services.get(host);
291 if (servicesOnHost == null) {
292 servicesOnHost = new ArrayList<ServiceRegistrationInMemoryImpl>();
293 services.put(host, servicesOnHost);
294 }
295
296 ServiceRegistrationInMemoryImpl registration = new ServiceRegistrationInMemoryImpl(serviceType, host, path,
297 jobProducer);
298 servicesOnHost.add(registration);
299 return registration;
300 }
301
302
303
304
305
306
307 @Override
308 public void unRegisterService(String serviceType, String host) throws ServiceRegistryException {
309 List<ServiceRegistrationInMemoryImpl> servicesOnHost = services.get(host);
310 if (servicesOnHost != null) {
311 Iterator<ServiceRegistrationInMemoryImpl> ri = servicesOnHost.iterator();
312 while (ri.hasNext()) {
313 ServiceRegistration registration = ri.next();
314 if (serviceType.equals(registration.getServiceType()))
315 ri.remove();
316 }
317 }
318 }
319
320
321
322
323
324
325 @Override
326 public void setMaintenanceStatus(String host, boolean maintenance) throws NotFoundException {
327 List<ServiceRegistrationInMemoryImpl> servicesOnHost = services.get(host);
328 if (!hosts.containsKey(host)) {
329 throw new NotFoundException("Host " + host + " was not found");
330 }
331 hosts.get(host).setMaintenanceMode(maintenance);
332 if (servicesOnHost != null) {
333 for (ServiceRegistrationInMemoryImpl r : servicesOnHost) {
334 r.setMaintenance(maintenance);
335 }
336 }
337 }
338
339
340
341
342
343
344 @Override
345 public Job createJob(String type, String operation) throws ServiceRegistryException {
346 return createJob(type, operation, null, null, true);
347 }
348
349
350
351
352
353
354
355 @Override
356 public Job createJob(String type, String operation, List<String> arguments) throws ServiceRegistryException {
357 return createJob(type, operation, arguments, null, true);
358 }
359
360
361
362
363
364
365
366 @Override
367 public Job createJob(String type, String operation, List<String> arguments, Float jobLoad)
368 throws ServiceRegistryException {
369 return createJob(type, operation, arguments, null, true, jobLoad);
370 }
371
372 public Job createJob(String type, String operation, List<String> arguments, String payload)
373 throws ServiceRegistryException {
374 return createJob(type, operation, arguments, payload, true);
375 }
376
377
378
379
380
381
382
383 @Override
384 public Job createJob(String type, String operation, List<String> arguments, String payload, boolean queueable)
385 throws ServiceRegistryException {
386 return createJob(type, operation, arguments, payload, queueable, null, 1.0f);
387 }
388
389
390
391
392
393
394
395 @Override
396 public Job createJob(String type, String operation, List<String> arguments, String payload, boolean queueable,
397 Float jobLoad) throws ServiceRegistryException {
398 return createJob(type, operation, arguments, payload, queueable, null, jobLoad);
399 }
400
401
402
403
404
405
406
407 @Override
408 public Job createJob(String type, String operation, List<String> arguments, String payload, boolean queueable,
409 Job parentJob, Float jobLoad) throws ServiceRegistryException {
410 if (getServiceRegistrationsByType(type).size() == 0)
411 logger.warn("Service " + type + " not available");
412
413 Job job = null;
414 synchronized (this) {
415 job = new JobImpl(idCounter.addAndGet(1));
416 if (securityService != null) {
417 job.setCreator(securityService.getUser().getUsername());
418 job.setOrganization(securityService.getOrganization().getId());
419 }
420 job.setDateCreated(new Date());
421 job.setJobType(type);
422 job.setOperation(operation);
423 job.setArguments(arguments);
424 job.setPayload(payload);
425 if (queueable)
426 job.setStatus(Status.QUEUED);
427 else
428 job.setStatus(Status.INSTANTIATED);
429 if (parentJob != null)
430 job.setParentJobId(parentJob.getId());
431 job.setJobLoad(jobLoad);
432 }
433
434 synchronized (jobs) {
435 try {
436 jobs.put(job.getId(), JobParser.toXml(new JaxbJob(job)));
437 } catch (IOException e) {
438 throw new IllegalStateException("Error serializing job " + job, e);
439 }
440 }
441 return job;
442 }
443
444 private void removeJob(long id) throws NotFoundException, ServiceRegistryException {
445 synchronized (jobs) {
446 if (!jobs.containsKey(id))
447 throw new NotFoundException("No job with ID '" + id + "' found");
448
449 jobs.remove(id);
450 }
451 }
452
453 @Override
454 public void removeJobs(List<Long> ids) throws NotFoundException, ServiceRegistryException {
455 synchronized (jobs) {
456 for (long id : ids) {
457 removeJob(id);
458 }
459 }
460 }
461
462
463
464
465
466
467
468
469
470
471
472
473
474 protected boolean dispatchJob(Job job) throws ServiceUnavailableException, ServiceRegistryException,
475 UndispatchableJobException {
476 List<ServiceRegistration> registrations = getServiceRegistrationsByLoad(job.getJobType());
477 if (registrations.size() == 0)
478 throw new ServiceUnavailableException("No service is available to handle jobs of type '" + job.getJobType() + "'");
479 job.setStatus(Status.DISPATCHING);
480 try {
481 job = updateJob(job);
482 } catch (NotFoundException e) {
483 throw new ServiceRegistryException("Job not found!", e);
484 }
485 for (ServiceRegistration registration : registrations) {
486 if (registration.isJobProducer() && !registration.isInMaintenanceMode()) {
487 ServiceRegistrationInMemoryImpl inMemoryRegistration = (ServiceRegistrationInMemoryImpl) registration;
488 JobProducer service = inMemoryRegistration.getService();
489
490
491
492 Set<Job> jobs = jobHosts.get(inMemoryRegistration);
493 if (jobs == null) {
494 jobs = new LinkedHashSet<Job>();
495 }
496 jobs.add(job);
497 jobHosts.put(inMemoryRegistration, jobs);
498
499 if (!service.isReadyToAcceptJobs(job.getOperation())) {
500 jobs.remove(job);
501 jobHosts.put(inMemoryRegistration, jobs);
502 continue;
503 }
504 if (!service.isReadyToAccept(job)) {
505 jobs.remove(job);
506 jobHosts.put(inMemoryRegistration, jobs);
507 continue;
508 }
509 try {
510 job = updateJob(job);
511 } catch (NotFoundException e) {
512 jobs.remove(job);
513 jobHosts.put(inMemoryRegistration, jobs);
514 throw new ServiceRegistryException("Job not found!", e);
515 }
516 service.acceptJob(job);
517 return true;
518 } else if (!registration.isJobProducer()) {
519 logger.warn("This implementation of the service registry doesn't support dispatching to remote services");
520
521 } else {
522 logger.warn("Service " + registration + " is in maintenance mode");
523 }
524 }
525 return false;
526 }
527
528
529
530
531
532
533 @Override
534 public Job updateJob(Job job) throws NotFoundException, ServiceRegistryException {
535 if (job == null)
536 throw new IllegalArgumentException("Job cannot be null");
537 Job updatedJob = null;
538 synchronized (jobs) {
539 try {
540 updatedJob = updateInternal(job);
541 jobs.put(updatedJob.getId(), JobParser.toXml(new JaxbJob(updatedJob)));
542 } catch (IOException e) {
543 throw new IllegalStateException("Error serializing job", e);
544 }
545 }
546 return updatedJob;
547 }
548
549 private Job updateInternal(Job job) {
550 Date now = new Date();
551 Status status = job.getStatus();
552 if (job.getDateCreated() == null) {
553 job.setDateCreated(now);
554 }
555 if (Status.RUNNING.equals(status)) {
556 if (job.getDateStarted() == null) {
557 job.setDateStarted(now);
558 job.setQueueTime(now.getTime() - job.getDateCreated().getTime());
559 }
560 } else if (Status.FAILED.equals(status)) {
561
562 job.setDateCompleted(now);
563 if (job.getDateStarted() != null) {
564 job.setRunTime(now.getTime() - job.getDateStarted().getTime());
565 }
566 } else if (Status.FINISHED.equals(status)) {
567 if (job.getDateStarted() == null) {
568
569
570 job.setDateStarted(job.getDateCreated());
571 }
572 job.setDateCompleted(now);
573 job.setRunTime(now.getTime() - job.getDateStarted().getTime());
574
575
576 for (Entry<String, List<ServiceRegistrationInMemoryImpl>> service : services.entrySet()) {
577 for (ServiceRegistrationInMemoryImpl srv : service.getValue()) {
578 Set<Job> jobs = jobHosts.get(srv);
579 if (jobs != null) {
580 Set<Job> updatedJobs = new HashSet<>();
581 for (Job savedJob : jobs) {
582 if (savedJob.getId() != job.getId())
583 updatedJobs.add(savedJob);
584 }
585 jobHosts.put(srv, updatedJobs);
586 }
587 }
588 }
589 }
590 return job;
591 }
592
593
594
595
596
597
598 @Override
599 public Job getJob(long id) throws NotFoundException, ServiceRegistryException {
600 synchronized (jobs) {
601 String serializedJob = jobs.get(id);
602 if (serializedJob == null)
603 throw new NotFoundException(Long.toString(id));
604 try {
605 return JobParser.parseJob(serializedJob);
606 } catch (IOException e) {
607 throw new IllegalStateException("Error unmarshaling job", e);
608 }
609 }
610 }
611
612
613
614
615
616
617 @Override
618 public List<Job> getChildJobs(long id) throws ServiceRegistryException {
619 List<Job> result = new ArrayList<Job>();
620 synchronized (jobs) {
621 for (String serializedJob : jobs.values()) {
622 Job job = null;
623 try {
624 job = JobParser.parseJob(serializedJob);
625 } catch (IOException e) {
626 throw new IllegalStateException("Error unmarshaling job", e);
627 }
628 if (job.getParentJobId() == null)
629 continue;
630 if (job.getParentJobId().equals(id) || job.getRootJobId().equals(id))
631 result.add(job);
632
633 Long parentJobId = job.getParentJobId();
634 while (parentJobId != null && parentJobId > 0) {
635 try {
636 Job parentJob = getJob(job.getParentJobId());
637 if (parentJob.getParentJobId().equals(id)) {
638 result.add(job);
639 break;
640 }
641 parentJobId = parentJob.getParentJobId();
642 } catch (NotFoundException e) {
643 throw new ServiceRegistryException("Job from parent job id was not found!", e);
644 }
645 }
646 }
647 }
648 Collections.sort(result, new Comparator<Job>() {
649 @Override
650 public int compare(Job job1, Job job2) {
651 return job1.getDateCreated().compareTo(job1.getDateCreated());
652 }
653 });
654 return result;
655 }
656
657
658
659
660
661
662
663 @Override
664 public List<Job> getJobs(String serviceType, Status status) throws ServiceRegistryException {
665 List<Job> result = new ArrayList<Job>();
666 synchronized (jobs) {
667 for (String serializedJob : jobs.values()) {
668 Job job = null;
669 try {
670 job = JobParser.parseJob(serializedJob);
671 } catch (IOException e) {
672 throw new IllegalStateException("Error unmarshaling job", e);
673 }
674 if (serviceType.equals(job.getJobType()) && status.equals(job.getStatus()))
675 result.add(job);
676 }
677 }
678 return result;
679 }
680
681 @Override
682 public List<String> getJobPayloads(String operation) throws ServiceRegistryException {
683 List<String> result = new ArrayList<>();
684 for (String serializedJob : jobs.values()) {
685 try {
686 Job job = JobParser.parseJob(serializedJob);
687 if (operation.equals(job.getOperation())) {
688 result.add(job.getPayload());
689 }
690 } catch (IOException e) {
691 throw new IllegalStateException("Error unmarshaling job", e);
692 }
693 }
694 return result;
695 }
696
697 @Override
698 public List<String> getJobPayloads(String operation, int limit, int offset) throws ServiceRegistryException {
699 return null;
700 }
701
702 @Override
703 public int getJobCount(String operation) throws ServiceRegistryException {
704 return 0;
705 }
706
707
708
709
710
711
712 @Override
713 public List<Job> getActiveJobs() throws ServiceRegistryException {
714 List<Job> result = new ArrayList<Job>();
715 synchronized (jobs) {
716 for (String serializedJob : jobs.values()) {
717 Job job = null;
718 try {
719 job = JobParser.parseJob(serializedJob);
720 } catch (IOException e) {
721 throw new IllegalStateException("Error unmarshaling job", e);
722 }
723 if (job.getStatus().isActive())
724 result.add(job);
725 }
726 }
727 return result;
728 }
729
730 @Override
731 public Incidents incident() {
732 return incidents;
733 }
734
735
736
737
738
739
740 @Override
741 public List<ServiceRegistration> getServiceRegistrationsByLoad(String serviceType) throws ServiceRegistryException {
742 return getServiceRegistrationsByType(serviceType);
743 }
744
745
746
747
748
749
750 @Override
751 public List<ServiceRegistration> getServiceRegistrationsByType(String serviceType) throws ServiceRegistryException {
752 List<ServiceRegistration> result = new ArrayList<ServiceRegistration>();
753 for (List<ServiceRegistrationInMemoryImpl> servicesPerHost : services.values()) {
754 for (ServiceRegistrationInMemoryImpl r : servicesPerHost) {
755 if (serviceType.equals(r.getServiceType()))
756 result.add(r);
757 }
758 }
759 return result;
760 }
761
762
763
764
765
766
767 @Override
768 public List<ServiceRegistration> getServiceRegistrationsByHost(String host) throws ServiceRegistryException {
769 List<ServiceRegistration> result = new ArrayList<ServiceRegistration>();
770 List<ServiceRegistrationInMemoryImpl> servicesPerHost = services.get(host);
771 if (servicesPerHost != null) {
772 result.addAll(servicesPerHost);
773 }
774 return result;
775 }
776
777
778
779
780
781
782
783 @Override
784 public ServiceRegistration getServiceRegistration(String serviceType, String host) throws ServiceRegistryException {
785 List<ServiceRegistrationInMemoryImpl> servicesPerHost = services.get(host);
786 if (servicesPerHost != null) {
787 for (ServiceRegistrationInMemoryImpl r : servicesPerHost) {
788 if (serviceType.equals(r.getServiceType()))
789 return r;
790 }
791 }
792 return null;
793 }
794
795
796
797
798
799
800 @Override
801 public List<ServiceRegistration> getServiceRegistrations() throws ServiceRegistryException {
802 List<ServiceRegistration> result = new ArrayList<ServiceRegistration>();
803 for (List<ServiceRegistrationInMemoryImpl> servicesPerHost : services.values()) {
804 result.addAll(servicesPerHost);
805 }
806 return result;
807 }
808
809
810
811
812
813
814 @Override
815 public List<ServiceStatistics> getServiceStatistics() throws ServiceRegistryException {
816 throw new UnsupportedOperationException("Operation not yet implemented");
817 }
818
819
820
821
822
823
824
825 @Override
826 public long count(String serviceType, Status status) throws ServiceRegistryException {
827 return count(serviceType, null, null, status);
828 }
829
830
831
832
833
834
835
836 @Override
837 public long countByOperation(String serviceType, String operation, Status status) throws ServiceRegistryException {
838 return count(serviceType, null, operation, status);
839 }
840
841
842
843
844
845
846
847 @Override
848 public long countByHost(String serviceType, String host, Status status) throws ServiceRegistryException {
849 return count(serviceType, host, null, status);
850 }
851
852
853
854
855
856
857
858 @Override
859 public long count(String serviceType, String host, String operation, Status status) throws ServiceRegistryException {
860 int count = 0;
861 synchronized (jobs) {
862 for (String serializedJob : jobs.values()) {
863 Job job = null;
864 try {
865 job = JobParser.parseJob(serializedJob);
866 } catch (IOException e) {
867 throw new IllegalStateException("Error unmarshaling job", e);
868 }
869 if (serviceType != null && !serviceType.equals(job.getJobType()))
870 continue;
871 if (host != null && !host.equals(job.getProcessingHost()))
872 continue;
873 if (operation != null && !operation.equals(job.getOperation()))
874 continue;
875 if (status != null && !status.equals(job.getStatus()))
876 continue;
877 count++;
878 }
879 }
880 return count;
881 }
882
883
884
885
886
887 class JobDispatcher implements Runnable {
888
889
890
891
892
893
894 @Override
895 public void run() {
896
897
898 synchronized (jobs) {
899 for (String serializedJob : jobs.values()) {
900 Job job = null;
901 try {
902 job = JobParser.parseJob(serializedJob);
903 User creator = userDirectoryService.loadUser(job.getCreator());
904 Organization organization = organizationDirectoryService.getOrganization(job.getOrganization());
905 securityService.setUser(creator);
906 securityService.setOrganization(organization);
907 if (Status.QUEUED.equals(job.getStatus())) {
908 job.setStatus(Status.DISPATCHING);
909 if (!dispatchJob(job)) {
910 job.setStatus(Status.QUEUED);
911 }
912 }
913 } catch (ServiceUnavailableException e) {
914 job.setStatus(Status.FAILED);
915 Throwable cause = (e.getCause() != null) ? e.getCause() : e;
916 logger.error("Unable to find a service for job " + job, cause);
917 } catch (ServiceRegistryException e) {
918 job.setStatus(Status.FAILED);
919 Throwable cause = (e.getCause() != null) ? e.getCause() : e;
920 logger.error("Error dispatching job " + job, cause);
921 } catch (IOException e) {
922 throw new IllegalStateException("Error unmarshaling job", e);
923 } catch (NotFoundException e) {
924 throw new IllegalStateException("Creator organization not found", e);
925 } catch (Throwable e) {
926 logger.error("Error dispatching job " + job, e);
927 } finally {
928 try {
929 jobs.put(job.getId(), JobParser.toXml(new JaxbJob(job)));
930 } catch (IOException e) {
931 throw new IllegalStateException("Error unmarshaling job", e);
932 }
933 securityService.setUser(null);
934 securityService.setOrganization(null);
935 }
936 }
937 }
938 }
939 }
940
941
942 public void deactivate() {
943 dispatcher.shutdownNow();
944 Map<Status, AtomicInteger> counts = new HashMap<Job.Status, AtomicInteger>();
945 synchronized (jobs) {
946 for (String serializedJob : jobs.values()) {
947 Job job = null;
948 try {
949 job = JobParser.parseJob(serializedJob);
950 } catch (IOException e) {
951 throw new IllegalStateException("Error unmarshaling job", e);
952 }
953 if (counts.containsKey(job.getStatus())) {
954 counts.get(job.getStatus()).incrementAndGet();
955 } else {
956 counts.put(job.getStatus(), new AtomicInteger(1));
957 }
958 }
959 }
960 StringBuilder sb = new StringBuilder("Abandoned:");
961 for (Entry<Status, AtomicInteger> entry : counts.entrySet()) {
962 sb.append(" " + entry.getValue() + " " + entry.getKey() + " jobs");
963 }
964 logger.info(sb.toString());
965 }
966
967
968
969
970
971
972 @Override
973 public SystemLoad getMaxLoads() throws ServiceRegistryException {
974 SystemLoad systemLoad = new SystemLoad();
975 systemLoad.addNodeLoad(new NodeLoad(LOCALHOST, 0.0f, Runtime.getRuntime().availableProcessors()));
976 return systemLoad;
977 }
978
979
980
981
982
983
984 @Override
985 public NodeLoad getMaxLoadOnNode(String host) throws ServiceRegistryException {
986 if (hosts.containsKey(host)) {
987 return new NodeLoad(host, 0.0f, hosts.get(host).getMaxLoad());
988 }
989 throw new ServiceRegistryException("Unable to find host " + host + " in service registry");
990 }
991
992
993
994
995
996
997
998 public void setSecurityService(SecurityService securityService) {
999 this.securityService = securityService;
1000 }
1001
1002 @Override
1003 public void sanitize(String serviceType, String host) {
1004
1005 }
1006
1007 @Override
1008 public Job getCurrentJob() {
1009 return this.currentJob;
1010 }
1011
1012 @Override
1013 public void setCurrentJob(Job job) {
1014 this.currentJob = job;
1015 }
1016
1017 @Override
1018 public List<HostRegistration> getHostRegistrations() throws ServiceRegistryException {
1019 List<HostRegistration> hostList = new LinkedList<HostRegistration>();
1020 hostList.addAll(hosts.values());
1021 return hostList;
1022 }
1023
1024 @Override
1025 public HostStatistics getHostStatistics() {
1026 HostStatistics statistics = new HostStatistics();
1027 for (Map.Entry<ServiceRegistrationInMemoryImpl, Set<Job>> entry: jobHosts.entrySet()) {
1028 final ServiceRegistrationInMemoryImpl service = entry.getKey();
1029 final long queued = entry.getValue().stream().filter(job -> job.getStatus() == Status.QUEUED).count();
1030 final long running = entry.getValue().stream().filter(job -> job.getStatus() == Status.RUNNING).count();
1031 final long host = service.host.hashCode();
1032 statistics.addQueued(host, statistics.queuedJobs(host) + queued);
1033 statistics.addRunning(host, statistics.runningJobs(host) + running);
1034 }
1035 return statistics;
1036 }
1037
1038 @Override
1039 public HostRegistration getHostRegistration(String hostname) throws ServiceRegistryException {
1040 for (HostRegistration host: this.getHostRegistrations()) {
1041 if (host.getBaseUrl().equalsIgnoreCase(hostname)) {
1042 return host;
1043 }
1044 }
1045 throw new ServiceRegistryException(String.format("Host registration for %s not found", hostname));
1046 }
1047
1048 @Override
1049 public SystemLoad getCurrentHostLoads() {
1050 SystemLoad systemLoad = new SystemLoad();
1051
1052 for (String host : hosts.keySet()) {
1053 NodeLoad node = new NodeLoad();
1054 node.setHost(host);
1055 for (ServiceRegistration service : services.get(host)) {
1056 if (service.isInMaintenanceMode() || !service.isOnline()) {
1057 continue;
1058 }
1059 Set<Job> hostJobs = jobHosts.get(service);
1060 float loadSum = 0.0f;
1061 if (hostJobs != null) {
1062 for (Job job : hostJobs) {
1063 if (job.getStatus() != null && JOB_STATUSES_INFLUENCING_LOAD_BALANCING.contains(job.getStatus())) {
1064 loadSum += job.getJobLoad();
1065 }
1066 }
1067 }
1068 node.setCurrentLoad(loadSum);
1069 }
1070 systemLoad.addNodeLoad(node);
1071 }
1072 return systemLoad;
1073 }
1074
1075 @Override
1076 public void removeParentlessJobs(int lifetime) throws ServiceRegistryException {
1077 synchronized (jobs) {
1078 for (String serializedJob : jobs.values()) {
1079 Job job = null;
1080 try {
1081 job = JobParser.parseJob(serializedJob);
1082 } catch (IOException e) {
1083 throw new IllegalStateException("Error unmarshaling job", e);
1084 }
1085
1086 Long parentJobId = job.getParentJobId();
1087 if (parentJobId == null || parentJobId < 1)
1088 jobs.remove(job.getId());
1089 }
1090 }
1091 }
1092
1093 @Override
1094 public Map<String, Map<String, Long>> countActiveByOrganizationAndHost() {
1095 var hostMap = new HashMap<String, Long>();
1096 for (var entry: jobHosts.entrySet()) {
1097 var host = entry.getKey().host;
1098 var count = entry.getValue().size();
1099 hostMap.put(host, (long) count);
1100 }
1101 return Map.of("mh_dafault_org", hostMap);
1102 }
1103
1104 @Override
1105 public Map<String, Long> countActiveTypeByOrganization(String operation) {
1106 throw new NotImplementedException("This has not been implemented");
1107 }
1108
1109 @Override
1110 public float getOwnLoad() {
1111 return getCurrentHostLoads().get(getRegistryHostname()).getCurrentLoad();
1112 }
1113
1114 @Override
1115 public String getRegistryHostname() {
1116 return LOCALHOST;
1117 }
1118
1119 }