1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 package org.opencastproject.job.api;
23
24 import static org.opencastproject.util.OsgiUtil.getOptContextProperty;
25
26 import org.opencastproject.job.api.Incident.Severity;
27 import org.opencastproject.job.api.Job.Status;
28 import org.opencastproject.security.api.Organization;
29 import org.opencastproject.security.api.OrganizationDirectoryService;
30 import org.opencastproject.security.api.SecurityService;
31 import org.opencastproject.security.api.User;
32 import org.opencastproject.security.api.UserDirectoryService;
33 import org.opencastproject.serviceregistry.api.Incidents;
34 import org.opencastproject.serviceregistry.api.ServiceRegistry;
35 import org.opencastproject.serviceregistry.api.ServiceRegistryException;
36 import org.opencastproject.serviceregistry.api.SystemLoad.NodeLoad;
37 import org.opencastproject.serviceregistry.api.UndispatchableJobException;
38 import org.opencastproject.util.JobCanceledException;
39 import org.opencastproject.util.NotFoundException;
40
41 import org.osgi.service.component.ComponentContext;
42 import org.slf4j.Logger;
43 import org.slf4j.LoggerFactory;
44
45 import java.text.DecimalFormat;
46 import java.util.Optional;
47 import java.util.concurrent.Callable;
48 import java.util.concurrent.ExecutorService;
49 import java.util.concurrent.Executors;
50
51
52
53
54
55 public abstract class AbstractJobProducer implements JobProducer {
56
57
58 static final Logger logger = LoggerFactory.getLogger(AbstractJobProducer.class);
59
60
61 public static final boolean DEFAULT_ACCEPT_JOB_LOADS_EXCEEDING = true;
62
63
64
65
66
67 public static final String ACCEPT_JOB_LOADS_EXCEEDING_PROPERTY = "org.opencastproject.job.load.acceptexceeding";
68
69
70 private static final DecimalFormat df = new DecimalFormat("#.#");
71
72
73 protected boolean acceptJobLoadsExeedingMaxLoad = DEFAULT_ACCEPT_JOB_LOADS_EXCEEDING;
74
75
76 protected String jobType = null;
77
78
79 protected ExecutorService executor = Executors.newCachedThreadPool();
80
81
82
83
84
85
86
87 public void activate(ComponentContext cc) {
88 acceptJobLoadsExeedingMaxLoad = getOptContextProperty(cc, ACCEPT_JOB_LOADS_EXCEEDING_PROPERTY)
89 .map(Boolean::valueOf)
90 .orElse(DEFAULT_ACCEPT_JOB_LOADS_EXCEEDING);
91 logger.debug("Job producer {} accepting excessively large jobs: {}", getJobType(), acceptJobLoadsExeedingMaxLoad);
92 }
93
94
95
96
97
98
99
100 public AbstractJobProducer(String jobType) {
101 this.jobType = jobType;
102 }
103
104
105
106
107
108
109 @Override
110 public String getJobType() {
111 return jobType;
112 }
113
114
115
116
117
118
119 @Override
120 public long countJobs(Status status) throws ServiceRegistryException {
121 if (status == null)
122 throw new IllegalArgumentException("Status must not be null");
123 return getServiceRegistry().count(getJobType(), status);
124 }
125
126
127
128
129
130
131 @Override
132 public void acceptJob(final Job job) throws ServiceRegistryException {
133 final Job runningJob;
134 try {
135 job.setStatus(Job.Status.RUNNING);
136 runningJob = getServiceRegistry().updateJob(job);
137 } catch (NotFoundException e) {
138 throw new IllegalStateException(e);
139 }
140 executor.submit(new JobRunner(runningJob, getServiceRegistry().getCurrentJob()));
141 }
142
143
144
145
146
147
148 @Override
149 public boolean isReadyToAcceptJobs(String operation) throws ServiceRegistryException {
150 return true;
151 }
152
153
154
155
156
157
158 @Override
159 public boolean isReadyToAccept(Job job) throws ServiceRegistryException, UndispatchableJobException {
160 if (!jobType.equals(job.getJobType())) {
161 logger.debug("Invalid job type submitted: {}", job.getJobType());
162 return false;
163 }
164 NodeLoad maxload;
165 try {
166 maxload = getServiceRegistry().getMaxLoadOnNode(getServiceRegistry().getRegistryHostname());
167 } catch (NotFoundException e) {
168 throw new ServiceRegistryException(e);
169 }
170
171
172
173 float currentLoad = getServiceRegistry().getOwnLoad();
174 logger.debug("{} Current load on this host: {}, job's load: {}, job's status: {}, max load: {}",
175 Thread.currentThread().getId(), currentLoad, job.getJobLoad(), job.getStatus().name(),
176 maxload.getMaxLoad());
177
178 currentLoad += job.getJobLoad();
179
180
181
182
183 if (job.getJobLoad() > maxload.getMaxLoad() && acceptJobLoadsExeedingMaxLoad) {
184 logger.warn(
185 "{} Accepting job {} of type {} with load {} even though load of {} is above this node's limit of {}.",
186 Thread.currentThread().getId(), job.getId(), job.getJobType(), df.format(job.getJobLoad()),
187 df.format(currentLoad), df.format(maxload.getMaxLoad()));
188 logger.warn("This is a configuration issue that you should resolve in a production system!");
189 return true;
190 } else if (currentLoad > maxload.getMaxLoad()) {
191 logger.debug(
192 "{} Declining job {} of type {} with load {} because load of {} would exceed this node's limit of {}.",
193 Thread.currentThread().getId(), job.getId(), job.getJobType(), df.format(job.getJobLoad()),
194 df.format(currentLoad), df.format(maxload.getMaxLoad()));
195 return false;
196 } else {
197 logger.debug("{} Accepting job {} of type {} with load {} because load of {} is within this node's limit of {}.",
198 Thread.currentThread().getId(), job.getId(), job.getJobType(), df.format(job.getJobLoad()),
199 df.format(currentLoad), df.format(maxload.getMaxLoad()));
200 return true;
201 }
202 }
203
204
205
206
207
208
209
210
211 protected void finallyUpdateJob(Job job) {
212 if (job == null) {
213 return;
214 }
215
216 if (!Job.Status.FINISHED.equals(job.getStatus())) {
217 job.setStatus(Job.Status.FAILED);
218 }
219 try {
220 getServiceRegistry().updateJob(job);
221 } catch (Exception e) {
222 throw new RuntimeException(e);
223 }
224 }
225
226
227 public Incidents incident() {
228 return getServiceRegistry().incident();
229 }
230
231
232
233
234
235
236 protected abstract ServiceRegistry getServiceRegistry();
237
238
239
240
241
242
243 protected abstract SecurityService getSecurityService();
244
245
246
247
248
249
250 protected abstract UserDirectoryService getUserDirectoryService();
251
252
253
254
255
256
257 protected abstract OrganizationDirectoryService getOrganizationDirectoryService();
258
259
260
261
262
263
264
265
266
267
268 protected abstract String process(Job job) throws Exception;
269
270
271 class JobRunner implements Callable<Void> {
272
273
274 private final long jobId;
275
276
277 private final Optional<Long> currentJobId;
278
279
280
281
282
283
284
285
286
287 JobRunner(Job job, Job currentJob) {
288 jobId = job.getId();
289 if (currentJob != null) {
290 currentJobId = Optional.of(currentJob.getId());
291 } else {
292 currentJobId = Optional.empty();
293 }
294 }
295
296 @Override
297 public Void call() throws Exception {
298 final SecurityService securityService = getSecurityService();
299 final ServiceRegistry serviceRegistry = getServiceRegistry();
300 final Job jobBeforeProcessing = serviceRegistry.getJob(jobId);
301
302 if (currentJobId.isPresent())
303 serviceRegistry.setCurrentJob(serviceRegistry.getJob(currentJobId.get()));
304
305 final Organization organization = getOrganizationDirectoryService()
306 .getOrganization(jobBeforeProcessing.getOrganization());
307 securityService.setOrganization(organization);
308 final User user = getUserDirectoryService().loadUser(jobBeforeProcessing.getCreator());
309 securityService.setUser(user);
310
311 try {
312 final String payload = process(jobBeforeProcessing);
313 handleSuccessfulProcessing(payload);
314 } catch (Throwable t) {
315 handleFailedProcessing(t);
316 } finally {
317 serviceRegistry.setCurrentJob(null);
318 securityService.setUser(null);
319 securityService.setOrganization(null);
320 }
321
322 return null;
323 }
324
325 private void handleSuccessfulProcessing(final String payload) throws Exception {
326
327
328 final Job jobAfterProcessing = getServiceRegistry().getJob(jobId);
329 jobAfterProcessing.setPayload(payload);
330 jobAfterProcessing.setStatus(Status.FINISHED);
331 getServiceRegistry().updateJob(jobAfterProcessing);
332 }
333
334 private void handleFailedProcessing(final Throwable t) throws Exception {
335 if (t instanceof JobCanceledException) {
336 logger.info(t.getMessage());
337 } else {
338 Job jobAfterProcessing = getServiceRegistry().getJob(jobId);
339 jobAfterProcessing.setStatus(Status.FAILED);
340 jobAfterProcessing = getServiceRegistry().updateJob(jobAfterProcessing);
341 getServiceRegistry().incident().unhandledException(jobAfterProcessing, Severity.FAILURE, t);
342 logger.error("Error handling operation '{}':", jobAfterProcessing.getOperation(), t);
343 if (t instanceof ServiceRegistryException)
344 throw (ServiceRegistryException) t;
345 }
346 }
347
348 }
349 }