View Javadoc
1   /*
2    * Licensed to The Apereo Foundation under one or more contributor license
3    * agreements. See the NOTICE file distributed with this work for additional
4    * information regarding copyright ownership.
5    *
6    *
7    * The Apereo Foundation licenses this file to you under the Educational
8    * Community License, Version 2.0 (the "License"); you may not use this file
9    * except in compliance with the License. You may obtain a copy of the License
10   * at:
11   *
12   *   http://opensource.org/licenses/ecl2.txt
13   *
14   * Unless required by applicable law or agreed to in writing, software
15   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
16   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the
17   * License for the specific language governing permissions and limitations under
18   * the License.
19   *
20   */
21  
22  package org.opencastproject.job.api;
23  
24  import static org.opencastproject.util.OsgiUtil.getOptContextProperty;
25  
26  import org.opencastproject.job.api.Incident.Severity;
27  import org.opencastproject.job.api.Job.Status;
28  import org.opencastproject.security.api.Organization;
29  import org.opencastproject.security.api.OrganizationDirectoryService;
30  import org.opencastproject.security.api.SecurityService;
31  import org.opencastproject.security.api.User;
32  import org.opencastproject.security.api.UserDirectoryService;
33  import org.opencastproject.serviceregistry.api.Incidents;
34  import org.opencastproject.serviceregistry.api.ServiceRegistry;
35  import org.opencastproject.serviceregistry.api.ServiceRegistryException;
36  import org.opencastproject.serviceregistry.api.SystemLoad.NodeLoad;
37  import org.opencastproject.serviceregistry.api.UndispatchableJobException;
38  import org.opencastproject.util.JobCanceledException;
39  import org.opencastproject.util.NotFoundException;
40  
41  import org.osgi.service.component.ComponentContext;
42  import org.slf4j.Logger;
43  import org.slf4j.LoggerFactory;
44  
45  import java.text.DecimalFormat;
46  import java.util.Optional;
47  import java.util.concurrent.Callable;
48  import java.util.concurrent.ExecutorService;
49  import java.util.concurrent.Executors;
50  
51  /**
52   * This class serves as a convenience for services that implement the {@link JobProducer} api to deal with handling long
53   * running, asynchronous operations.
54   */
55  public abstract class AbstractJobProducer implements JobProducer {
56  
  /** The logger; package-visible so the nested {@link JobRunner} can use it as well */
  static final Logger logger = LoggerFactory.getLogger(AbstractJobProducer.class);

  /** The default for whether to accept a job whose load exceeds the host's max load */
  public static final boolean DEFAULT_ACCEPT_JOB_LOADS_EXCEEDING = true;

  /**
   * The component context property that is read on activation to override
   * {@link AbstractJobProducer#DEFAULT_ACCEPT_JOB_LOADS_EXCEEDING}
   */
  public static final String ACCEPT_JOB_LOADS_EXCEEDING_PROPERTY = "org.opencastproject.job.load.acceptexceeding";

  /**
   * The formatter for load values (at most one fraction digit).
   * NOTE(review): {@link DecimalFormat} is not synchronized; code that may run on several threads at once needs its
   * own instance or external locking rather than this shared one.
   */
  private static final DecimalFormat df = new DecimalFormat("#.#");

  /** Whether to accept a job whose load exceeds the host's max load; set from the configuration on activation */
  protected boolean acceptJobLoadsExeedingMaxLoad = DEFAULT_ACCEPT_JOB_LOADS_EXCEEDING;

  /** The type of job that this producer handles */
  protected String jobType = null;

  /** Cached thread pool on which accepted jobs are run */
  protected ExecutorService executor = Executors.newCachedThreadPool();
80  
81    /**
82     * OSGI activate method.
83     *
84     * @param cc
85     *          OSGI component context
86     **/
87    public void activate(ComponentContext cc) {
88      acceptJobLoadsExeedingMaxLoad = getOptContextProperty(cc, ACCEPT_JOB_LOADS_EXCEEDING_PROPERTY)
89              .map(Boolean::valueOf)
90              .orElse(DEFAULT_ACCEPT_JOB_LOADS_EXCEEDING);
91      logger.debug("Job producer {} accepting excessively large jobs: {}", getJobType(), acceptJobLoadsExeedingMaxLoad);
92    }
93  
94    /**
95     * Creates a new abstract job producer for jobs of the given type.
96     *
97     * @param jobType
98     *          the job type
99     */
100   public AbstractJobProducer(String jobType) {
101     this.jobType = jobType;
102   }
103 
104   /**
105    * {@inheritDoc}
106    *
107    * @see org.opencastproject.job.api.JobProducer#getJobType()
108    */
109   @Override
110   public String getJobType() {
111     return jobType;
112   }
113 
114   /**
115    * {@inheritDoc}
116    *
117    * @see org.opencastproject.job.api.JobProducer#countJobs(org.opencastproject.job.api.Job.Status)
118    */
119   @Override
120   public long countJobs(Status status) throws ServiceRegistryException {
121     if (status == null)
122       throw new IllegalArgumentException("Status must not be null");
123     return getServiceRegistry().count(getJobType(), status);
124   }
125 
126   /**
127    * {@inheritDoc}
128    *
129    * @see org.opencastproject.job.api.JobProducer#acceptJob(org.opencastproject.job.api.Job)
130    */
131   @Override
132   public void acceptJob(final Job job) throws ServiceRegistryException {
133     final Job runningJob;
134     try {
135       job.setStatus(Job.Status.RUNNING);
136       runningJob = getServiceRegistry().updateJob(job);
137     } catch (NotFoundException e) {
138       throw new IllegalStateException(e);
139     }
140     executor.submit(new JobRunner(runningJob, getServiceRegistry().getCurrentJob()));
141   }
142 
  /**
   * {@inheritDoc}
   * <p>
   * This base implementation does not look at the operation and always returns {@code true}.
   *
   * @see org.opencastproject.job.api.JobProducer#isReadyToAcceptJobs(String)
   */
  @Override
  public boolean isReadyToAcceptJobs(String operation) throws ServiceRegistryException {
    return true;
  }
152 
153   /**
154    * {@inheritDoc}
155    *
156    * @see org.opencastproject.job.api.JobProducer#isReadyToAccept(org.opencastproject.job.api.Job)
157    */
158   @Override
159   public boolean isReadyToAccept(Job job) throws ServiceRegistryException, UndispatchableJobException {
160     if (!jobType.equals(job.getJobType())) {
161       logger.debug("Invalid job type submitted: {}", job.getJobType());
162       return false;
163     }
164     NodeLoad maxload;
165     try {
166       maxload = getServiceRegistry().getMaxLoadOnNode(getServiceRegistry().getRegistryHostname());
167     } catch (NotFoundException e) {
168       throw new ServiceRegistryException(e);
169     }
170 
171     // Note: We are not adding the job load in the next line because it is already accounted for in the load values we
172     // get back from the service registry.
173     float currentLoad = getServiceRegistry().getOwnLoad();
174     logger.debug("{} Current load on this host: {}, job's load: {}, job's status: {}, max load: {}",
175             Thread.currentThread().getId(), currentLoad, job.getJobLoad(), job.getStatus().name(),
176             maxload.getMaxLoad());
177     // Add the current job load to compare below
178     currentLoad += job.getJobLoad();
179 
180     /* Note that this first clause looks at the *job's*, the other two look at the *node's* load
181      * We're assuming that if this case is true, then we're also the most powerful node in the system for this service,
182      * per the current job dispatching code in ServiceRegistryJpaImpl */
183     if (job.getJobLoad() > maxload.getMaxLoad() && acceptJobLoadsExeedingMaxLoad) {
184       logger.warn(
185               "{} Accepting job {} of type {} with load {} even though load of {} is above this node's limit of {}.",
186               Thread.currentThread().getId(), job.getId(), job.getJobType(), df.format(job.getJobLoad()),
187               df.format(currentLoad), df.format(maxload.getMaxLoad()));
188       logger.warn("This is a configuration issue that you should resolve in a production system!");
189       return true;
190     } else if (currentLoad > maxload.getMaxLoad()) {
191       logger.debug(
192               "{} Declining job {} of type {} with load {} because load of {} would exceed this node's limit of {}.",
193               Thread.currentThread().getId(), job.getId(), job.getJobType(), df.format(job.getJobLoad()),
194               df.format(currentLoad), df.format(maxload.getMaxLoad()));
195       return false;
196     } else  {
197       logger.debug("{} Accepting job {} of type {} with load {} because load of {} is within this node's limit of {}.",
198               Thread.currentThread().getId(), job.getId(), job.getJobType(), df.format(job.getJobLoad()),
199               df.format(currentLoad), df.format(maxload.getMaxLoad()));
200       return true;
201     }
202   }
203 
204 
205   /**
206    * Private utility to update and optionally fail job, called from a finally block.
207    *
208    * @param job
209    *          to be updated, may be null
210    */
211   protected void finallyUpdateJob(Job job)  {
212     if (job == null) {
213       return;
214     }
215 
216     if (!Job.Status.FINISHED.equals(job.getStatus())) {
217       job.setStatus(Job.Status.FAILED);
218     }
219     try {
220       getServiceRegistry().updateJob(job);
221     } catch (Exception e) {
222       throw new RuntimeException(e);
223     }
224   }
225 
226   /** Shorthand for {@link #getServiceRegistry()}.incident() */
227   public Incidents incident() {
228     return getServiceRegistry().incident();
229   }
230 
  /**
   * Returns a reference to the service registry. It is used to count, load and update jobs, to query node loads and
   * to record incidents.
   *
   * @return the service registry
   */
  protected abstract ServiceRegistry getServiceRegistry();

  /**
   * Returns a reference to the security service. The job runner sets the job's organization and user on it before
   * processing and resets both to {@code null} afterwards.
   *
   * @return the security service
   */
  protected abstract SecurityService getSecurityService();

  /**
   * Returns a reference to the user directory service. The job runner uses it to load the creator of a job.
   *
   * @return the user directory service
   */
  protected abstract UserDirectoryService getUserDirectoryService();

  /**
   * Returns a reference to the organization directory service. The job runner uses it to look up the organization
   * of a job.
   *
   * @return the organization directory service
   */
  protected abstract OrganizationDirectoryService getOrganizationDirectoryService();
258 
  /**
   * Asks the overriding class to process the given job. The job runner calls this method on an executor thread
   * after the job's organization and creator have been set on the security service.
   * <p>
   * On normal return the result is stored as the payload of the job and the job is set to
   * {@link Job.Status#FINISHED}.
   *
   * @param job
   *          the job to process
   * @return the operation result, used as the job's payload
   * @throws Exception
   *           if processing fails. A {@link JobCanceledException} is only logged; for any other throwable the job is
   *           set to {@link Job.Status#FAILED} and an incident is recorded.
   */
  protected abstract String process(Job job) throws Exception;
269 
270   /** A utility class to run jobs */
271   class JobRunner implements Callable<Void> {
272 
273     /** The job to dispatch */
274     private final long jobId;
275 
276     /** The current job */
277     private final Optional<Long> currentJobId;
278 
279     /**
280      * Constructs a new job runner
281      *
282      * @param job
283      *          the job to run
284      * @param currentJob
285      *          the current running job
286      */
287     JobRunner(Job job, Job currentJob) {
288       jobId = job.getId();
289       if (currentJob != null) {
290         currentJobId = Optional.of(currentJob.getId());
291       } else {
292         currentJobId = Optional.empty();
293       }
294     }
295 
296     @Override
297     public Void call() throws Exception {
298       final SecurityService securityService = getSecurityService();
299       final ServiceRegistry serviceRegistry = getServiceRegistry();
300       final Job jobBeforeProcessing = serviceRegistry.getJob(jobId);
301 
302       if (currentJobId.isPresent())
303         serviceRegistry.setCurrentJob(serviceRegistry.getJob(currentJobId.get()));
304 
305       final Organization organization = getOrganizationDirectoryService()
306               .getOrganization(jobBeforeProcessing.getOrganization());
307       securityService.setOrganization(organization);
308       final User user = getUserDirectoryService().loadUser(jobBeforeProcessing.getCreator());
309       securityService.setUser(user);
310 
311       try {
312         final String payload = process(jobBeforeProcessing);
313         handleSuccessfulProcessing(payload);
314       } catch (Throwable t) {
315         handleFailedProcessing(t);
316       } finally {
317         serviceRegistry.setCurrentJob(null);
318         securityService.setUser(null);
319         securityService.setOrganization(null);
320       }
321 
322       return null;
323     }
324 
325     private void handleSuccessfulProcessing(final String payload) throws Exception {
326       // The job may gets updated internally during processing. It therefore needs to be reload from the service
327       // registry in order to prevent inconsistencies.
328       final Job jobAfterProcessing = getServiceRegistry().getJob(jobId);
329       jobAfterProcessing.setPayload(payload);
330       jobAfterProcessing.setStatus(Status.FINISHED);
331       getServiceRegistry().updateJob(jobAfterProcessing);
332     }
333 
334     private void handleFailedProcessing(final Throwable t) throws Exception {
335       if (t instanceof JobCanceledException) {
336         logger.info(t.getMessage());
337       } else {
338         Job jobAfterProcessing = getServiceRegistry().getJob(jobId);
339         jobAfterProcessing.setStatus(Status.FAILED);
340         jobAfterProcessing = getServiceRegistry().updateJob(jobAfterProcessing);
341         getServiceRegistry().incident().unhandledException(jobAfterProcessing, Severity.FAILURE, t);
342         logger.error("Error handling operation '{}':", jobAfterProcessing.getOperation(), t);
343         if (t instanceof ServiceRegistryException)
344           throw (ServiceRegistryException) t;
345       }
346     }
347 
348   }
349 }