View Javadoc
1   /*
2    * Licensed to The Apereo Foundation under one or more contributor license
3    * agreements. See the NOTICE file distributed with this work for additional
4    * information regarding copyright ownership.
5    *
6    *
7    * The Apereo Foundation licenses this file to you under the Educational
8    * Community License, Version 2.0 (the "License"); you may not use this file
9    * except in compliance with the License. You may obtain a copy of the License
10   * at:
11   *
12   *   http://opensource.org/licenses/ecl2.txt
13   *
14   * Unless required by applicable law or agreed to in writing, software
15   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
16   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the
17   * License for the specific language governing permissions and limitations under
18   * the License.
19   *
20   */
21  
22  package org.opencastproject.serviceregistry.api;
23  
24  import org.opencastproject.job.api.JaxbJob;
25  import org.opencastproject.job.api.Job;
26  import org.opencastproject.job.api.Job.Status;
27  import org.opencastproject.job.api.JobImpl;
28  import org.opencastproject.job.api.JobParser;
29  import org.opencastproject.job.api.JobProducer;
30  import org.opencastproject.security.api.Organization;
31  import org.opencastproject.security.api.OrganizationDirectoryService;
32  import org.opencastproject.security.api.SecurityService;
33  import org.opencastproject.security.api.User;
34  import org.opencastproject.security.api.UserDirectoryService;
35  import org.opencastproject.serviceregistry.api.SystemLoad.NodeLoad;
36  import org.opencastproject.util.NotFoundException;
37  
38  import org.apache.commons.lang3.NotImplementedException;
39  import org.slf4j.Logger;
40  import org.slf4j.LoggerFactory;
41  
42  import java.io.IOException;
43  import java.util.ArrayList;
44  import java.util.Collections;
45  import java.util.Comparator;
46  import java.util.Date;
47  import java.util.HashMap;
48  import java.util.HashSet;
49  import java.util.Iterator;
50  import java.util.LinkedHashSet;
51  import java.util.LinkedList;
52  import java.util.List;
53  import java.util.Map;
54  import java.util.Map.Entry;
55  import java.util.Set;
56  import java.util.concurrent.Executors;
57  import java.util.concurrent.ScheduledExecutorService;
58  import java.util.concurrent.TimeUnit;
59  import java.util.concurrent.atomic.AtomicInteger;
60  import java.util.concurrent.atomic.AtomicLong;
61  
62  /** Simple in-memory implementation of the service registry, intended for testing scenarios. */
63  public class ServiceRegistryInMemoryImpl implements ServiceRegistry {
64  
65    /** Logging facility */
66    private static final Logger logger = LoggerFactory.getLogger(ServiceRegistryInMemoryImpl.class);
67  
68    /** Default dispatcher delay in milliseconds (100 ms) */
69    public static final long DEFAULT_DISPATCHER_TIMEOUT = 100;
70  
71    /** Hostname for localhost */
72    private static final String LOCALHOST = "localhost";
73  
74    /** The hosts */
75    protected Map<String, HostRegistrationInMemory> hosts = new HashMap<String, HostRegistrationInMemory>();
76  
77    /** The service registrations */
78    protected Map<String, List<ServiceRegistrationInMemoryImpl>> services = new HashMap<String, List<ServiceRegistrationInMemoryImpl>>();
79  
80    /** The serialized jobs */
81    protected Map<Long, String> jobs = new HashMap<Long, String>();
82  
83    /** A mapping of services to jobs */
84    protected Map<ServiceRegistrationInMemoryImpl, Set<Job>> jobHosts = new HashMap<ServiceRegistrationInMemoryImpl, Set<Job>>();
85  
86    /** The thread pool to use for dispatching queued jobs. */
87    protected ScheduledExecutorService dispatcher = Executors.newScheduledThreadPool(1);
88  
89    /** The job identifier */
90    protected AtomicLong idCounter = new AtomicLong();
91  
92    /** Holds the current running job */
93    protected Job currentJob = null;
94  
95    /**
96     * An (optional) security service. If set to a non-null value, this will be used to obtain the current user when
97     * creating new jobs.
98     */
99    protected SecurityService securityService = null;
100 
101   /** The user directory service */
102   protected UserDirectoryService userDirectoryService = null;
103 
104   /** The organization directory service */
105   protected OrganizationDirectoryService organizationDirectoryService = null;
106 
107   protected Incidents incidents;
108 
109   /**
110    * A static list of statuses that influence how load balancing is calculated
111    */
112   protected static final List<Status> JOB_STATUSES_INFLUENCING_LOAD_BALANCING;
113 
114   static {
115     JOB_STATUSES_INFLUENCING_LOAD_BALANCING = new ArrayList<Status>();
116     JOB_STATUSES_INFLUENCING_LOAD_BALANCING.add(Status.QUEUED);
117     JOB_STATUSES_INFLUENCING_LOAD_BALANCING.add(Status.RUNNING);
118   }
119 
120   public ServiceRegistryInMemoryImpl(JobProducer service, float maxLoad, SecurityService securityService,
121           UserDirectoryService userDirectoryService, OrganizationDirectoryService organizationDirectoryService,
122           IncidentService incidentService) throws ServiceRegistryException {
123     //Note: total memory here isn't really the correct value, but we just need something (preferably non-zero)
124     registerHost(LOCALHOST, LOCALHOST, "Admin", Runtime.getRuntime().totalMemory(), Runtime.getRuntime().availableProcessors(), maxLoad);
125     if (service != null)
126       registerService(service, maxLoad);
127     this.securityService = securityService;
128     this.userDirectoryService = userDirectoryService;
129     this.organizationDirectoryService = organizationDirectoryService;
130     this.incidents = new Incidents(this, incidentService);
131     this.dispatcher.scheduleWithFixedDelay(new JobDispatcher(), DEFAULT_DISPATCHER_TIMEOUT, DEFAULT_DISPATCHER_TIMEOUT,
132             TimeUnit.MILLISECONDS);
133   }
134 
/**
 * Creates an in-memory service registry that uses the number of available processors as the maximum load.
 *
 * @param service
 *          the job producer to register on the local host, or <code>null</code> to register none
 * @param securityService
 *          the security service, may be <code>null</code>
 * @param userDirectoryService
 *          the user directory service
 * @param organizationDirectoryService
 *          the organization directory service
 * @param incidentService
 *          the incident service
 * @throws ServiceRegistryException
 *           if registering the host or the service fails
 */
public ServiceRegistryInMemoryImpl(JobProducer service, SecurityService securityService,
        UserDirectoryService userDirectoryService, OrganizationDirectoryService organizationDirectoryService,
        IncidentService incidentService) throws ServiceRegistryException {
  this(service, (float) Runtime.getRuntime().availableProcessors(), securityService, userDirectoryService,
          organizationDirectoryService, incidentService);
}
141 
142   /**
143    * This method shuts down the service registry.
144    */
145   public void dispose() {
146     if (dispatcher != null) {
147       try {
148         dispatcher.shutdownNow();
149         if (!dispatcher.isShutdown()) {
150           logger.info("Waiting for Dispatcher to terminate");
151           dispatcher.awaitTermination(10, TimeUnit.SECONDS);
152         }
153       } catch (InterruptedException e) {
154         logger.error("Error shutting down the Dispatcher", e);
155       }
156     }
157   }
158 
159   /**
160    * {@inheritDoc}
161    *
162    * @see org.opencastproject.serviceregistry.api.ServiceRegistry#enableHost(String)
163    */
164   @Override
165   public void enableHost(String host) throws ServiceRegistryException, NotFoundException {
166     if (hosts.containsKey(host)) {
167       hosts.get(host).setActive(true);
168     } else {
169       throw new NotFoundException("The host named " + host + " was not found");
170     }
171   }
172 
173   /**
174    * {@inheritDoc}
175    *
176    * @see org.opencastproject.serviceregistry.api.ServiceRegistry#disableHost(String)
177    */
178   @Override
179   public void disableHost(String host) throws ServiceRegistryException, NotFoundException {
180     if (hosts.containsKey(host)) {
181       hosts.get(host).setActive(false);
182     } else {
183       throw new NotFoundException("The host named " + host + " was not found");
184     }
185   }
186 
187   /**
188    * {@inheritDoc}
189    *
190    * @see org.opencastproject.serviceregistry.api.ServiceRegistry#registerHost(String, String, String, long, int, float)
191    */
192   @Override
193   public void registerHost(String host, String address, String nodeName, long memory, int cores, float maxLoad)
194           throws ServiceRegistryException {
195     HostRegistrationInMemory hrim = new HostRegistrationInMemory(address, address, nodeName, maxLoad, cores, memory);
196     hosts.put(host, hrim);
197   }
198 
199   /**
200    * {@inheritDoc}
201    *
202    * @see org.opencastproject.serviceregistry.api.ServiceRegistry#unregisterHost(java.lang.String)
203    */
204   @Override
205   public void unregisterHost(String host) throws ServiceRegistryException {
206     hosts.remove(host);
207     services.remove(host);
208   }
209 
210   /**
211    * Method to register locally running services.
212    *
213    * @param localService
214    *          the service instance
215    * @return the service registration
216    * @throws ServiceRegistryException
217    */
218   public ServiceRegistration registerService(JobProducer localService) throws ServiceRegistryException {
219     return registerService(localService, Runtime.getRuntime().availableProcessors());
220   }
221 
222   /**
223    * Method to register locally running services.
224    *
225    * @param localService
226    *          the service instance
227    * @param maxLoad
228    *          the maximum load the host can support
229    * @return the service registration
230    * @throws ServiceRegistryException
231    */
232   public ServiceRegistration registerService(JobProducer localService, float maxLoad) throws ServiceRegistryException {
233     HostRegistrationInMemory hrim = hosts.get(LOCALHOST);
234 
235     List<ServiceRegistrationInMemoryImpl> servicesOnHost = services.get(LOCALHOST);
236     if (servicesOnHost == null) {
237       servicesOnHost = new ArrayList<ServiceRegistrationInMemoryImpl>();
238       services.put(LOCALHOST, servicesOnHost);
239     }
240 
241     ServiceRegistrationInMemoryImpl registration = new ServiceRegistrationInMemoryImpl(localService, hrim.getBaseUrl());
242     registration.setMaintenance(false);
243     servicesOnHost.add(registration);
244     return registration;
245   }
246 
247   /**
248    * Removes the job producer from the service registry.
249    *
250    * @param localService
251    *          the service
252    * @throws ServiceRegistryException
253    *           if removing the service fails
254    */
255   public void unregisterService(JobProducer localService) throws ServiceRegistryException {
256     List<ServiceRegistrationInMemoryImpl> servicesOnHost = services.get(LOCALHOST);
257     if (servicesOnHost != null) {
258       ServiceRegistrationInMemoryImpl s = (ServiceRegistrationInMemoryImpl) localService;
259       servicesOnHost.remove(s);
260     }
261   }
262 
263   /**
264    * {@inheritDoc}
265    *
266    * @see org.opencastproject.serviceregistry.api.ServiceRegistry#registerService(java.lang.String, java.lang.String,
267    *      java.lang.String)
268    */
269   @Override
270   public ServiceRegistration registerService(String serviceType, String host, String path)
271           throws ServiceRegistryException {
272     return registerService(serviceType, host, path, false);
273   }
274 
275   /**
276    * {@inheritDoc}
277    *
278    * @see org.opencastproject.serviceregistry.api.ServiceRegistry#registerService(java.lang.String, java.lang.String,
279    *      java.lang.String, boolean)
280    */
281   @Override
282   public ServiceRegistration registerService(String serviceType, String host, String path, boolean jobProducer)
283           throws ServiceRegistryException {
284 
285     HostRegistrationInMemory hostRegistration = hosts.get(host);
286     if (hostRegistration == null) {
287       throw new ServiceRegistryException(new NotFoundException("Host " + host + " was not found"));
288     }
289 
290     List<ServiceRegistrationInMemoryImpl> servicesOnHost = services.get(host);
291     if (servicesOnHost == null) {
292       servicesOnHost = new ArrayList<ServiceRegistrationInMemoryImpl>();
293       services.put(host, servicesOnHost);
294     }
295 
296     ServiceRegistrationInMemoryImpl registration = new ServiceRegistrationInMemoryImpl(serviceType, host, path,
297             jobProducer);
298     servicesOnHost.add(registration);
299     return registration;
300   }
301 
302   /**
303    * {@inheritDoc}
304    *
305    * @see org.opencastproject.serviceregistry.api.ServiceRegistry#unRegisterService(java.lang.String, java.lang.String)
306    */
307   @Override
308   public void unRegisterService(String serviceType, String host) throws ServiceRegistryException {
309     List<ServiceRegistrationInMemoryImpl> servicesOnHost = services.get(host);
310     if (servicesOnHost != null) {
311       Iterator<ServiceRegistrationInMemoryImpl> ri = servicesOnHost.iterator();
312       while (ri.hasNext()) {
313         ServiceRegistration registration = ri.next();
314         if (serviceType.equals(registration.getServiceType()))
315           ri.remove();
316       }
317     }
318   }
319 
320   /**
321    * {@inheritDoc}
322    *
323    * @see org.opencastproject.serviceregistry.api.ServiceRegistry#setMaintenanceStatus(java.lang.String, boolean)
324    */
325   @Override
326   public void setMaintenanceStatus(String host, boolean maintenance) throws NotFoundException {
327     List<ServiceRegistrationInMemoryImpl> servicesOnHost = services.get(host);
328     if (!hosts.containsKey(host)) {
329       throw new NotFoundException("Host " + host + " was not found");
330     }
331     hosts.get(host).setMaintenanceMode(maintenance);
332     if (servicesOnHost != null) {
333       for (ServiceRegistrationInMemoryImpl r : servicesOnHost) {
334         r.setMaintenance(maintenance);
335       }
336     }
337   }
338 
339   /**
340    * {@inheritDoc}
341    *
342    * @see org.opencastproject.serviceregistry.api.ServiceRegistry#createJob(java.lang.String, java.lang.String)
343    */
344   @Override
345   public Job createJob(String type, String operation) throws ServiceRegistryException {
346     return createJob(type, operation, null, null, true);
347   }
348 
349   /**
350    * {@inheritDoc}
351    *
352    * @see org.opencastproject.serviceregistry.api.ServiceRegistry#createJob(java.lang.String, java.lang.String,
353    *      java.util.List)
354    */
355   @Override
356   public Job createJob(String type, String operation, List<String> arguments) throws ServiceRegistryException {
357     return createJob(type, operation, arguments, null, true);
358   }
359 
360   /**
361    * {@inheritDoc}
362    *
363    * @see org.opencastproject.serviceregistry.api.ServiceRegistry#createJob(java.lang.String, java.lang.String,
364           java.util.List, Float)
365    */
366   @Override
367   public Job createJob(String type, String operation, List<String> arguments, Float jobLoad)
368           throws ServiceRegistryException {
369     return createJob(type, operation, arguments, null, true, jobLoad);
370   }
371 
372   public Job createJob(String type, String operation, List<String> arguments, String payload)
373           throws ServiceRegistryException {
374     return createJob(type, operation, arguments, payload, true);
375   }
376 
377   /**
378    * {@inheritDoc}
379    *
380    * @see org.opencastproject.serviceregistry.api.ServiceRegistry#createJob(java.lang.String, java.lang.String,
381           java.util.List, java.lang.String, boolean)
382    */
383   @Override
384   public Job createJob(String type, String operation, List<String> arguments, String payload, boolean queueable)
385           throws ServiceRegistryException {
386     return createJob(type, operation, arguments, payload, queueable, null, 1.0f);
387   }
388 
389   /**
390    * {@inheritDoc}
391    *
392    * @see org.opencastproject.serviceregistry.api.ServiceRegistry#createJob(java.lang.String, java.lang.String,
393           java.util.List, java.lang.String, boolean, Float)
394    */
395   @Override
396   public Job createJob(String type, String operation, List<String> arguments, String payload, boolean queueable,
397           Float jobLoad) throws ServiceRegistryException {
398     return createJob(type, operation, arguments, payload, queueable, null, jobLoad);
399   }
400 
401   /**
402    * {@inheritDoc}
403    *
404    * @see org.opencastproject.serviceregistry.api.ServiceRegistry#createJob(java.lang.String, java.lang.String,
405           java.util.List, java.lang.String, boolean, org.opencastproject.job.api.Job, Float)
406    */
407   @Override
408   public Job createJob(String type, String operation, List<String> arguments, String payload, boolean queueable,
409           Job parentJob, Float jobLoad) throws ServiceRegistryException {
410     if (getServiceRegistrationsByType(type).size() == 0)
411       logger.warn("Service " + type + " not available");
412 
413     Job job = null;
414     synchronized (this) {
415       job = new JobImpl(idCounter.addAndGet(1));
416       if (securityService != null) {
417         job.setCreator(securityService.getUser().getUsername());
418         job.setOrganization(securityService.getOrganization().getId());
419       }
420       job.setDateCreated(new Date());
421       job.setJobType(type);
422       job.setOperation(operation);
423       job.setArguments(arguments);
424       job.setPayload(payload);
425       if (queueable)
426         job.setStatus(Status.QUEUED);
427       else
428         job.setStatus(Status.INSTANTIATED);
429       if (parentJob != null)
430         job.setParentJobId(parentJob.getId());
431       job.setJobLoad(jobLoad);
432     }
433 
434     synchronized (jobs) {
435       try {
436         jobs.put(job.getId(), JobParser.toXml(new JaxbJob(job)));
437       } catch (IOException e) {
438         throw new IllegalStateException("Error serializing job " + job, e);
439       }
440     }
441     return job;
442   }
443 
444   private void removeJob(long id) throws NotFoundException, ServiceRegistryException {
445     synchronized (jobs) {
446       if (!jobs.containsKey(id))
447         throw new NotFoundException("No job with ID '" + id + "' found");
448 
449       jobs.remove(id);
450     }
451   }
452 
453   @Override
454   public void removeJobs(List<Long> ids) throws NotFoundException, ServiceRegistryException {
455     synchronized (jobs) {
456       for (long id : ids) {
457         removeJob(id);
458       }
459     }
460   }
461 
462   /**
463    * Dispatches the job to the least loaded service or throws a <code>ServiceUnavailableException</code> if there is no
464    * such service.
465    *
466    * @param job
467    *          the job to dispatch
468    * @return whether the job was dispatched
469    * @throws ServiceUnavailableException
470    *           if no service is available to dispatch the job
471    * @throws ServiceRegistryException
472    *           if the service registrations are unavailable or dispatching of the job fails
473    */
474   protected boolean dispatchJob(Job job) throws ServiceUnavailableException, ServiceRegistryException,
475           UndispatchableJobException {
476     List<ServiceRegistration> registrations = getServiceRegistrationsByLoad(job.getJobType());
477     if (registrations.size() == 0)
478       throw new ServiceUnavailableException("No service is available to handle jobs of type '" + job.getJobType() + "'");
479     job.setStatus(Status.DISPATCHING);
480     try {
481       job = updateJob(job);
482     } catch (NotFoundException e) {
483       throw new ServiceRegistryException("Job not found!", e);
484     }
485     for (ServiceRegistration registration : registrations) {
486       if (registration.isJobProducer() && !registration.isInMaintenanceMode()) {
487         ServiceRegistrationInMemoryImpl inMemoryRegistration = (ServiceRegistrationInMemoryImpl) registration;
488         JobProducer service = inMemoryRegistration.getService();
489 
490         // Add the job to the list of jobs so that it gets counted in the load.
491         // This is the same way that the JPA impl does it
492         Set<Job> jobs = jobHosts.get(inMemoryRegistration);
493         if (jobs == null) {
494           jobs = new LinkedHashSet<Job>();
495         }
496         jobs.add(job);
497         jobHosts.put(inMemoryRegistration, jobs);
498 
499         if (!service.isReadyToAcceptJobs(job.getOperation())) {
500           jobs.remove(job);
501           jobHosts.put(inMemoryRegistration, jobs);
502           continue;
503         }
504         if (!service.isReadyToAccept(job)) {
505           jobs.remove(job);
506           jobHosts.put(inMemoryRegistration, jobs);
507           continue;
508         }
509         try {
510           job = updateJob(job);
511         } catch (NotFoundException e) {
512           jobs.remove(job);
513           jobHosts.put(inMemoryRegistration, jobs);
514           throw new ServiceRegistryException("Job not found!", e);
515         }
516         service.acceptJob(job);
517         return true;
518       } else if (!registration.isJobProducer()) {
519         logger.warn("This implementation of the service registry doesn't support dispatching to remote services");
520         // TODO: Add remote dispatching
521       } else {
522         logger.warn("Service " + registration + " is in maintenance mode");
523       }
524     }
525     return false;
526   }
527 
528   /**
529    * {@inheritDoc}
530    *
531    * @see org.opencastproject.serviceregistry.api.ServiceRegistry#updateJob(org.opencastproject.job.api.Job)
532    */
533   @Override
534   public Job updateJob(Job job) throws NotFoundException, ServiceRegistryException {
535     if (job == null)
536       throw new IllegalArgumentException("Job cannot be null");
537     Job updatedJob = null;
538     synchronized (jobs) {
539       try {
540         updatedJob = updateInternal(job);
541         jobs.put(updatedJob.getId(), JobParser.toXml(new JaxbJob(updatedJob)));
542       } catch (IOException e) {
543         throw new IllegalStateException("Error serializing job", e);
544       }
545     }
546     return updatedJob;
547   }
548 
549   private Job updateInternal(Job job) {
550     Date now = new Date();
551     Status status = job.getStatus();
552     if (job.getDateCreated() == null) {
553       job.setDateCreated(now);
554     }
555     if (Status.RUNNING.equals(status)) {
556       if (job.getDateStarted() == null) {
557         job.setDateStarted(now);
558         job.setQueueTime(now.getTime() - job.getDateCreated().getTime());
559       }
560     } else if (Status.FAILED.equals(status)) {
561       // failed jobs may not have even started properly
562       job.setDateCompleted(now);
563       if (job.getDateStarted() != null) {
564         job.setRunTime(now.getTime() - job.getDateStarted().getTime());
565       }
566     } else if (Status.FINISHED.equals(status)) {
567       if (job.getDateStarted() == null) {
568         // Some services (e.g. ingest) don't use job dispatching, since they start immediately and handle their own
569         // lifecycle. In these cases, if the start date isn't set, use the date created as the start date
570         job.setDateStarted(job.getDateCreated());
571       }
572       job.setDateCompleted(now);
573       job.setRunTime(now.getTime() - job.getDateStarted().getTime());
574 
575       // Cleanup local list of jobs assigned to a specific service
576       for (Entry<String, List<ServiceRegistrationInMemoryImpl>> service : services.entrySet()) {
577         for (ServiceRegistrationInMemoryImpl srv : service.getValue()) {
578           Set<Job> jobs = jobHosts.get(srv);
579           if (jobs != null) {
580             Set<Job> updatedJobs = new HashSet<>();
581             for (Job savedJob : jobs) {
582               if (savedJob.getId() != job.getId())
583                 updatedJobs.add(savedJob);
584             }
585             jobHosts.put(srv, updatedJobs);
586           }
587         }
588       }
589     }
590     return job;
591   }
592 
593   /**
594    * {@inheritDoc}
595    *
596    * @see org.opencastproject.serviceregistry.api.ServiceRegistry#getJob(long)
597    */
598   @Override
599   public Job getJob(long id) throws NotFoundException, ServiceRegistryException {
600     synchronized (jobs) {
601       String serializedJob = jobs.get(id);
602       if (serializedJob == null)
603         throw new NotFoundException(Long.toString(id));
604       try {
605         return JobParser.parseJob(serializedJob);
606       } catch (IOException e) {
607         throw new IllegalStateException("Error unmarshaling job", e);
608       }
609     }
610   }
611 
612   /**
613    * {@inheritDoc}
614    *
615    * @see org.opencastproject.serviceregistry.api.ServiceRegistry#getChildJobs(long)
616    */
617   @Override
618   public List<Job> getChildJobs(long id) throws ServiceRegistryException {
619     List<Job> result = new ArrayList<Job>();
620     synchronized (jobs) {
621       for (String serializedJob : jobs.values()) {
622         Job job = null;
623         try {
624           job = JobParser.parseJob(serializedJob);
625         } catch (IOException e) {
626           throw new IllegalStateException("Error unmarshaling job", e);
627         }
628         if (job.getParentJobId() == null)
629           continue;
630         if (job.getParentJobId().equals(id) || job.getRootJobId().equals(id))
631           result.add(job);
632 
633         Long parentJobId = job.getParentJobId();
634         while (parentJobId != null && parentJobId > 0) {
635           try {
636             Job parentJob = getJob(job.getParentJobId());
637             if (parentJob.getParentJobId().equals(id)) {
638               result.add(job);
639               break;
640             }
641             parentJobId = parentJob.getParentJobId();
642           } catch (NotFoundException e) {
643             throw new ServiceRegistryException("Job from parent job id was not found!", e);
644           }
645         }
646       }
647     }
648     Collections.sort(result, new Comparator<Job>() {
649       @Override
650       public int compare(Job job1, Job job2) {
651         return job1.getDateCreated().compareTo(job1.getDateCreated());
652       }
653     });
654     return result;
655   }
656 
657   /**
658    * {@inheritDoc}
659    *
660    * @see org.opencastproject.serviceregistry.api.ServiceRegistry#getJobs(java.lang.String,
661    *      org.opencastproject.job.api.Job.Status)
662    */
663   @Override
664   public List<Job> getJobs(String serviceType, Status status) throws ServiceRegistryException {
665     List<Job> result = new ArrayList<Job>();
666     synchronized (jobs) {
667       for (String serializedJob : jobs.values()) {
668         Job job = null;
669         try {
670           job = JobParser.parseJob(serializedJob);
671         } catch (IOException e) {
672           throw new IllegalStateException("Error unmarshaling job", e);
673         }
674         if (serviceType.equals(job.getJobType()) && status.equals(job.getStatus()))
675           result.add(job);
676       }
677     }
678     return result;
679   }
680 
681   @Override
682   public List<String> getJobPayloads(String operation) throws ServiceRegistryException {
683     List<String> result = new ArrayList<>();
684     for (String serializedJob : jobs.values()) {
685       try {
686         Job job = JobParser.parseJob(serializedJob);
687         if (operation.equals(job.getOperation())) {
688           result.add(job.getPayload());
689         }
690       } catch (IOException e) {
691         throw new IllegalStateException("Error unmarshaling job", e);
692       }
693     }
694     return result;
695   }
696 
697   @Override
698   public List<String> getJobPayloads(String operation, int limit, int offset) throws ServiceRegistryException {
699     return null;
700   }
701 
702   @Override
703   public int getJobCount(String operation) throws ServiceRegistryException {
704     return 0;
705   }
706 
707   /**
708    * {@inheritDoc}
709    *
710    * @see org.opencastproject.serviceregistry.api.ServiceRegistry#getActiveJobs()
711    */
712   @Override
713   public List<Job> getActiveJobs() throws ServiceRegistryException {
714     List<Job> result = new ArrayList<Job>();
715     synchronized (jobs) {
716       for (String serializedJob : jobs.values()) {
717         Job job = null;
718         try {
719           job = JobParser.parseJob(serializedJob);
720         } catch (IOException e) {
721           throw new IllegalStateException("Error unmarshaling job", e);
722         }
723         if (job.getStatus().isActive())
724           result.add(job);
725       }
726     }
727     return result;
728   }
729 
730   @Override
731   public Incidents incident() {
732     return incidents;
733   }
734 
735   /**
736    * {@inheritDoc}
737    *
738    * @see org.opencastproject.serviceregistry.api.ServiceRegistry#getServiceRegistrationsByLoad(java.lang.String)
739    */
740   @Override
741   public List<ServiceRegistration> getServiceRegistrationsByLoad(String serviceType) throws ServiceRegistryException {
742     return getServiceRegistrationsByType(serviceType);
743   }
744 
745   /**
746    * {@inheritDoc}
747    *
748    * @see org.opencastproject.serviceregistry.api.ServiceRegistry#getServiceRegistrationsByType(java.lang.String)
749    */
750   @Override
751   public List<ServiceRegistration> getServiceRegistrationsByType(String serviceType) throws ServiceRegistryException {
752     List<ServiceRegistration> result = new ArrayList<ServiceRegistration>();
753     for (List<ServiceRegistrationInMemoryImpl> servicesPerHost : services.values()) {
754       for (ServiceRegistrationInMemoryImpl r : servicesPerHost) {
755         if (serviceType.equals(r.getServiceType()))
756           result.add(r);
757       }
758     }
759     return result;
760   }
761 
762   /**
763    * {@inheritDoc}
764    *
765    * @see org.opencastproject.serviceregistry.api.ServiceRegistry#getServiceRegistrationsByHost(java.lang.String)
766    */
767   @Override
768   public List<ServiceRegistration> getServiceRegistrationsByHost(String host) throws ServiceRegistryException {
769     List<ServiceRegistration> result = new ArrayList<ServiceRegistration>();
770     List<ServiceRegistrationInMemoryImpl> servicesPerHost = services.get(host);
771     if (servicesPerHost != null) {
772       result.addAll(servicesPerHost);
773     }
774     return result;
775   }
776 
777   /**
778    * {@inheritDoc}
779    *
780    * @see org.opencastproject.serviceregistry.api.ServiceRegistry#getServiceRegistration(java.lang.String,
781    *      java.lang.String)
782    */
783   @Override
784   public ServiceRegistration getServiceRegistration(String serviceType, String host) throws ServiceRegistryException {
785     List<ServiceRegistrationInMemoryImpl> servicesPerHost = services.get(host);
786     if (servicesPerHost != null) {
787       for (ServiceRegistrationInMemoryImpl r : servicesPerHost) {
788         if (serviceType.equals(r.getServiceType()))
789           return r;
790       }
791     }
792     return null;
793   }
794 
795   /**
796    * {@inheritDoc}
797    *
798    * @see org.opencastproject.serviceregistry.api.ServiceRegistry#getServiceRegistrations()
799    */
800   @Override
801   public List<ServiceRegistration> getServiceRegistrations() throws ServiceRegistryException {
802     List<ServiceRegistration> result = new ArrayList<ServiceRegistration>();
803     for (List<ServiceRegistrationInMemoryImpl> servicesPerHost : services.values()) {
804       result.addAll(servicesPerHost);
805     }
806     return result;
807   }
808 
809   /**
810    * {@inheritDoc}
811    *
812    * @see org.opencastproject.serviceregistry.api.ServiceRegistry#getServiceStatistics()
813    */
814   @Override
815   public List<ServiceStatistics> getServiceStatistics() throws ServiceRegistryException {
816     throw new UnsupportedOperationException("Operation not yet implemented");
817   }
818 
819   /**
820    * {@inheritDoc}
821    *
822    * @see org.opencastproject.serviceregistry.api.ServiceRegistry#count(java.lang.String,
823    *      org.opencastproject.job.api.Job.Status)
824    */
825   @Override
826   public long count(String serviceType, Status status) throws ServiceRegistryException {
827     return count(serviceType, null, null, status);
828   }
829 
830   /**
831    * {@inheritDoc}
832    *
833    * @see org.opencastproject.serviceregistry.api.ServiceRegistry#countByOperation(java.lang.String, java.lang.String,
834    *      org.opencastproject.job.api.Job.Status)
835    */
836   @Override
837   public long countByOperation(String serviceType, String operation, Status status) throws ServiceRegistryException {
838     return count(serviceType, null, operation, status);
839   }
840 
841   /**
842    * {@inheritDoc}
843    *
844    * @see org.opencastproject.serviceregistry.api.ServiceRegistry#countByHost(java.lang.String, java.lang.String,
845    *      org.opencastproject.job.api.Job.Status)
846    */
847   @Override
848   public long countByHost(String serviceType, String host, Status status) throws ServiceRegistryException {
849     return count(serviceType, host, null, status);
850   }
851 
852   /**
853    * {@inheritDoc}
854    *
855    * @see org.opencastproject.serviceregistry.api.ServiceRegistry#count(java.lang.String, java.lang.String,
856    *      java.lang.String, org.opencastproject.job.api.Job.Status)
857    */
858   @Override
859   public long count(String serviceType, String host, String operation, Status status) throws ServiceRegistryException {
860     int count = 0;
861     synchronized (jobs) {
862       for (String serializedJob : jobs.values()) {
863         Job job = null;
864         try {
865           job = JobParser.parseJob(serializedJob);
866         } catch (IOException e) {
867           throw new IllegalStateException("Error unmarshaling job", e);
868         }
869         if (serviceType != null && !serviceType.equals(job.getJobType()))
870           continue;
871         if (host != null && !host.equals(job.getProcessingHost()))
872           continue;
873         if (operation != null && !operation.equals(job.getOperation()))
874           continue;
875         if (status != null && !status.equals(job.getStatus()))
876           continue;
877         count++;
878       }
879     }
880     return count;
881   }
882 
883   /**
884    * This dispatcher implementation will wake from time to time and check for new jobs. If new jobs are found, it will
885    * dispatch them to the services as appropriate.
886    */
887   class JobDispatcher implements Runnable {
888 
889     /**
890      * {@inheritDoc}
891      *
892      * @see java.lang.Thread#run()
893      */
894     @Override
895     public void run() {
896 
897       // Go through the jobs and find those that have not yet been dispatched
898       synchronized (jobs) {
899         for (String serializedJob : jobs.values()) {
900           Job job = null;
901           try {
902             job = JobParser.parseJob(serializedJob);
903             User creator = userDirectoryService.loadUser(job.getCreator());
904             Organization organization = organizationDirectoryService.getOrganization(job.getOrganization());
905             securityService.setUser(creator);
906             securityService.setOrganization(organization);
907             if (Status.QUEUED.equals(job.getStatus())) {
908               job.setStatus(Status.DISPATCHING);
909               if (!dispatchJob(job)) {
910                 job.setStatus(Status.QUEUED);
911               }
912             }
913           } catch (ServiceUnavailableException e) {
914             job.setStatus(Status.FAILED);
915             Throwable cause = (e.getCause() != null) ? e.getCause() : e;
916             logger.error("Unable to find a service for job " + job, cause);
917           } catch (ServiceRegistryException e) {
918             job.setStatus(Status.FAILED);
919             Throwable cause = (e.getCause() != null) ? e.getCause() : e;
920             logger.error("Error dispatching job " + job, cause);
921           } catch (IOException e) {
922             throw new IllegalStateException("Error unmarshaling job", e);
923           } catch (NotFoundException e) {
924             throw new IllegalStateException("Creator organization not found", e);
925           } catch (Throwable e) {
926             logger.error("Error dispatching job " + job, e);
927           } finally {
928             try {
929               jobs.put(job.getId(), JobParser.toXml(new JaxbJob(job)));
930             } catch (IOException e) {
931               throw new IllegalStateException("Error unmarshaling job", e);
932             }
933             securityService.setUser(null);
934             securityService.setOrganization(null);
935           }
936         }
937       }
938     }
939   }
940 
941   /** Shuts down this service registry, logging all jobs and their statuses. */
942   public void deactivate() {
943     dispatcher.shutdownNow();
944     Map<Status, AtomicInteger> counts = new HashMap<Job.Status, AtomicInteger>();
945     synchronized (jobs) {
946       for (String serializedJob : jobs.values()) {
947         Job job = null;
948         try {
949           job = JobParser.parseJob(serializedJob);
950         } catch (IOException e) {
951           throw new IllegalStateException("Error unmarshaling job", e);
952         }
953         if (counts.containsKey(job.getStatus())) {
954           counts.get(job.getStatus()).incrementAndGet();
955         } else {
956           counts.put(job.getStatus(), new AtomicInteger(1));
957         }
958       }
959     }
960     StringBuilder sb = new StringBuilder("Abandoned:");
961     for (Entry<Status, AtomicInteger> entry : counts.entrySet()) {
962       sb.append(" " + entry.getValue() + " " + entry.getKey() + " jobs");
963     }
964     logger.info(sb.toString());
965   }
966 
967   /**
968    * {@inheritDoc}
969    *
970    * @see org.opencastproject.serviceregistry.api.ServiceRegistry#getMaxLoads()
971    */
972   @Override
973   public SystemLoad getMaxLoads() throws ServiceRegistryException {
974     SystemLoad systemLoad = new SystemLoad();
975     systemLoad.addNodeLoad(new NodeLoad(LOCALHOST, 0.0f, Runtime.getRuntime().availableProcessors()));
976     return systemLoad;
977   }
978 
979   /**
980    * {@inheritDoc}
981    *
982    * @see org.opencastproject.serviceregistry.api.ServiceRegistry#getMaxLoadOnNode(java.lang.String)
983    */
984   @Override
985   public NodeLoad getMaxLoadOnNode(String host) throws ServiceRegistryException {
986     if (hosts.containsKey(host)) {
987       return new NodeLoad(host, 0.0f, hosts.get(host).getMaxLoad());
988     }
989     throw new ServiceRegistryException("Unable to find host " + host + " in service registry");
990   }
991 
992   /**
993    * Sets the security service.
994    *
995    * @param securityService
996    *          the securityService to set
997    */
998   public void setSecurityService(SecurityService securityService) {
999     this.securityService = securityService;
1000   }
1001 
1002   @Override
1003   public void sanitize(String serviceType, String host) {
1004     // TODO Auto-generated method stub
1005   }
1006 
1007   @Override
1008   public Job getCurrentJob() {
1009     return this.currentJob;
1010   }
1011 
1012   @Override
1013   public void setCurrentJob(Job job) {
1014     this.currentJob = job;
1015   }
1016 
1017   @Override
1018   public List<HostRegistration> getHostRegistrations() throws ServiceRegistryException {
1019     List<HostRegistration> hostList = new LinkedList<HostRegistration>();
1020     hostList.addAll(hosts.values());
1021     return hostList;
1022   }
1023 
1024   @Override
1025   public HostStatistics getHostStatistics() {
1026     HostStatistics statistics = new HostStatistics();
1027     for (Map.Entry<ServiceRegistrationInMemoryImpl, Set<Job>> entry: jobHosts.entrySet()) {
1028       final ServiceRegistrationInMemoryImpl service = entry.getKey();
1029       final long queued = entry.getValue().stream().filter(job -> job.getStatus() == Status.QUEUED).count();
1030       final long running = entry.getValue().stream().filter(job -> job.getStatus() == Status.RUNNING).count();
1031       final long host = service.host.hashCode();
1032       statistics.addQueued(host, statistics.queuedJobs(host) + queued);
1033       statistics.addRunning(host, statistics.runningJobs(host) + running);
1034     }
1035     return statistics;
1036   }
1037 
1038   @Override
1039   public HostRegistration getHostRegistration(String hostname) throws ServiceRegistryException {
1040     for (HostRegistration host:  this.getHostRegistrations()) {
1041       if (host.getBaseUrl().equalsIgnoreCase(hostname)) {
1042         return host;
1043       }
1044     }
1045     throw new ServiceRegistryException(String.format("Host registration for %s not found", hostname));
1046   }
1047 
1048   @Override
1049   public SystemLoad getCurrentHostLoads() {
1050     SystemLoad systemLoad = new SystemLoad();
1051 
1052     for (String host : hosts.keySet()) {
1053       NodeLoad node = new NodeLoad();
1054       node.setHost(host);
1055       for (ServiceRegistration service : services.get(host)) {
1056         if (service.isInMaintenanceMode() || !service.isOnline()) {
1057           continue;
1058         }
1059         Set<Job> hostJobs = jobHosts.get(service);
1060         float loadSum = 0.0f;
1061         if (hostJobs != null) {
1062           for (Job job : hostJobs) {
1063             if (job.getStatus() != null && JOB_STATUSES_INFLUENCING_LOAD_BALANCING.contains(job.getStatus())) {
1064               loadSum += job.getJobLoad();
1065             }
1066           }
1067         }
1068         node.setCurrentLoad(loadSum);
1069       }
1070       systemLoad.addNodeLoad(node);
1071     }
1072     return systemLoad;
1073   }
1074 
1075   @Override
1076   public void removeParentlessJobs(int lifetime) throws ServiceRegistryException {
1077     synchronized (jobs) {
1078       for (String serializedJob : jobs.values()) {
1079         Job job = null;
1080         try {
1081           job = JobParser.parseJob(serializedJob);
1082         } catch (IOException e) {
1083           throw new IllegalStateException("Error unmarshaling job", e);
1084         }
1085 
1086         Long parentJobId = job.getParentJobId();
1087         if (parentJobId == null || parentJobId < 1)
1088           jobs.remove(job.getId());
1089       }
1090     }
1091   }
1092 
1093   @Override
1094   public Map<String, Map<String, Long>> countActiveByOrganizationAndHost() {
1095     var hostMap = new HashMap<String, Long>();
1096     for (var entry: jobHosts.entrySet()) {
1097       var host = entry.getKey().host;
1098       var count = entry.getValue().size();
1099       hostMap.put(host, (long) count);
1100     }
1101     return Map.of("mh_dafault_org", hostMap);
1102   }
1103 
1104   @Override
1105   public Map<String, Long> countActiveTypeByOrganization(String operation) {
1106     throw new NotImplementedException("This has not been implemented");
1107   }
1108 
1109   @Override
1110   public float getOwnLoad() {
1111     return getCurrentHostLoads().get(getRegistryHostname()).getCurrentLoad();
1112   }
1113 
1114   @Override
1115   public String getRegistryHostname() {
1116     return LOCALHOST;
1117   }
1118 
1119 }