1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 package org.opencastproject.job.api;
23
24 import static com.entwinemedia.fn.data.Opt.none;
25 import static com.entwinemedia.fn.data.Opt.some;
26 import static org.opencastproject.util.OsgiUtil.getOptContextProperty;
27
28 import org.opencastproject.job.api.Incident.Severity;
29 import org.opencastproject.job.api.Job.Status;
30 import org.opencastproject.security.api.Organization;
31 import org.opencastproject.security.api.OrganizationDirectoryService;
32 import org.opencastproject.security.api.SecurityService;
33 import org.opencastproject.security.api.User;
34 import org.opencastproject.security.api.UserDirectoryService;
35 import org.opencastproject.serviceregistry.api.Incidents;
36 import org.opencastproject.serviceregistry.api.ServiceRegistry;
37 import org.opencastproject.serviceregistry.api.ServiceRegistryException;
38 import org.opencastproject.serviceregistry.api.SystemLoad.NodeLoad;
39 import org.opencastproject.serviceregistry.api.UndispatchableJobException;
40 import org.opencastproject.util.JobCanceledException;
41 import org.opencastproject.util.NotFoundException;
42 import org.opencastproject.util.data.functions.Strings;
43
44 import com.entwinemedia.fn.data.Opt;
45
46 import org.osgi.service.component.ComponentContext;
47 import org.slf4j.Logger;
48 import org.slf4j.LoggerFactory;
49
50 import java.text.DecimalFormat;
51 import java.util.concurrent.Callable;
52 import java.util.concurrent.ExecutorService;
53 import java.util.concurrent.Executors;
54
55
56
57
58
59 public abstract class AbstractJobProducer implements JobProducer {
60
61
62 static final Logger logger = LoggerFactory.getLogger(AbstractJobProducer.class);
63
64
65 public static final boolean DEFAULT_ACCEPT_JOB_LOADS_EXCEEDING = true;
66
67
68
69
70
71 public static final String ACCEPT_JOB_LOADS_EXCEEDING_PROPERTY = "org.opencastproject.job.load.acceptexceeding";
72
73
74 private static final DecimalFormat df = new DecimalFormat("#.#");
75
76
77 protected boolean acceptJobLoadsExeedingMaxLoad = DEFAULT_ACCEPT_JOB_LOADS_EXCEEDING;
78
79
80 protected String jobType = null;
81
82
83 protected ExecutorService executor = Executors.newCachedThreadPool();
84
85
86
87
88
89
90
91 public void activate(ComponentContext cc) {
92 acceptJobLoadsExeedingMaxLoad = getOptContextProperty(cc, ACCEPT_JOB_LOADS_EXCEEDING_PROPERTY).map(Strings.toBool)
93 .getOrElse(DEFAULT_ACCEPT_JOB_LOADS_EXCEEDING);
94 logger.debug("Job producer {} accepting excessively large jobs: {}", getJobType(), acceptJobLoadsExeedingMaxLoad);
95 }
96
97
98
99
100
101
102
103 public AbstractJobProducer(String jobType) {
104 this.jobType = jobType;
105 }
106
107
108
109
110
111
112 @Override
113 public String getJobType() {
114 return jobType;
115 }
116
117
118
119
120
121
122 @Override
123 public long countJobs(Status status) throws ServiceRegistryException {
124 if (status == null)
125 throw new IllegalArgumentException("Status must not be null");
126 return getServiceRegistry().count(getJobType(), status);
127 }
128
129
130
131
132
133
134 @Override
135 public void acceptJob(final Job job) throws ServiceRegistryException {
136 final Job runningJob;
137 try {
138 job.setStatus(Job.Status.RUNNING);
139 runningJob = getServiceRegistry().updateJob(job);
140 } catch (NotFoundException e) {
141 throw new IllegalStateException(e);
142 }
143 executor.submit(new JobRunner(runningJob, getServiceRegistry().getCurrentJob()));
144 }
145
146
147
148
149
150
151 @Override
152 public boolean isReadyToAcceptJobs(String operation) throws ServiceRegistryException {
153 return true;
154 }
155
156
157
158
159
160
161 @Override
162 public boolean isReadyToAccept(Job job) throws ServiceRegistryException, UndispatchableJobException {
163 if (!jobType.equals(job.getJobType())) {
164 logger.debug("Invalid job type submitted: {}", job.getJobType());
165 return false;
166 }
167 NodeLoad maxload;
168 try {
169 maxload = getServiceRegistry().getMaxLoadOnNode(getServiceRegistry().getRegistryHostname());
170 } catch (NotFoundException e) {
171 throw new ServiceRegistryException(e);
172 }
173
174
175
176 float currentLoad = getServiceRegistry().getOwnLoad();
177 logger.debug("{} Current load on this host: {}, job's load: {}, job's status: {}, max load: {}",
178 Thread.currentThread().getId(), currentLoad, job.getJobLoad(), job.getStatus().name(),
179 maxload.getMaxLoad());
180
181 currentLoad += job.getJobLoad();
182
183
184
185
186 if (job.getJobLoad() > maxload.getMaxLoad() && acceptJobLoadsExeedingMaxLoad) {
187 logger.warn(
188 "{} Accepting job {} of type {} with load {} even though load of {} is above this node's limit of {}.",
189 Thread.currentThread().getId(), job.getId(), job.getJobType(), df.format(job.getJobLoad()),
190 df.format(currentLoad), df.format(maxload.getMaxLoad()));
191 logger.warn("This is a configuration issue that you should resolve in a production system!");
192 return true;
193 } else if (currentLoad > maxload.getMaxLoad()) {
194 logger.debug(
195 "{} Declining job {} of type {} with load {} because load of {} would exceed this node's limit of {}.",
196 Thread.currentThread().getId(), job.getId(), job.getJobType(), df.format(job.getJobLoad()),
197 df.format(currentLoad), df.format(maxload.getMaxLoad()));
198 return false;
199 } else {
200 logger.debug("{} Accepting job {} of type {} with load {} because load of {} is within this node's limit of {}.",
201 Thread.currentThread().getId(), job.getId(), job.getJobType(), df.format(job.getJobLoad()),
202 df.format(currentLoad), df.format(maxload.getMaxLoad()));
203 return true;
204 }
205 }
206
207
208
209
210
211
212
213
214 protected void finallyUpdateJob(Job job) {
215 if (job == null) {
216 return;
217 }
218
219 if (!Job.Status.FINISHED.equals(job.getStatus())) {
220 job.setStatus(Job.Status.FAILED);
221 }
222 try {
223 getServiceRegistry().updateJob(job);
224 } catch (Exception e) {
225 throw new RuntimeException(e);
226 }
227 }
228
229
230 public Incidents incident() {
231 return getServiceRegistry().incident();
232 }
233
234
235
236
237
238
239 protected abstract ServiceRegistry getServiceRegistry();
240
241
242
243
244
245
246 protected abstract SecurityService getSecurityService();
247
248
249
250
251
252
253 protected abstract UserDirectoryService getUserDirectoryService();
254
255
256
257
258
259
260 protected abstract OrganizationDirectoryService getOrganizationDirectoryService();
261
262
263
264
265
266
267
268
269
270
271 protected abstract String process(Job job) throws Exception;
272
273
274 class JobRunner implements Callable<Void> {
275
276
277 private final long jobId;
278
279
280 private final Opt<Long> currentJobId;
281
282
283
284
285
286
287
288
289
290 JobRunner(Job job, Job currentJob) {
291 jobId = job.getId();
292 if (currentJob != null) {
293 currentJobId = some(currentJob.getId());
294 } else {
295 currentJobId = none();
296 }
297 }
298
299 @Override
300 public Void call() throws Exception {
301 final SecurityService securityService = getSecurityService();
302 final ServiceRegistry serviceRegistry = getServiceRegistry();
303 final Job jobBeforeProcessing = serviceRegistry.getJob(jobId);
304
305 if (currentJobId.isSome())
306 serviceRegistry.setCurrentJob(serviceRegistry.getJob(currentJobId.get()));
307
308 final Organization organization = getOrganizationDirectoryService()
309 .getOrganization(jobBeforeProcessing.getOrganization());
310 securityService.setOrganization(organization);
311 final User user = getUserDirectoryService().loadUser(jobBeforeProcessing.getCreator());
312 securityService.setUser(user);
313
314 try {
315 final String payload = process(jobBeforeProcessing);
316 handleSuccessfulProcessing(payload);
317 } catch (Throwable t) {
318 handleFailedProcessing(t);
319 } finally {
320 serviceRegistry.setCurrentJob(null);
321 securityService.setUser(null);
322 securityService.setOrganization(null);
323 }
324
325 return null;
326 }
327
328 private void handleSuccessfulProcessing(final String payload) throws Exception {
329
330
331 final Job jobAfterProcessing = getServiceRegistry().getJob(jobId);
332 jobAfterProcessing.setPayload(payload);
333 jobAfterProcessing.setStatus(Status.FINISHED);
334 getServiceRegistry().updateJob(jobAfterProcessing);
335 }
336
337 private void handleFailedProcessing(final Throwable t) throws Exception {
338 if (t instanceof JobCanceledException) {
339 logger.info(t.getMessage());
340 } else {
341 Job jobAfterProcessing = getServiceRegistry().getJob(jobId);
342 jobAfterProcessing.setStatus(Status.FAILED);
343 jobAfterProcessing = getServiceRegistry().updateJob(jobAfterProcessing);
344 getServiceRegistry().incident().unhandledException(jobAfterProcessing, Severity.FAILURE, t);
345 logger.error("Error handling operation '{}':", jobAfterProcessing.getOperation(), t);
346 if (t instanceof ServiceRegistryException)
347 throw (ServiceRegistryException) t;
348 }
349 }
350
351 }
352 }