View Javadoc
1   /*
2    * Licensed to The Apereo Foundation under one or more contributor license
3    * agreements. See the NOTICE file distributed with this work for additional
4    * information regarding copyright ownership.
5    *
6    *
7    * The Apereo Foundation licenses this file to you under the Educational
8    * Community License, Version 2.0 (the "License"); you may not use this file
9    * except in compliance with the License. You may obtain a copy of the License
10   * at:
11   *
12   *   http://opensource.org/licenses/ecl2.txt
13   *
14   * Unless required by applicable law or agreed to in writing, software
15   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
16   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the
17   * License for the specific language governing permissions and limitations under
18   * the License.
19   *
20   */
21  
22  package org.opencastproject.job.api;
23  
24  import static com.entwinemedia.fn.data.Opt.none;
25  import static com.entwinemedia.fn.data.Opt.some;
26  import static org.opencastproject.util.OsgiUtil.getOptContextProperty;
27  
28  import org.opencastproject.job.api.Incident.Severity;
29  import org.opencastproject.job.api.Job.Status;
30  import org.opencastproject.security.api.Organization;
31  import org.opencastproject.security.api.OrganizationDirectoryService;
32  import org.opencastproject.security.api.SecurityService;
33  import org.opencastproject.security.api.User;
34  import org.opencastproject.security.api.UserDirectoryService;
35  import org.opencastproject.serviceregistry.api.Incidents;
36  import org.opencastproject.serviceregistry.api.ServiceRegistry;
37  import org.opencastproject.serviceregistry.api.ServiceRegistryException;
38  import org.opencastproject.serviceregistry.api.SystemLoad.NodeLoad;
39  import org.opencastproject.serviceregistry.api.UndispatchableJobException;
40  import org.opencastproject.util.JobCanceledException;
41  import org.opencastproject.util.NotFoundException;
42  import org.opencastproject.util.data.functions.Strings;
43  
44  import com.entwinemedia.fn.data.Opt;
45  
46  import org.osgi.service.component.ComponentContext;
47  import org.slf4j.Logger;
48  import org.slf4j.LoggerFactory;
49  
50  import java.text.DecimalFormat;
51  import java.util.concurrent.Callable;
52  import java.util.concurrent.ExecutorService;
53  import java.util.concurrent.Executors;
54  
55  /**
56   * This class serves as a convenience for services that implement the {@link JobProducer} api to deal with handling long
57   * running, asynchronous operations.
58   */
59  public abstract class AbstractJobProducer implements JobProducer {
60  
61    /** The logger */
62    static final Logger logger = LoggerFactory.getLogger(AbstractJobProducer.class);
63  
64    /** The default value whether to accept a job whose load exceeds the host’s max load */
65    public static final boolean DEFAULT_ACCEPT_JOB_LOADS_EXCEEDING = true;
66  
67    /**
68     * The key to look for in the service configuration file to override the
69     * {@link AbstractJobProducer#DEFAULT_ACCEPT_JOB_LOADS_EXCEEDING}
70     */
71    public static final String ACCEPT_JOB_LOADS_EXCEEDING_PROPERTY = "org.opencastproject.job.load.acceptexceeding";
72  
73    /** The formatter for load values */
74    private static final DecimalFormat df = new DecimalFormat("#.#");
75  
76    /** Whether to accept a job whose load exceeds the host’s max load */
77    protected boolean acceptJobLoadsExeedingMaxLoad = DEFAULT_ACCEPT_JOB_LOADS_EXCEEDING;
78  
79    /** The types of job that this producer can handle */
80    protected String jobType = null;
81  
82    /** To enable threading when dispatching jobs */
83    protected ExecutorService executor = Executors.newCachedThreadPool();
84  
85    /**
86     * OSGI activate method.
87     *
88     * @param cc
89     *          OSGI component context
90     **/
91    public void activate(ComponentContext cc) {
92      acceptJobLoadsExeedingMaxLoad = getOptContextProperty(cc, ACCEPT_JOB_LOADS_EXCEEDING_PROPERTY).map(Strings.toBool)
93              .getOrElse(DEFAULT_ACCEPT_JOB_LOADS_EXCEEDING);
94      logger.debug("Job producer {} accepting excessively large jobs: {}", getJobType(), acceptJobLoadsExeedingMaxLoad);
95    }
96  
97    /**
98     * Creates a new abstract job producer for jobs of the given type.
99     *
100    * @param jobType
101    *          the job type
102    */
103   public AbstractJobProducer(String jobType) {
104     this.jobType = jobType;
105   }
106 
107   /**
108    * {@inheritDoc}
109    *
110    * @see org.opencastproject.job.api.JobProducer#getJobType()
111    */
112   @Override
113   public String getJobType() {
114     return jobType;
115   }
116 
117   /**
118    * {@inheritDoc}
119    *
120    * @see org.opencastproject.job.api.JobProducer#countJobs(org.opencastproject.job.api.Job.Status)
121    */
122   @Override
123   public long countJobs(Status status) throws ServiceRegistryException {
124     if (status == null)
125       throw new IllegalArgumentException("Status must not be null");
126     return getServiceRegistry().count(getJobType(), status);
127   }
128 
129   /**
130    * {@inheritDoc}
131    *
132    * @see org.opencastproject.job.api.JobProducer#acceptJob(org.opencastproject.job.api.Job)
133    */
134   @Override
135   public void acceptJob(final Job job) throws ServiceRegistryException {
136     final Job runningJob;
137     try {
138       job.setStatus(Job.Status.RUNNING);
139       runningJob = getServiceRegistry().updateJob(job);
140     } catch (NotFoundException e) {
141       throw new IllegalStateException(e);
142     }
143     executor.submit(new JobRunner(runningJob, getServiceRegistry().getCurrentJob()));
144   }
145 
146   /**
147    * {@inheritDoc}
148    *
149    * @see org.opencastproject.job.api.JobProducer#isReadyToAcceptJobs(String)
150    */
151   @Override
152   public boolean isReadyToAcceptJobs(String operation) throws ServiceRegistryException {
153     return true;
154   }
155 
156   /**
157    * {@inheritDoc}
158    *
159    * @see org.opencastproject.job.api.JobProducer#isReadyToAccept(org.opencastproject.job.api.Job)
160    */
161   @Override
162   public boolean isReadyToAccept(Job job) throws ServiceRegistryException, UndispatchableJobException {
163     if (!jobType.equals(job.getJobType())) {
164       logger.debug("Invalid job type submitted: {}", job.getJobType());
165       return false;
166     }
167     NodeLoad maxload;
168     try {
169       maxload = getServiceRegistry().getMaxLoadOnNode(getServiceRegistry().getRegistryHostname());
170     } catch (NotFoundException e) {
171       throw new ServiceRegistryException(e);
172     }
173 
174     // Note: We are not adding the job load in the next line because it is already accounted for in the load values we
175     // get back from the service registry.
176     float currentLoad = getServiceRegistry().getOwnLoad();
177     logger.debug("{} Current load on this host: {}, job's load: {}, job's status: {}, max load: {}",
178             Thread.currentThread().getId(), currentLoad, job.getJobLoad(), job.getStatus().name(),
179             maxload.getMaxLoad());
180     // Add the current job load to compare below
181     currentLoad += job.getJobLoad();
182 
183     /* Note that this first clause looks at the *job's*, the other two look at the *node's* load
184      * We're assuming that if this case is true, then we're also the most powerful node in the system for this service,
185      * per the current job dispatching code in ServiceRegistryJpaImpl */
186     if (job.getJobLoad() > maxload.getMaxLoad() && acceptJobLoadsExeedingMaxLoad) {
187       logger.warn(
188               "{} Accepting job {} of type {} with load {} even though load of {} is above this node's limit of {}.",
189               Thread.currentThread().getId(), job.getId(), job.getJobType(), df.format(job.getJobLoad()),
190               df.format(currentLoad), df.format(maxload.getMaxLoad()));
191       logger.warn("This is a configuration issue that you should resolve in a production system!");
192       return true;
193     } else if (currentLoad > maxload.getMaxLoad()) {
194       logger.debug(
195               "{} Declining job {} of type {} with load {} because load of {} would exceed this node's limit of {}.",
196               Thread.currentThread().getId(), job.getId(), job.getJobType(), df.format(job.getJobLoad()),
197               df.format(currentLoad), df.format(maxload.getMaxLoad()));
198       return false;
199     } else  {
200       logger.debug("{} Accepting job {} of type {} with load {} because load of {} is within this node's limit of {}.",
201               Thread.currentThread().getId(), job.getId(), job.getJobType(), df.format(job.getJobLoad()),
202               df.format(currentLoad), df.format(maxload.getMaxLoad()));
203       return true;
204     }
205   }
206 
207 
208   /**
209    * Private utility to update and optionally fail job, called from a finally block.
210    *
211    * @param job
212    *          to be updated, may be null
213    */
214   protected void finallyUpdateJob(Job job)  {
215     if (job == null) {
216       return;
217     }
218 
219     if (!Job.Status.FINISHED.equals(job.getStatus())) {
220       job.setStatus(Job.Status.FAILED);
221     }
222     try {
223       getServiceRegistry().updateJob(job);
224     } catch (Exception e) {
225       throw new RuntimeException(e);
226     }
227   }
228 
229   /** Shorthand for {@link #getServiceRegistry()}.incident() */
230   public Incidents incident() {
231     return getServiceRegistry().incident();
232   }
233 
234   /**
235    * Returns a reference to the service registry.
236    *
237    * @return the service registry
238    */
239   protected abstract ServiceRegistry getServiceRegistry();
240 
241   /**
242    * Returns a reference to the security service
243    *
244    * @return the security service
245    */
246   protected abstract SecurityService getSecurityService();
247 
248   /**
249    * Returns a reference to the user directory service
250    *
251    * @return the user directory service
252    */
253   protected abstract UserDirectoryService getUserDirectoryService();
254 
255   /**
256    * Returns a reference to the organization directory service.
257    *
258    * @return the organization directory service
259    */
260   protected abstract OrganizationDirectoryService getOrganizationDirectoryService();
261 
262   /**
263    * Asks the overriding class to process the arguments using the given operation. The result will be added to the
264    * associated job as the payload.
265    *
266    * @param job
267    *          the job to process
268    * @return the operation result
269    * @throws Exception
270    */
271   protected abstract String process(Job job) throws Exception;
272 
273   /** A utility class to run jobs */
274   class JobRunner implements Callable<Void> {
275 
276     /** The job to dispatch */
277     private final long jobId;
278 
279     /** The current job */
280     private final Opt<Long> currentJobId;
281 
282     /**
283      * Constructs a new job runner
284      *
285      * @param job
286      *          the job to run
287      * @param currentJob
288      *          the current running job
289      */
290     JobRunner(Job job, Job currentJob) {
291       jobId = job.getId();
292       if (currentJob != null) {
293         currentJobId = some(currentJob.getId());
294       } else {
295         currentJobId = none();
296       }
297     }
298 
299     @Override
300     public Void call() throws Exception {
301       final SecurityService securityService = getSecurityService();
302       final ServiceRegistry serviceRegistry = getServiceRegistry();
303       final Job jobBeforeProcessing = serviceRegistry.getJob(jobId);
304 
305       if (currentJobId.isSome())
306         serviceRegistry.setCurrentJob(serviceRegistry.getJob(currentJobId.get()));
307 
308       final Organization organization = getOrganizationDirectoryService()
309               .getOrganization(jobBeforeProcessing.getOrganization());
310       securityService.setOrganization(organization);
311       final User user = getUserDirectoryService().loadUser(jobBeforeProcessing.getCreator());
312       securityService.setUser(user);
313 
314       try {
315         final String payload = process(jobBeforeProcessing);
316         handleSuccessfulProcessing(payload);
317       } catch (Throwable t) {
318         handleFailedProcessing(t);
319       } finally {
320         serviceRegistry.setCurrentJob(null);
321         securityService.setUser(null);
322         securityService.setOrganization(null);
323       }
324 
325       return null;
326     }
327 
328     private void handleSuccessfulProcessing(final String payload) throws Exception {
329       // The job may gets updated internally during processing. It therefore needs to be reload from the service
330       // registry in order to prevent inconsistencies.
331       final Job jobAfterProcessing = getServiceRegistry().getJob(jobId);
332       jobAfterProcessing.setPayload(payload);
333       jobAfterProcessing.setStatus(Status.FINISHED);
334       getServiceRegistry().updateJob(jobAfterProcessing);
335     }
336 
337     private void handleFailedProcessing(final Throwable t) throws Exception {
338       if (t instanceof JobCanceledException) {
339         logger.info(t.getMessage());
340       } else {
341         Job jobAfterProcessing = getServiceRegistry().getJob(jobId);
342         jobAfterProcessing.setStatus(Status.FAILED);
343         jobAfterProcessing = getServiceRegistry().updateJob(jobAfterProcessing);
344         getServiceRegistry().incident().unhandledException(jobAfterProcessing, Severity.FAILURE, t);
345         logger.error("Error handling operation '{}':", jobAfterProcessing.getOperation(), t);
346         if (t instanceof ServiceRegistryException)
347           throw (ServiceRegistryException) t;
348       }
349     }
350 
351   }
352 }