1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 package org.opencastproject.job.api;
23
24 import static org.opencastproject.util.OsgiUtil.getOptContextProperty;
25
26 import org.opencastproject.job.api.Incident.Severity;
27 import org.opencastproject.job.api.Job.Status;
28 import org.opencastproject.security.api.Organization;
29 import org.opencastproject.security.api.OrganizationDirectoryService;
30 import org.opencastproject.security.api.SecurityService;
31 import org.opencastproject.security.api.User;
32 import org.opencastproject.security.api.UserDirectoryService;
33 import org.opencastproject.serviceregistry.api.Incidents;
34 import org.opencastproject.serviceregistry.api.ServiceRegistry;
35 import org.opencastproject.serviceregistry.api.ServiceRegistryException;
36 import org.opencastproject.serviceregistry.api.SystemLoad.NodeLoad;
37 import org.opencastproject.serviceregistry.api.UndispatchableJobException;
38 import org.opencastproject.util.JobCanceledException;
39 import org.opencastproject.util.NotFoundException;
40
41 import org.osgi.service.component.ComponentContext;
42 import org.slf4j.Logger;
43 import org.slf4j.LoggerFactory;
44
45 import java.text.DecimalFormat;
46 import java.util.Optional;
47 import java.util.concurrent.Callable;
48 import java.util.concurrent.ExecutorService;
49 import java.util.concurrent.Executors;
50
51
52
53
54
55 public abstract class AbstractJobProducer implements JobProducer {
56
57
58 static final Logger logger = LoggerFactory.getLogger(AbstractJobProducer.class);
59
60
61 public static final boolean DEFAULT_ACCEPT_JOB_LOADS_EXCEEDING = true;
62
63
64
65
66
67 public static final String ACCEPT_JOB_LOADS_EXCEEDING_PROPERTY = "org.opencastproject.job.load.acceptexceeding";
68
69
70 private static final DecimalFormat df = new DecimalFormat("#.#");
71
72
73 protected boolean acceptJobLoadsExeedingMaxLoad = DEFAULT_ACCEPT_JOB_LOADS_EXCEEDING;
74
75
76 protected String jobType = null;
77
78
79 protected ExecutorService executor = Executors.newCachedThreadPool();
80
81
82
83
84
85
86
87 public void activate(ComponentContext cc) {
88 acceptJobLoadsExeedingMaxLoad = getOptContextProperty(cc, ACCEPT_JOB_LOADS_EXCEEDING_PROPERTY)
89 .map(Boolean::valueOf)
90 .orElse(DEFAULT_ACCEPT_JOB_LOADS_EXCEEDING);
91 logger.debug("Job producer {} accepting excessively large jobs: {}", getJobType(), acceptJobLoadsExeedingMaxLoad);
92 }
93
94
95
96
97
98
99
100 public AbstractJobProducer(String jobType) {
101 this.jobType = jobType;
102 }
103
104
105
106
107
108
109 @Override
110 public String getJobType() {
111 return jobType;
112 }
113
114
115
116
117
118
119 @Override
120 public long countJobs(Status status) throws ServiceRegistryException {
121 if (status == null) {
122 throw new IllegalArgumentException("Status must not be null");
123 }
124 return getServiceRegistry().count(getJobType(), status);
125 }
126
127
128
129
130
131
132 @Override
133 public void acceptJob(final Job job) throws ServiceRegistryException {
134 final Job runningJob;
135 try {
136 job.setStatus(Job.Status.RUNNING);
137 runningJob = getServiceRegistry().updateJob(job);
138 } catch (NotFoundException e) {
139 throw new IllegalStateException(e);
140 }
141 executor.submit(new JobRunner(runningJob, getServiceRegistry().getCurrentJob()));
142 }
143
144
145
146
147
148
149 @Override
150 public boolean isReadyToAcceptJobs(String operation) throws ServiceRegistryException {
151 return true;
152 }
153
154
155
156
157
158
159 @Override
160 public boolean isReadyToAccept(Job job) throws ServiceRegistryException, UndispatchableJobException {
161 if (!jobType.equals(job.getJobType())) {
162 logger.debug("Invalid job type submitted: {}", job.getJobType());
163 return false;
164 }
165 NodeLoad maxload;
166 try {
167 maxload = getServiceRegistry().getMaxLoadOnNode(getServiceRegistry().getRegistryHostname());
168 } catch (NotFoundException e) {
169 throw new ServiceRegistryException(e);
170 }
171
172
173
174 float currentLoad = getServiceRegistry().getOwnLoad();
175 logger.debug("{} Current load on this host: {}, job's load: {}, job's status: {}, max load: {}",
176 Thread.currentThread().getId(), currentLoad, job.getJobLoad(), job.getStatus().name(),
177 maxload.getMaxLoad());
178
179 currentLoad += job.getJobLoad();
180
181
182
183
184 if (job.getJobLoad() > maxload.getMaxLoad() && acceptJobLoadsExeedingMaxLoad) {
185 logger.warn(
186 "{} Accepting job {} of type {} with load {} even though load of {} is above this node's limit of {}.",
187 Thread.currentThread().getId(), job.getId(), job.getJobType(), df.format(job.getJobLoad()),
188 df.format(currentLoad), df.format(maxload.getMaxLoad()));
189 logger.warn("This is a configuration issue that you should resolve in a production system!");
190 return true;
191 } else if (currentLoad > maxload.getMaxLoad()) {
192 logger.debug(
193 "{} Declining job {} of type {} with load {} because load of {} would exceed this node's limit of {}.",
194 Thread.currentThread().getId(), job.getId(), job.getJobType(), df.format(job.getJobLoad()),
195 df.format(currentLoad), df.format(maxload.getMaxLoad()));
196 return false;
197 } else {
198 logger.debug("{} Accepting job {} of type {} with load {} because load of {} is within this node's limit of {}.",
199 Thread.currentThread().getId(), job.getId(), job.getJobType(), df.format(job.getJobLoad()),
200 df.format(currentLoad), df.format(maxload.getMaxLoad()));
201 return true;
202 }
203 }
204
205
206
207
208
209
210
211
212 protected void finallyUpdateJob(Job job) {
213 if (job == null) {
214 return;
215 }
216
217 if (!Job.Status.FINISHED.equals(job.getStatus())) {
218 job.setStatus(Job.Status.FAILED);
219 }
220 try {
221 getServiceRegistry().updateJob(job);
222 } catch (Exception e) {
223 throw new RuntimeException(e);
224 }
225 }
226
227
228 public Incidents incident() {
229 return getServiceRegistry().incident();
230 }
231
232
233
234
235
236
237 protected abstract ServiceRegistry getServiceRegistry();
238
239
240
241
242
243
244 protected abstract SecurityService getSecurityService();
245
246
247
248
249
250
251 protected abstract UserDirectoryService getUserDirectoryService();
252
253
254
255
256
257
258 protected abstract OrganizationDirectoryService getOrganizationDirectoryService();
259
260
261
262
263
264
265
266
267
268
269 protected abstract String process(Job job) throws Exception;
270
271
272 class JobRunner implements Callable<Void> {
273
274
275 private final long jobId;
276
277
278 private final Optional<Long> currentJobId;
279
280
281
282
283
284
285
286
287
288 JobRunner(Job job, Job currentJob) {
289 jobId = job.getId();
290 if (currentJob != null) {
291 currentJobId = Optional.of(currentJob.getId());
292 } else {
293 currentJobId = Optional.empty();
294 }
295 }
296
297 @Override
298 public Void call() throws Exception {
299 final SecurityService securityService = getSecurityService();
300 final ServiceRegistry serviceRegistry = getServiceRegistry();
301 final Job jobBeforeProcessing = serviceRegistry.getJob(jobId);
302
303 if (currentJobId.isPresent()) {
304 serviceRegistry.setCurrentJob(serviceRegistry.getJob(currentJobId.get()));
305 }
306
307 final Organization organization = getOrganizationDirectoryService()
308 .getOrganization(jobBeforeProcessing.getOrganization());
309 securityService.setOrganization(organization);
310 final User user = getUserDirectoryService().loadUser(jobBeforeProcessing.getCreator());
311 securityService.setUser(user);
312
313 try {
314 final String payload = process(jobBeforeProcessing);
315 handleSuccessfulProcessing(payload);
316 } catch (Throwable t) {
317 handleFailedProcessing(t);
318 } finally {
319 serviceRegistry.setCurrentJob(null);
320 securityService.setUser(null);
321 securityService.setOrganization(null);
322 }
323
324 return null;
325 }
326
327 private void handleSuccessfulProcessing(final String payload) throws Exception {
328
329
330 final Job jobAfterProcessing = getServiceRegistry().getJob(jobId);
331 jobAfterProcessing.setPayload(payload);
332 jobAfterProcessing.setStatus(Status.FINISHED);
333 getServiceRegistry().updateJob(jobAfterProcessing);
334 }
335
336 private void handleFailedProcessing(final Throwable t) throws Exception {
337 if (t instanceof JobCanceledException) {
338 logger.info(t.getMessage());
339 } else {
340 Job jobAfterProcessing = getServiceRegistry().getJob(jobId);
341 jobAfterProcessing.setStatus(Status.FAILED);
342 jobAfterProcessing = getServiceRegistry().updateJob(jobAfterProcessing);
343 getServiceRegistry().incident().unhandledException(jobAfterProcessing, Severity.FAILURE, t);
344 logger.error("Error handling operation '{}':", jobAfterProcessing.getOperation(), t);
345 if (t instanceof ServiceRegistryException) {
346 throw (ServiceRegistryException) t;
347 }
348 }
349 }
350
351 }
352 }