1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 package org.opencastproject.workflow.remote;
23
24 import static org.apache.http.HttpStatus.SC_CONFLICT;
25 import static org.apache.http.HttpStatus.SC_NOT_FOUND;
26 import static org.apache.http.HttpStatus.SC_NO_CONTENT;
27 import static org.apache.http.HttpStatus.SC_OK;
28 import static org.apache.http.HttpStatus.SC_UNAUTHORIZED;
29
30 import org.opencastproject.mediapackage.MediaPackage;
31 import org.opencastproject.mediapackage.MediaPackageParser;
32 import org.opencastproject.security.api.TrustedHttpClient;
33 import org.opencastproject.security.api.UnauthorizedException;
34 import org.opencastproject.serviceregistry.api.RemoteBase;
35 import org.opencastproject.serviceregistry.api.ServiceRegistry;
36 import org.opencastproject.util.NotFoundException;
37 import org.opencastproject.workflow.api.WorkflowDatabaseException;
38 import org.opencastproject.workflow.api.WorkflowDefinition;
39 import org.opencastproject.workflow.api.WorkflowException;
40 import org.opencastproject.workflow.api.WorkflowInstance;
41 import org.opencastproject.workflow.api.WorkflowInstance.WorkflowState;
42 import org.opencastproject.workflow.api.WorkflowListener;
43 import org.opencastproject.workflow.api.WorkflowService;
44 import org.opencastproject.workflow.api.XmlWorkflowParser;
45
46 import org.apache.commons.io.IOUtils;
47 import org.apache.http.HttpResponse;
48 import org.apache.http.HttpStatus;
49 import org.apache.http.NameValuePair;
50 import org.apache.http.ParseException;
51 import org.apache.http.client.entity.UrlEncodedFormEntity;
52 import org.apache.http.client.methods.HttpDelete;
53 import org.apache.http.client.methods.HttpGet;
54 import org.apache.http.client.methods.HttpPost;
55 import org.apache.http.client.utils.URLEncodedUtils;
56 import org.apache.http.message.BasicNameValuePair;
57 import org.apache.http.util.EntityUtils;
58 import org.json.simple.parser.JSONParser;
59 import org.osgi.service.component.annotations.Component;
60 import org.osgi.service.component.annotations.Reference;
61 import org.slf4j.Logger;
62 import org.slf4j.LoggerFactory;
63
64 import java.io.IOException;
65 import java.io.UnsupportedEncodingException;
66 import java.nio.charset.StandardCharsets;
67 import java.util.ArrayList;
68 import java.util.Collections;
69 import java.util.HashMap;
70 import java.util.List;
71 import java.util.Map;
72 import java.util.Map.Entry;
73 import java.util.Optional;
74
75
76
77
78 @Component(
79 property = {
80 "service.description=Workflow Remote Service Proxy"
81 },
82 immediate = true,
83 service = { WorkflowService.class }
84 )
85 public class WorkflowServiceRemoteImpl extends RemoteBase implements WorkflowService {
86
87
88 private static final Logger logger = LoggerFactory.getLogger(WorkflowServiceRemoteImpl.class);
89
/**
 * Creates the remote workflow service proxy, passing {@code JOB_TYPE} to the {@link RemoteBase} constructor.
 */
public WorkflowServiceRemoteImpl() {
  super(JOB_TYPE);
}
93
94
95
96
97
98
99 @Override
100 @Reference
101 public void setTrustedHttpClient(TrustedHttpClient client) {
102 super.setTrustedHttpClient(client);
103 }
104
105
106
107
108
109
110 @Override
111 @Reference
112 public void setRemoteServiceManager(ServiceRegistry remoteServiceManager) {
113 this.remoteServiceManager = remoteServiceManager;
114 }
115
116
117
118
119
120
121 @Override
122 public WorkflowDefinition getWorkflowDefinitionById(String id) throws WorkflowDatabaseException, NotFoundException {
123 HttpGet get = new HttpGet("/definition/" + id + ".xml");
124 HttpResponse response = getResponse(get, SC_NOT_FOUND, SC_OK);
125 try {
126 if (response != null) {
127 if (SC_NOT_FOUND == response.getStatusLine().getStatusCode()) {
128 throw new NotFoundException("Workflow definition " + id + " does not exist.");
129 } else {
130 return XmlWorkflowParser.parseWorkflowDefinition(response.getEntity().getContent());
131 }
132 }
133 } catch (NotFoundException e) {
134 throw e;
135 } catch (Exception e) {
136 throw new WorkflowDatabaseException(e);
137 } finally {
138 closeConnection(response);
139 }
140 throw new WorkflowDatabaseException("Unable to connect to a remote workflow service");
141 }
142
143
144
145
146
147
148 @Override
149 public WorkflowInstance getWorkflowById(long id) throws WorkflowDatabaseException, NotFoundException {
150 HttpGet get = new HttpGet("/instance/" + id + ".xml");
151 HttpResponse response = getResponse(get, SC_NOT_FOUND, SC_OK);
152 try {
153 if (response != null) {
154 if (SC_NOT_FOUND == response.getStatusLine().getStatusCode()) {
155 throw new NotFoundException("Workflow instance " + id + " does not exist.");
156 } else {
157 return XmlWorkflowParser.parseWorkflowInstance(response.getEntity().getContent());
158 }
159 }
160 } catch (NotFoundException e) {
161 throw e;
162 } catch (Exception e) {
163 throw new WorkflowDatabaseException(e);
164 } finally {
165 closeConnection(response);
166 }
167 throw new WorkflowDatabaseException("Unable to connect to a remote workflow service");
168 }
169
170 @Override
171 public List<WorkflowInstance> getWorkflowInstancesByMediaPackage(String mediaPackageId)
172 throws WorkflowDatabaseException {
173 HttpGet get = new HttpGet("/mediaPackage/" + mediaPackageId + "/instances.xml");
174 HttpResponse response = getResponse(get);
175 try {
176 if (response != null)
177 return XmlWorkflowParser.parseWorkflowSet(response.getEntity().getContent()).getItems();
178 } catch (Exception e) {
179 throw new WorkflowDatabaseException(e);
180 } finally {
181 closeConnection(response);
182 }
183 throw new WorkflowDatabaseException("Workflow instances can not be loaded from a remote workflow service");
184 }
185
186 @Override
187 public Optional<WorkflowInstance> getRunningWorkflowInstanceByMediaPackage(String mediaPackageId, String action)
188 throws WorkflowException {
189
190 HttpGet get = new HttpGet("/mediaPackage/" + mediaPackageId + "/instances.xml");
191 HttpResponse response = getResponse(get, SC_NOT_FOUND, SC_OK);
192 try {
193 if (response != null) {
194 if (SC_NOT_FOUND == response.getStatusLine().getStatusCode()) {
195 return Optional.empty();
196 }
197 return Optional.ofNullable(
198 XmlWorkflowParser.parseWorkflowInstance(response.getEntity().getContent())
199 );
200 }
201 } catch (Exception e) {
202 throw new WorkflowDatabaseException(e);
203 } finally {
204 closeConnection(response);
205 }
206 throw new WorkflowDatabaseException("Workflow instances can not be loaded from a remote workflow service");
207 }
208
209 @Override
210 public boolean mediaPackageHasActiveWorkflows(String mediaPackageId) throws WorkflowDatabaseException {
211 HttpGet get = new HttpGet("/mediaPackage/" + mediaPackageId + "/hasActiveWorkflows");
212 HttpResponse response = getResponse(get);
213 try {
214 if (response != null)
215 return Boolean.parseBoolean(response.getEntity().getContent().toString());
216 } catch (Exception e) {
217 throw new WorkflowDatabaseException(e);
218 } finally {
219 closeConnection(response);
220 }
221 throw new WorkflowDatabaseException("Workflow instances can not be loaded from a remote workflow service");
222 }
223
224 @Override
225 public boolean userHasActiveWorkflows(String userId) throws WorkflowDatabaseException {
226 HttpGet get = new HttpGet("/user/" + userId + "/hasActiveWorkflows");
227 HttpResponse response = getResponse(get);
228 try {
229 if (response != null)
230 return Boolean.parseBoolean(response.getEntity().getContent().toString());
231 } catch (Exception e) {
232 throw new WorkflowDatabaseException(e);
233 } finally {
234 closeConnection(response);
235 }
236 throw new WorkflowDatabaseException("Workflow instances can not be loaded from a remote workflow service");
237 }
238
239
240
241
242
243
244
245 @Override
246 public WorkflowInstance start(WorkflowDefinition workflowDefinition, MediaPackage mediaPackage,
247 Map<String, String> properties) throws WorkflowDatabaseException {
248 try {
249 return start(workflowDefinition, mediaPackage, null, properties);
250 } catch (NotFoundException e) {
251 throw new IllegalStateException("A null parent workflow id should never result in a not found exception ", e);
252 }
253 }
254
255
256
257
258
259
260
261
262
263 private String mapToString(Map<String, String> props) {
264 StringBuilder sb = new StringBuilder();
265 for (Entry<String, String> entry : props.entrySet()) {
266 sb.append(entry.getKey());
267 sb.append("=");
268 sb.append(entry.getValue());
269 sb.append("\n");
270 }
271 return sb.toString();
272 }
273
274
275
276
277
278
279
280 @Override
281 public WorkflowInstance start(WorkflowDefinition workflowDefinition, MediaPackage mediaPackage,
282 Long parentWorkflowId, Map<String, String> properties) throws WorkflowDatabaseException, NotFoundException {
283 HttpPost post = new HttpPost("/start");
284 try {
285 List<BasicNameValuePair> params = new ArrayList<>();
286 if (workflowDefinition != null)
287 params.add(new BasicNameValuePair("definition", XmlWorkflowParser.toXml(workflowDefinition)));
288 params.add(new BasicNameValuePair("mediapackage", MediaPackageParser.getAsXml(mediaPackage)));
289 if (parentWorkflowId != null)
290 params.add(new BasicNameValuePair("parent", parentWorkflowId.toString()));
291 if (properties != null)
292 params.add(new BasicNameValuePair("properties", mapToString(properties)));
293 post.setEntity(new UrlEncodedFormEntity(params, "UTF-8"));
294 } catch (Exception e) {
295 throw new IllegalStateException("Unable to assemble a remote workflow request", e);
296 }
297 HttpResponse response = getResponse(post, SC_NOT_FOUND, SC_OK);
298 try {
299 if (response != null) {
300 if (SC_NOT_FOUND == response.getStatusLine().getStatusCode()) {
301 throw new NotFoundException("Workflow instance " + parentWorkflowId + " does not exist.");
302 } else {
303 return XmlWorkflowParser.parseWorkflowInstance(response.getEntity().getContent());
304 }
305 }
306 } catch (NotFoundException e) {
307 throw e;
308 } catch (Exception e) {
309 throw new WorkflowDatabaseException("Unable to build a workflow from xml", e);
310 } finally {
311 closeConnection(response);
312 }
313 throw new WorkflowDatabaseException("Unable to start a remote workflow. The http response code was unexpected.");
314 }
315
316
317
318
319
320
321
322 @Override
323 public WorkflowInstance start(WorkflowDefinition workflowDefinition, MediaPackage mediaPackage)
324 throws WorkflowDatabaseException {
325 try {
326 return start(workflowDefinition, mediaPackage, null, null);
327 } catch (NotFoundException e) {
328 throw new IllegalStateException("A null parent workflow id should never result in a not found exception ", e);
329 }
330 }
331
332
333
334
335
336
337 @Override
338 public long countWorkflowInstances() throws WorkflowDatabaseException {
339 return countWorkflowInstances(null);
340 }
341
342
343
344
345
346
347 @Override
348 public long countWorkflowInstances(WorkflowState state) throws WorkflowDatabaseException {
349 List<NameValuePair> queryStringParams = new ArrayList<>();
350 if (state != null)
351 queryStringParams.add(new BasicNameValuePair("state", state.toString()));
352
353 StringBuilder url = new StringBuilder("/count");
354 if (queryStringParams.size() > 0) {
355 url.append("?");
356 url.append(URLEncodedUtils.format(queryStringParams, "UTF-8"));
357 }
358
359 HttpGet get = new HttpGet(url.toString());
360 HttpResponse response = getResponse(get);
361 try {
362 if (response != null) {
363 String body = null;
364 try {
365 body = EntityUtils.toString(response.getEntity());
366 return Long.parseLong(body);
367 } catch (NumberFormatException e) {
368 throw new WorkflowDatabaseException("Unable to parse the response body as a long: " + body);
369 }
370 }
371 } catch (ParseException | IOException e) {
372 throw new WorkflowDatabaseException("Unable to parse the response body");
373 } finally {
374 closeConnection(response);
375 }
376
377 throw new WorkflowDatabaseException("Unable to count workflow instances");
378 }
379
380
381
382
383
384
385 @Override
386 public WorkflowInstance stop(long workflowInstanceId) throws WorkflowDatabaseException, NotFoundException {
387 HttpPost post = new HttpPost("/stop");
388 List<BasicNameValuePair> params = new ArrayList<>();
389 params.add(new BasicNameValuePair("id", Long.toString(workflowInstanceId)));
390 try {
391 post.setEntity(new UrlEncodedFormEntity(params, "UTF-8"));
392 } catch (UnsupportedEncodingException e) {
393 throw new IllegalStateException("Unable to assemble a remote workflow service request", e);
394 }
395 HttpResponse response = getResponse(post, SC_OK, SC_NOT_FOUND);
396 try {
397 if (response != null) {
398 if (response.getStatusLine().getStatusCode() == SC_NOT_FOUND) {
399 throw new NotFoundException("Workflow instance with id='" + workflowInstanceId + "' not found");
400 } else {
401 logger.info("Workflow '{}' stopped", workflowInstanceId);
402 return XmlWorkflowParser.parseWorkflowInstance(response.getEntity().getContent());
403 }
404 }
405 } catch (NotFoundException e) {
406 throw e;
407 } catch (Exception e) {
408 throw new WorkflowDatabaseException(e);
409 } finally {
410 closeConnection(response);
411 }
412 throw new WorkflowDatabaseException("Unable to stop workflow instance " + workflowInstanceId);
413 }
414
415
416
417
418
419
420 @Override
421 public WorkflowInstance suspend(long workflowInstanceId) throws WorkflowDatabaseException, NotFoundException {
422 HttpPost post = new HttpPost("/suspend");
423 List<BasicNameValuePair> params = new ArrayList<>();
424 params.add(new BasicNameValuePair("id", Long.toString(workflowInstanceId)));
425 try {
426 post.setEntity(new UrlEncodedFormEntity(params, "UTF-8"));
427 } catch (UnsupportedEncodingException e) {
428 throw new IllegalStateException("Unable to assemble a remote workflow service request", e);
429 }
430 HttpResponse response = getResponse(post, SC_OK, SC_NOT_FOUND);
431 try {
432 if (response != null) {
433 if (response.getStatusLine().getStatusCode() == SC_NOT_FOUND) {
434 throw new NotFoundException("Workflow instance with id='" + workflowInstanceId + "' not found");
435 } else {
436 logger.info("Workflow '{}' suspended", workflowInstanceId);
437 return XmlWorkflowParser.parseWorkflowInstance(response.getEntity().getContent());
438 }
439 }
440 } catch (NotFoundException e) {
441 throw e;
442 } catch (Exception e) {
443 throw new WorkflowDatabaseException(e);
444 } finally {
445 closeConnection(response);
446 }
447 throw new WorkflowDatabaseException("Unable to suspend workflow instance " + workflowInstanceId);
448 }
449
450
451
452
453
454
455 @Override
456 public WorkflowInstance resume(long workflowInstanceId) throws NotFoundException, UnauthorizedException,
457 WorkflowException, IllegalStateException {
458 return resume(workflowInstanceId, null);
459 }
460
461
462
463
464
465
466 @Override
467 public WorkflowInstance resume(long workflowInstanceId, Map<String, String> properties) throws NotFoundException,
468 UnauthorizedException, WorkflowException, IllegalStateException {
469 HttpPost post = new HttpPost("/resume");
470 List<BasicNameValuePair> params = new ArrayList<>();
471 params.add(new BasicNameValuePair("id", Long.toString(workflowInstanceId)));
472 if (properties != null)
473 params.add(new BasicNameValuePair("properties", mapToString(properties)));
474 post.setEntity(new UrlEncodedFormEntity(params, StandardCharsets.UTF_8));
475 HttpResponse response = getResponse(post, SC_OK, SC_NOT_FOUND, SC_UNAUTHORIZED, SC_CONFLICT);
476 try {
477 if (response != null) {
478 if (response.getStatusLine().getStatusCode() == SC_NOT_FOUND) {
479 throw new NotFoundException("Workflow instance with id='" + workflowInstanceId + "' not found");
480 } else if (response.getStatusLine().getStatusCode() == SC_UNAUTHORIZED) {
481 throw new UnauthorizedException("You do not have permission to resume");
482 } else if (response.getStatusLine().getStatusCode() == SC_CONFLICT) {
483 throw new IllegalStateException("Can not resume a workflow where the current state is not in paused");
484 } else {
485 logger.info("Workflow '{}' resumed", workflowInstanceId);
486 return XmlWorkflowParser.parseWorkflowInstance(response.getEntity().getContent());
487 }
488 }
489 } catch (NotFoundException | UnauthorizedException | IllegalStateException e) {
490 throw e;
491 } catch (Exception e) {
492 throw new WorkflowException(e);
493 } finally {
494 closeConnection(response);
495 }
496 throw new WorkflowException("Unable to resume workflow instance " + workflowInstanceId);
497 }
498
499
500
501
502
503
504 @Override
505 public void update(WorkflowInstance workflowInstance) throws WorkflowDatabaseException {
506 HttpPost post = new HttpPost("/update");
507 try {
508 List<BasicNameValuePair> params = new ArrayList<>();
509 params.add(new BasicNameValuePair("workflow", XmlWorkflowParser.toXml(workflowInstance)));
510 post.setEntity(new UrlEncodedFormEntity(params, "UTF-8"));
511 } catch (UnsupportedEncodingException e) {
512 throw new IllegalStateException("Unable to assemble a remote workflow service request", e);
513 } catch (Exception e) {
514 throw new IllegalStateException("unable to serialize workflow instance to xml");
515 }
516
517 HttpResponse response = getResponse(post, SC_NO_CONTENT);
518 try {
519 if (response != null) {
520 logger.info("Workflow '{}' updated", workflowInstance);
521 return;
522 }
523 } finally {
524 closeConnection(response);
525 }
526 throw new WorkflowDatabaseException("Unable to update workflow instance " + workflowInstance.getId());
527 }
528
529
530
531
532
533
534 @Override
535 public void remove(long workflowInstanceId) throws WorkflowDatabaseException, NotFoundException {
536 remove(workflowInstanceId, false);
537 }
538
539
540
541
542
543
544 @Override
545 public void remove(long workflowInstanceId, boolean force) throws WorkflowDatabaseException, NotFoundException {
546 String deleteString = "/remove/" + Long.toString(workflowInstanceId);
547
548 if (force) {
549 List<NameValuePair> queryStringParams = new ArrayList<NameValuePair>();
550 queryStringParams.add(new BasicNameValuePair("force", "true"));
551 deleteString = deleteString + "?" + URLEncodedUtils.format(queryStringParams, "UTF_8");
552 }
553
554 HttpDelete delete = new HttpDelete(deleteString);
555
556 HttpResponse response = getResponse(delete, SC_NO_CONTENT, SC_NOT_FOUND);
557 try {
558 if (response != null) {
559 if (SC_NOT_FOUND == response.getStatusLine().getStatusCode()) {
560 throw new NotFoundException("Workflow id not found: " + workflowInstanceId);
561 } else {
562 logger.info("Workflow '{}' removed", workflowInstanceId);
563 return;
564 }
565 }
566 } finally {
567 closeConnection(response);
568 }
569 throw new WorkflowDatabaseException("Unable to remove workflow instance " + workflowInstanceId);
570 }
571
572
573
574
575
576
577 @Override
578 public List<WorkflowDefinition> listAvailableWorkflowDefinitions() throws WorkflowDatabaseException {
579 HttpGet get = new HttpGet("/definitions.xml");
580 HttpResponse response = getResponse(get);
581 try {
582 if (response != null) {
583 List<WorkflowDefinition> list = XmlWorkflowParser.parseWorkflowDefinitions(response.getEntity().getContent());
584 Collections.sort(list);
585 return list;
586 }
587 } catch (Exception e) {
588 throw new IllegalStateException("Unable to parse workflow definitions");
589 } finally {
590 closeConnection(response);
591 }
592 throw new WorkflowDatabaseException(
593 "Unable to read the registered workflow definitions from the remote workflow service");
594 }
595
596
597
598
599
600
601 @Override
602 public void addWorkflowListener(WorkflowListener listener) {
603 throw new UnsupportedOperationException("Adding workflow listeners to a remote workflow service is not supported");
604 }
605
606
607
608
609
610
611 @Override
612 public void removeWorkflowListener(WorkflowListener listener) {
613 throw new UnsupportedOperationException(
614 "Removing workflow listeners from a remote workflow service is not supported");
615 }
616
617 @Override
618 public void cleanupWorkflowInstances(int lifetime, WorkflowState state) throws WorkflowDatabaseException,
619 UnauthorizedException {
620 HttpPost post = new HttpPost("/cleanup");
621
622 List<BasicNameValuePair> params = new ArrayList<>();
623 params.add(new BasicNameValuePair("lifetime", String.valueOf(lifetime)));
624 if (state != null)
625 params.add(new BasicNameValuePair("state", state.toString()));
626 try {
627 post.setEntity(new UrlEncodedFormEntity(params, "UTF-8"));
628 } catch (UnsupportedEncodingException e) {
629 throw new IllegalStateException("Unable to assemble a remote workflow service request", e);
630 }
631
632 HttpResponse response = getResponse(post, SC_OK, HttpStatus.SC_UNAUTHORIZED);
633 try {
634 if (response != null) {
635 if (HttpStatus.SC_UNAUTHORIZED == response.getStatusLine().getStatusCode()) {
636 throw new UnauthorizedException("You do not have permission to cleanup");
637 } else {
638 logger.info("Successful request to workflow cleanup endpoint");
639 return;
640 }
641 }
642 } finally {
643 closeConnection(response);
644 }
645 throw new WorkflowDatabaseException("Unable to successfully request the workflow cleanup endpoint");
646 }
647
648 @Override
649 public Map<String, Map<String, String>> getWorkflowStateMappings() {
650 HttpGet get = new HttpGet("/statemappings.json");
651 HttpResponse response = getResponse(get);
652 try {
653 if (response != null) {
654 return (Map<String, Map<String, String>>) new JSONParser().parse(IOUtils.toString(response.getEntity().getContent(), "utf-8"));
655 }
656 } catch (Exception e) {
657 throw new IllegalStateException("Unable to parse workflow state mappings");
658 } finally {
659 closeConnection(response);
660 }
661 return new HashMap<>();
662 }
663 }