View Javadoc
1   /*
2    * Licensed to The Apereo Foundation under one or more contributor license
3    * agreements. See the NOTICE file distributed with this work for additional
4    * information regarding copyright ownership.
5    *
6    *
7    * The Apereo Foundation licenses this file to you under the Educational
8    * Community License, Version 2.0 (the "License"); you may not use this file
9    * except in compliance with the License. You may obtain a copy of the License
10   * at:
11   *
12   *   http://opensource.org/licenses/ecl2.txt
13   *
14   * Unless required by applicable law or agreed to in writing, software
15   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
16   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the
17   * License for the specific language governing permissions and limitations under
18   * the License.
19   *
20   */
21  package org.opencastproject.terminationstate.api;
22  
23  import org.opencastproject.job.api.Job;
24  import org.opencastproject.serviceregistry.api.ServiceRegistry;
25  import org.opencastproject.serviceregistry.api.ServiceRegistryException;
26  
27  import org.slf4j.Logger;
28  import org.slf4j.LoggerFactory;
29  
30  /** AbstractJobTerminationStateService.
31   * Abstract implementation of of TerminationStateService API which checks
32   * whether there are any jobs running before changing the termination state from
33   * WAIT to READY once a termination notification has been received.
34   */
35  public abstract class AbstractJobTerminationStateService implements TerminationStateService {
36  
37    private TerminationState state = TerminationState.NONE;
38    private ServiceRegistry serviceRegistry;
39  
40    private final Logger logger = LoggerFactory.getLogger(AbstractJobTerminationStateService.class.getName());
41  
42    /**
43     * {@inheritDoc}
44     */
45    @Override
46    public void setState(TerminationState state) {
47      this.state = state;
48      logger.info("Termination state set to {}", state.toString());
49    }
50  
51    /**
52     * {@inheritDoc}
53     */
54    @Override
55    public TerminationState getState() {
56      return state;
57    }
58  
59    /**
60     * Count the current number of jobs this node is processing
61     * @return number jobs
62     */
63    protected long countJobs() throws ServiceRegistryException {
64      String host = "";
65      long nJobs = 0;
66  
67      try {
68        host = serviceRegistry.getRegistryHostname();
69        nJobs = serviceRegistry.countByHost(null, host, Job.Status.RUNNING);
70      } catch (ServiceRegistryException ex) {
71        logger.error("Cannot count jobs running on {}", host, ex);
72        throw ex;
73      }
74  
75      return nJobs;
76    }
77  
78    /**
79     * If waiting and no jobs running change the state to ready
80     * @return ready to terminate
81     */
82    protected boolean readyToTerminate() {
83      if (state == TerminationState.WAIT) {
84        try {
85          if (countJobs() == 0) {
86            state = TerminationState.READY;
87            return true;
88          }
89        } catch (ServiceRegistryException ex) {
90          // Ready to terminate else node state could become permanently stuck as WAITING
91          logger.warn("Can't determine number of running Jobs, setting Termination State to READ");
92          state = TerminationState.READY;
93          return true;
94        }
95      } else if (state == TerminationState.READY) {
96        return true;
97      }
98  
99      return false;
100   }
101 
102   /**
103    * OSGI dependency injection of service registry
104    * @param service ServiceRegistry instance
105    */
106   public void setServiceRegistry(ServiceRegistry service) {
107     this.serviceRegistry = service;
108   }
109 
110   protected ServiceRegistry getServiceRegistry() {
111     return serviceRegistry;
112   }
113 }