View Javadoc
1   /*
2    * Licensed to The Apereo Foundation under one or more contributor license
3    * agreements. See the NOTICE file distributed with this work for additional
4    * information regarding copyright ownership.
5    *
6    *
7    * The Apereo Foundation licenses this file to you under the Educational
8    * Community License, Version 2.0 (the "License"); you may not use this file
9    * except in compliance with the License. You may obtain a copy of the License
10   * at:
11   *
12   *   http://opensource.org/licenses/ecl2.txt
13   *
14   * Unless required by applicable law or agreed to in writing, software
15   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
16   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the
17   * License for the specific language governing permissions and limitations under
18   * the License.
19   *
20   */
21  
22  package org.opencastproject.job.api;
23  
24  import static org.opencastproject.util.OsgiUtil.getOptContextProperty;
25  
26  import org.opencastproject.job.api.Incident.Severity;
27  import org.opencastproject.job.api.Job.Status;
28  import org.opencastproject.security.api.Organization;
29  import org.opencastproject.security.api.OrganizationDirectoryService;
30  import org.opencastproject.security.api.SecurityService;
31  import org.opencastproject.security.api.User;
32  import org.opencastproject.security.api.UserDirectoryService;
33  import org.opencastproject.serviceregistry.api.Incidents;
34  import org.opencastproject.serviceregistry.api.ServiceRegistry;
35  import org.opencastproject.serviceregistry.api.ServiceRegistryException;
36  import org.opencastproject.serviceregistry.api.SystemLoad.NodeLoad;
37  import org.opencastproject.serviceregistry.api.UndispatchableJobException;
38  import org.opencastproject.util.JobCanceledException;
39  import org.opencastproject.util.NotFoundException;
40  
41  import org.osgi.service.component.ComponentContext;
42  import org.slf4j.Logger;
43  import org.slf4j.LoggerFactory;
44  
45  import java.text.DecimalFormat;
46  import java.util.Optional;
47  import java.util.concurrent.Callable;
48  import java.util.concurrent.ExecutorService;
49  import java.util.concurrent.Executors;
50  
51  /**
52   * This class serves as a convenience for services that implement the {@link JobProducer} api to deal with handling long
53   * running, asynchronous operations.
54   */
55  public abstract class AbstractJobProducer implements JobProducer {
56  
57    /** The logger */
58    static final Logger logger = LoggerFactory.getLogger(AbstractJobProducer.class);
59  
60    /** The default value whether to accept a job whose load exceeds the host’s max load */
61    public static final boolean DEFAULT_ACCEPT_JOB_LOADS_EXCEEDING = true;
62  
63    /**
64     * The key to look for in the service configuration file to override the
65     * {@link AbstractJobProducer#DEFAULT_ACCEPT_JOB_LOADS_EXCEEDING}
66     */
67    public static final String ACCEPT_JOB_LOADS_EXCEEDING_PROPERTY = "org.opencastproject.job.load.acceptexceeding";
68  
69    /** The formatter for load values */
70    private static final DecimalFormat df = new DecimalFormat("#.#");
71  
72    /** Whether to accept a job whose load exceeds the host’s max load */
73    protected boolean acceptJobLoadsExeedingMaxLoad = DEFAULT_ACCEPT_JOB_LOADS_EXCEEDING;
74  
75    /** The types of job that this producer can handle */
76    protected String jobType = null;
77  
78    /** To enable threading when dispatching jobs */
79    protected ExecutorService executor = Executors.newCachedThreadPool();
80  
81    /**
82     * OSGI activate method.
83     *
84     * @param cc
85     *          OSGI component context
86     **/
87    public void activate(ComponentContext cc) {
88      acceptJobLoadsExeedingMaxLoad = getOptContextProperty(cc, ACCEPT_JOB_LOADS_EXCEEDING_PROPERTY)
89              .map(Boolean::valueOf)
90              .orElse(DEFAULT_ACCEPT_JOB_LOADS_EXCEEDING);
91      logger.debug("Job producer {} accepting excessively large jobs: {}", getJobType(), acceptJobLoadsExeedingMaxLoad);
92    }
93  
94    /**
95     * Creates a new abstract job producer for jobs of the given type.
96     *
97     * @param jobType
98     *          the job type
99     */
100   public AbstractJobProducer(String jobType) {
101     this.jobType = jobType;
102   }
103 
104   /**
105    * {@inheritDoc}
106    *
107    * @see org.opencastproject.job.api.JobProducer#getJobType()
108    */
109   @Override
110   public String getJobType() {
111     return jobType;
112   }
113 
114   /**
115    * {@inheritDoc}
116    *
117    * @see org.opencastproject.job.api.JobProducer#countJobs(org.opencastproject.job.api.Job.Status)
118    */
119   @Override
120   public long countJobs(Status status) throws ServiceRegistryException {
121     if (status == null) {
122       throw new IllegalArgumentException("Status must not be null");
123     }
124     return getServiceRegistry().count(getJobType(), status);
125   }
126 
127   /**
128    * {@inheritDoc}
129    *
130    * @see org.opencastproject.job.api.JobProducer#acceptJob(org.opencastproject.job.api.Job)
131    */
132   @Override
133   public void acceptJob(final Job job) throws ServiceRegistryException {
134     final Job runningJob;
135     try {
136       job.setStatus(Job.Status.RUNNING);
137       runningJob = getServiceRegistry().updateJob(job);
138     } catch (NotFoundException e) {
139       throw new IllegalStateException(e);
140     }
141     executor.submit(new JobRunner(runningJob, getServiceRegistry().getCurrentJob()));
142   }
143 
144   /**
145    * {@inheritDoc}
146    *
147    * @see org.opencastproject.job.api.JobProducer#isReadyToAcceptJobs(String)
148    */
149   @Override
150   public boolean isReadyToAcceptJobs(String operation) throws ServiceRegistryException {
151     return true;
152   }
153 
154   /**
155    * {@inheritDoc}
156    *
157    * @see org.opencastproject.job.api.JobProducer#isReadyToAccept(org.opencastproject.job.api.Job)
158    */
159   @Override
160   public boolean isReadyToAccept(Job job) throws ServiceRegistryException, UndispatchableJobException {
161     if (!jobType.equals(job.getJobType())) {
162       logger.debug("Invalid job type submitted: {}", job.getJobType());
163       return false;
164     }
165     NodeLoad maxload;
166     try {
167       maxload = getServiceRegistry().getMaxLoadOnNode(getServiceRegistry().getRegistryHostname());
168     } catch (NotFoundException e) {
169       throw new ServiceRegistryException(e);
170     }
171 
172     // Note: We are not adding the job load in the next line because it is already accounted for in the load values we
173     // get back from the service registry.
174     float currentLoad = getServiceRegistry().getOwnLoad();
175     logger.debug("{} Current load on this host: {}, job's load: {}, job's status: {}, max load: {}",
176             Thread.currentThread().getId(), currentLoad, job.getJobLoad(), job.getStatus().name(),
177             maxload.getMaxLoad());
178     // Add the current job load to compare below
179     currentLoad += job.getJobLoad();
180 
181     /* Note that this first clause looks at the *job's*, the other two look at the *node's* load
182      * We're assuming that if this case is true, then we're also the most powerful node in the system for this service,
183      * per the current job dispatching code in ServiceRegistryJpaImpl */
184     if (job.getJobLoad() > maxload.getMaxLoad() && acceptJobLoadsExeedingMaxLoad) {
185       logger.warn(
186               "{} Accepting job {} of type {} with load {} even though load of {} is above this node's limit of {}.",
187               Thread.currentThread().getId(), job.getId(), job.getJobType(), df.format(job.getJobLoad()),
188               df.format(currentLoad), df.format(maxload.getMaxLoad()));
189       logger.warn("This is a configuration issue that you should resolve in a production system!");
190       return true;
191     } else if (currentLoad > maxload.getMaxLoad()) {
192       logger.debug(
193               "{} Declining job {} of type {} with load {} because load of {} would exceed this node's limit of {}.",
194               Thread.currentThread().getId(), job.getId(), job.getJobType(), df.format(job.getJobLoad()),
195               df.format(currentLoad), df.format(maxload.getMaxLoad()));
196       return false;
197     } else  {
198       logger.debug("{} Accepting job {} of type {} with load {} because load of {} is within this node's limit of {}.",
199               Thread.currentThread().getId(), job.getId(), job.getJobType(), df.format(job.getJobLoad()),
200               df.format(currentLoad), df.format(maxload.getMaxLoad()));
201       return true;
202     }
203   }
204 
205 
206   /**
207    * Private utility to update and optionally fail job, called from a finally block.
208    *
209    * @param job
210    *          to be updated, may be null
211    */
212   protected void finallyUpdateJob(Job job)  {
213     if (job == null) {
214       return;
215     }
216 
217     if (!Job.Status.FINISHED.equals(job.getStatus())) {
218       job.setStatus(Job.Status.FAILED);
219     }
220     try {
221       getServiceRegistry().updateJob(job);
222     } catch (Exception e) {
223       throw new RuntimeException(e);
224     }
225   }
226 
227   /** Shorthand for {@link #getServiceRegistry()}.incident() */
228   public Incidents incident() {
229     return getServiceRegistry().incident();
230   }
231 
232   /**
233    * Returns a reference to the service registry.
234    *
235    * @return the service registry
236    */
237   protected abstract ServiceRegistry getServiceRegistry();
238 
239   /**
240    * Returns a reference to the security service
241    *
242    * @return the security service
243    */
244   protected abstract SecurityService getSecurityService();
245 
246   /**
247    * Returns a reference to the user directory service
248    *
249    * @return the user directory service
250    */
251   protected abstract UserDirectoryService getUserDirectoryService();
252 
253   /**
254    * Returns a reference to the organization directory service.
255    *
256    * @return the organization directory service
257    */
258   protected abstract OrganizationDirectoryService getOrganizationDirectoryService();
259 
260   /**
261    * Asks the overriding class to process the arguments using the given operation. The result will be added to the
262    * associated job as the payload.
263    *
264    * @param job
265    *          the job to process
266    * @return the operation result
267    * @throws Exception
268    */
269   protected abstract String process(Job job) throws Exception;
270 
271   /** A utility class to run jobs */
272   class JobRunner implements Callable<Void> {
273 
274     /** The job to dispatch */
275     private final long jobId;
276 
277     /** The current job */
278     private final Optional<Long> currentJobId;
279 
280     /**
281      * Constructs a new job runner
282      *
283      * @param job
284      *          the job to run
285      * @param currentJob
286      *          the current running job
287      */
288     JobRunner(Job job, Job currentJob) {
289       jobId = job.getId();
290       if (currentJob != null) {
291         currentJobId = Optional.of(currentJob.getId());
292       } else {
293         currentJobId = Optional.empty();
294       }
295     }
296 
297     @Override
298     public Void call() throws Exception {
299       final SecurityService securityService = getSecurityService();
300       final ServiceRegistry serviceRegistry = getServiceRegistry();
301       final Job jobBeforeProcessing = serviceRegistry.getJob(jobId);
302 
303       if (currentJobId.isPresent()) {
304         serviceRegistry.setCurrentJob(serviceRegistry.getJob(currentJobId.get()));
305       }
306 
307       final Organization organization = getOrganizationDirectoryService()
308               .getOrganization(jobBeforeProcessing.getOrganization());
309       securityService.setOrganization(organization);
310       final User user = getUserDirectoryService().loadUser(jobBeforeProcessing.getCreator());
311       securityService.setUser(user);
312 
313       try {
314         final String payload = process(jobBeforeProcessing);
315         handleSuccessfulProcessing(payload);
316       } catch (Throwable t) {
317         handleFailedProcessing(t);
318       } finally {
319         serviceRegistry.setCurrentJob(null);
320         securityService.setUser(null);
321         securityService.setOrganization(null);
322       }
323 
324       return null;
325     }
326 
327     private void handleSuccessfulProcessing(final String payload) throws Exception {
328       // The job may gets updated internally during processing. It therefore needs to be reload from the service
329       // registry in order to prevent inconsistencies.
330       final Job jobAfterProcessing = getServiceRegistry().getJob(jobId);
331       jobAfterProcessing.setPayload(payload);
332       jobAfterProcessing.setStatus(Status.FINISHED);
333       getServiceRegistry().updateJob(jobAfterProcessing);
334     }
335 
336     private void handleFailedProcessing(final Throwable t) throws Exception {
337       if (t instanceof JobCanceledException) {
338         logger.info(t.getMessage());
339       } else {
340         Job jobAfterProcessing = getServiceRegistry().getJob(jobId);
341         jobAfterProcessing.setStatus(Status.FAILED);
342         jobAfterProcessing = getServiceRegistry().updateJob(jobAfterProcessing);
343         getServiceRegistry().incident().unhandledException(jobAfterProcessing, Severity.FAILURE, t);
344         logger.error("Error handling operation '{}':", jobAfterProcessing.getOperation(), t);
345         if (t instanceof ServiceRegistryException) {
346           throw (ServiceRegistryException) t;
347         }
348       }
349     }
350 
351   }
352 }