1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 package org.opencastproject.workflow.remote;
23
24 import static org.apache.http.HttpStatus.SC_CONFLICT;
25 import static org.apache.http.HttpStatus.SC_NOT_FOUND;
26 import static org.apache.http.HttpStatus.SC_NO_CONTENT;
27 import static org.apache.http.HttpStatus.SC_OK;
28 import static org.apache.http.HttpStatus.SC_UNAUTHORIZED;
29
30 import org.opencastproject.mediapackage.MediaPackage;
31 import org.opencastproject.mediapackage.MediaPackageParser;
32 import org.opencastproject.security.api.TrustedHttpClient;
33 import org.opencastproject.security.api.UnauthorizedException;
34 import org.opencastproject.serviceregistry.api.RemoteBase;
35 import org.opencastproject.serviceregistry.api.ServiceRegistry;
36 import org.opencastproject.util.NotFoundException;
37 import org.opencastproject.workflow.api.WorkflowDatabaseException;
38 import org.opencastproject.workflow.api.WorkflowDefinition;
39 import org.opencastproject.workflow.api.WorkflowException;
40 import org.opencastproject.workflow.api.WorkflowInstance;
41 import org.opencastproject.workflow.api.WorkflowInstance.WorkflowState;
42 import org.opencastproject.workflow.api.WorkflowListener;
43 import org.opencastproject.workflow.api.WorkflowService;
44 import org.opencastproject.workflow.api.XmlWorkflowParser;
45
46 import org.apache.commons.io.IOUtils;
47 import org.apache.http.HttpResponse;
48 import org.apache.http.HttpStatus;
49 import org.apache.http.NameValuePair;
50 import org.apache.http.ParseException;
51 import org.apache.http.client.entity.UrlEncodedFormEntity;
52 import org.apache.http.client.methods.HttpDelete;
53 import org.apache.http.client.methods.HttpGet;
54 import org.apache.http.client.methods.HttpPost;
55 import org.apache.http.client.utils.URLEncodedUtils;
56 import org.apache.http.message.BasicNameValuePair;
57 import org.apache.http.util.EntityUtils;
58 import org.json.simple.parser.JSONParser;
59 import org.osgi.service.component.annotations.Component;
60 import org.osgi.service.component.annotations.Reference;
61 import org.slf4j.Logger;
62 import org.slf4j.LoggerFactory;
63
64 import java.io.IOException;
65 import java.io.UnsupportedEncodingException;
66 import java.nio.charset.StandardCharsets;
67 import java.util.ArrayList;
68 import java.util.Collections;
69 import java.util.HashMap;
70 import java.util.List;
71 import java.util.Map;
72 import java.util.Map.Entry;
73 import java.util.Optional;
74
75
76
77
78 @Component(
79 property = {
80 "service.description=Workflow Remote Service Proxy"
81 },
82 immediate = true,
83 service = { WorkflowService.class }
84 )
85 public class WorkflowServiceRemoteImpl extends RemoteBase implements WorkflowService {
86
87
88 private static final Logger logger = LoggerFactory.getLogger(WorkflowServiceRemoteImpl.class);
89
90 public WorkflowServiceRemoteImpl() {
91 super(JOB_TYPE);
92 }
93
94
95
96
97
98
99 @Override
100 @Reference
101 public void setTrustedHttpClient(TrustedHttpClient client) {
102 super.setTrustedHttpClient(client);
103 }
104
105
106
107
108
109
110 @Override
111 @Reference
112 public void setRemoteServiceManager(ServiceRegistry remoteServiceManager) {
113 this.remoteServiceManager = remoteServiceManager;
114 }
115
116
117
118
119
120
121 @Override
122 public WorkflowDefinition getWorkflowDefinitionById(String id) throws WorkflowDatabaseException, NotFoundException {
123 HttpGet get = new HttpGet("/definition/" + id + ".xml");
124 HttpResponse response = getResponse(get, SC_NOT_FOUND, SC_OK);
125 try {
126 if (response != null) {
127 if (SC_NOT_FOUND == response.getStatusLine().getStatusCode()) {
128 throw new NotFoundException("Workflow definition " + id + " does not exist.");
129 } else {
130 return XmlWorkflowParser.parseWorkflowDefinition(response.getEntity().getContent());
131 }
132 }
133 } catch (NotFoundException e) {
134 throw e;
135 } catch (Exception e) {
136 throw new WorkflowDatabaseException(e);
137 } finally {
138 closeConnection(response);
139 }
140 throw new WorkflowDatabaseException("Unable to connect to a remote workflow service");
141 }
142
143
144
145
146
147
148 @Override
149 public WorkflowInstance getWorkflowById(long id) throws WorkflowDatabaseException, NotFoundException {
150 HttpGet get = new HttpGet("/instance/" + id + ".xml");
151 HttpResponse response = getResponse(get, SC_NOT_FOUND, SC_OK);
152 try {
153 if (response != null) {
154 if (SC_NOT_FOUND == response.getStatusLine().getStatusCode()) {
155 throw new NotFoundException("Workflow instance " + id + " does not exist.");
156 } else {
157 return XmlWorkflowParser.parseWorkflowInstance(response.getEntity().getContent());
158 }
159 }
160 } catch (NotFoundException e) {
161 throw e;
162 } catch (Exception e) {
163 throw new WorkflowDatabaseException(e);
164 } finally {
165 closeConnection(response);
166 }
167 throw new WorkflowDatabaseException("Unable to connect to a remote workflow service");
168 }
169
170 @Override
171 public List<WorkflowInstance> getWorkflowInstancesByMediaPackage(String mediaPackageId)
172 throws WorkflowDatabaseException {
173 HttpGet get = new HttpGet("/mediaPackage/" + mediaPackageId + "/instances.xml");
174 HttpResponse response = getResponse(get);
175 try {
176 if (response != null) {
177 return XmlWorkflowParser.parseWorkflowSet(response.getEntity().getContent()).getItems();
178 }
179 } catch (Exception e) {
180 throw new WorkflowDatabaseException(e);
181 } finally {
182 closeConnection(response);
183 }
184 throw new WorkflowDatabaseException("Workflow instances can not be loaded from a remote workflow service");
185 }
186
187 @Override
188 public Optional<WorkflowInstance> getRunningWorkflowInstanceByMediaPackage(String mediaPackageId, String action)
189 throws WorkflowException {
190
191 HttpGet get = new HttpGet("/mediaPackage/" + mediaPackageId + "/instances.xml");
192 HttpResponse response = getResponse(get, SC_NOT_FOUND, SC_OK);
193 try {
194 if (response != null) {
195 if (SC_NOT_FOUND == response.getStatusLine().getStatusCode()) {
196 return Optional.empty();
197 }
198 return Optional.ofNullable(
199 XmlWorkflowParser.parseWorkflowInstance(response.getEntity().getContent())
200 );
201 }
202 } catch (Exception e) {
203 throw new WorkflowDatabaseException(e);
204 } finally {
205 closeConnection(response);
206 }
207 throw new WorkflowDatabaseException("Workflow instances can not be loaded from a remote workflow service");
208 }
209
210 @Override
211 public boolean mediaPackageHasActiveWorkflows(String mediaPackageId) throws WorkflowDatabaseException {
212 HttpGet get = new HttpGet("/mediaPackage/" + mediaPackageId + "/hasActiveWorkflows");
213 HttpResponse response = getResponse(get);
214 try {
215 if (response != null) {
216 return Boolean.parseBoolean(response.getEntity().getContent().toString());
217 }
218 } catch (Exception e) {
219 throw new WorkflowDatabaseException(e);
220 } finally {
221 closeConnection(response);
222 }
223 throw new WorkflowDatabaseException("Workflow instances can not be loaded from a remote workflow service");
224 }
225
226 @Override
227 public boolean userHasActiveWorkflows(String userId) throws WorkflowDatabaseException {
228 HttpGet get = new HttpGet("/user/" + userId + "/hasActiveWorkflows");
229 HttpResponse response = getResponse(get);
230 try {
231 if (response != null) {
232 return Boolean.parseBoolean(response.getEntity().getContent().toString());
233 }
234 } catch (Exception e) {
235 throw new WorkflowDatabaseException(e);
236 } finally {
237 closeConnection(response);
238 }
239 throw new WorkflowDatabaseException("Workflow instances can not be loaded from a remote workflow service");
240 }
241
242
243
244
245
246
247
248 @Override
249 public WorkflowInstance start(WorkflowDefinition workflowDefinition, MediaPackage mediaPackage,
250 Map<String, String> properties) throws WorkflowDatabaseException {
251 try {
252 return start(workflowDefinition, mediaPackage, null, properties);
253 } catch (NotFoundException e) {
254 throw new IllegalStateException("A null parent workflow id should never result in a not found exception ", e);
255 }
256 }
257
258
259
260
261
262
263
264
265
266 private String mapToString(Map<String, String> props) {
267 StringBuilder sb = new StringBuilder();
268 for (Entry<String, String> entry : props.entrySet()) {
269 sb.append(entry.getKey());
270 sb.append("=");
271 sb.append(entry.getValue());
272 sb.append("\n");
273 }
274 return sb.toString();
275 }
276
277
278
279
280
281
282
283 @Override
284 public WorkflowInstance start(WorkflowDefinition workflowDefinition, MediaPackage mediaPackage,
285 Long parentWorkflowId, Map<String, String> properties) throws WorkflowDatabaseException, NotFoundException {
286 HttpPost post = new HttpPost("/start");
287 try {
288 List<BasicNameValuePair> params = new ArrayList<>();
289 if (workflowDefinition != null) {
290 params.add(new BasicNameValuePair("definition", XmlWorkflowParser.toXml(workflowDefinition)));
291 }
292 params.add(new BasicNameValuePair("mediapackage", MediaPackageParser.getAsXml(mediaPackage)));
293 if (parentWorkflowId != null) {
294 params.add(new BasicNameValuePair("parent", parentWorkflowId.toString()));
295 }
296 if (properties != null) {
297 params.add(new BasicNameValuePair("properties", mapToString(properties)));
298 }
299 post.setEntity(new UrlEncodedFormEntity(params, "UTF-8"));
300 } catch (Exception e) {
301 throw new IllegalStateException("Unable to assemble a remote workflow request", e);
302 }
303 HttpResponse response = getResponse(post, SC_NOT_FOUND, SC_OK);
304 try {
305 if (response != null) {
306 if (SC_NOT_FOUND == response.getStatusLine().getStatusCode()) {
307 throw new NotFoundException("Workflow instance " + parentWorkflowId + " does not exist.");
308 } else {
309 return XmlWorkflowParser.parseWorkflowInstance(response.getEntity().getContent());
310 }
311 }
312 } catch (NotFoundException e) {
313 throw e;
314 } catch (Exception e) {
315 throw new WorkflowDatabaseException("Unable to build a workflow from xml", e);
316 } finally {
317 closeConnection(response);
318 }
319 throw new WorkflowDatabaseException("Unable to start a remote workflow. The http response code was unexpected.");
320 }
321
322
323
324
325
326
327
328 @Override
329 public WorkflowInstance start(WorkflowDefinition workflowDefinition, MediaPackage mediaPackage)
330 throws WorkflowDatabaseException {
331 try {
332 return start(workflowDefinition, mediaPackage, null, null);
333 } catch (NotFoundException e) {
334 throw new IllegalStateException("A null parent workflow id should never result in a not found exception ", e);
335 }
336 }
337
338
339
340
341
342
343 @Override
344 public long countWorkflowInstances() throws WorkflowDatabaseException {
345 return countWorkflowInstances(null);
346 }
347
348
349
350
351
352
353
354 @Override
355 public long countWorkflowInstances(WorkflowState state) throws WorkflowDatabaseException {
356 List<NameValuePair> queryStringParams = new ArrayList<>();
357 if (state != null) {
358 queryStringParams.add(new BasicNameValuePair("state", state.toString()));
359 }
360
361 StringBuilder url = new StringBuilder("/count");
362 if (queryStringParams.size() > 0) {
363 url.append("?");
364 url.append(URLEncodedUtils.format(queryStringParams, "UTF-8"));
365 }
366
367 HttpGet get = new HttpGet(url.toString());
368 HttpResponse response = getResponse(get);
369 try {
370 if (response != null) {
371 String body = null;
372 try {
373 body = EntityUtils.toString(response.getEntity());
374 return Long.parseLong(body);
375 } catch (NumberFormatException e) {
376 throw new WorkflowDatabaseException("Unable to parse the response body as a long: " + body);
377 }
378 }
379 } catch (ParseException | IOException e) {
380 throw new WorkflowDatabaseException("Unable to parse the response body");
381 } finally {
382 closeConnection(response);
383 }
384
385 throw new WorkflowDatabaseException("Unable to count workflow instances");
386 }
387
388
389
390
391
392
393 @Override
394 public WorkflowInstance stop(long workflowInstanceId) throws WorkflowDatabaseException, NotFoundException {
395 HttpPost post = new HttpPost("/stop");
396 List<BasicNameValuePair> params = new ArrayList<>();
397 params.add(new BasicNameValuePair("id", Long.toString(workflowInstanceId)));
398 try {
399 post.setEntity(new UrlEncodedFormEntity(params, "UTF-8"));
400 } catch (UnsupportedEncodingException e) {
401 throw new IllegalStateException("Unable to assemble a remote workflow service request", e);
402 }
403 HttpResponse response = getResponse(post, SC_OK, SC_NOT_FOUND);
404 try {
405 if (response != null) {
406 if (response.getStatusLine().getStatusCode() == SC_NOT_FOUND) {
407 throw new NotFoundException("Workflow instance with id='" + workflowInstanceId + "' not found");
408 } else {
409 logger.info("Workflow '{}' stopped", workflowInstanceId);
410 return XmlWorkflowParser.parseWorkflowInstance(response.getEntity().getContent());
411 }
412 }
413 } catch (NotFoundException e) {
414 throw e;
415 } catch (Exception e) {
416 throw new WorkflowDatabaseException(e);
417 } finally {
418 closeConnection(response);
419 }
420 throw new WorkflowDatabaseException("Unable to stop workflow instance " + workflowInstanceId);
421 }
422
423
424
425
426
427
428 @Override
429 public WorkflowInstance suspend(long workflowInstanceId) throws WorkflowDatabaseException, NotFoundException {
430 HttpPost post = new HttpPost("/suspend");
431 List<BasicNameValuePair> params = new ArrayList<>();
432 params.add(new BasicNameValuePair("id", Long.toString(workflowInstanceId)));
433 try {
434 post.setEntity(new UrlEncodedFormEntity(params, "UTF-8"));
435 } catch (UnsupportedEncodingException e) {
436 throw new IllegalStateException("Unable to assemble a remote workflow service request", e);
437 }
438 HttpResponse response = getResponse(post, SC_OK, SC_NOT_FOUND);
439 try {
440 if (response != null) {
441 if (response.getStatusLine().getStatusCode() == SC_NOT_FOUND) {
442 throw new NotFoundException("Workflow instance with id='" + workflowInstanceId + "' not found");
443 } else {
444 logger.info("Workflow '{}' suspended", workflowInstanceId);
445 return XmlWorkflowParser.parseWorkflowInstance(response.getEntity().getContent());
446 }
447 }
448 } catch (NotFoundException e) {
449 throw e;
450 } catch (Exception e) {
451 throw new WorkflowDatabaseException(e);
452 } finally {
453 closeConnection(response);
454 }
455 throw new WorkflowDatabaseException("Unable to suspend workflow instance " + workflowInstanceId);
456 }
457
458
459
460
461
462
463 @Override
464 public WorkflowInstance resume(long workflowInstanceId) throws NotFoundException, UnauthorizedException,
465 WorkflowException, IllegalStateException {
466 return resume(workflowInstanceId, null);
467 }
468
469
470
471
472
473
474 @Override
475 public WorkflowInstance resume(long workflowInstanceId, Map<String, String> properties) throws NotFoundException,
476 UnauthorizedException, WorkflowException, IllegalStateException {
477 HttpPost post = new HttpPost("/resume");
478 List<BasicNameValuePair> params = new ArrayList<>();
479 params.add(new BasicNameValuePair("id", Long.toString(workflowInstanceId)));
480 if (properties != null) {
481 params.add(new BasicNameValuePair("properties", mapToString(properties)));
482 }
483 post.setEntity(new UrlEncodedFormEntity(params, StandardCharsets.UTF_8));
484 HttpResponse response = getResponse(post, SC_OK, SC_NOT_FOUND, SC_UNAUTHORIZED, SC_CONFLICT);
485 try {
486 if (response != null) {
487 if (response.getStatusLine().getStatusCode() == SC_NOT_FOUND) {
488 throw new NotFoundException("Workflow instance with id='" + workflowInstanceId + "' not found");
489 } else if (response.getStatusLine().getStatusCode() == SC_UNAUTHORIZED) {
490 throw new UnauthorizedException("You do not have permission to resume");
491 } else if (response.getStatusLine().getStatusCode() == SC_CONFLICT) {
492 throw new IllegalStateException("Can not resume a workflow where the current state is not in paused");
493 } else {
494 logger.info("Workflow '{}' resumed", workflowInstanceId);
495 return XmlWorkflowParser.parseWorkflowInstance(response.getEntity().getContent());
496 }
497 }
498 } catch (NotFoundException | UnauthorizedException | IllegalStateException e) {
499 throw e;
500 } catch (Exception e) {
501 throw new WorkflowException(e);
502 } finally {
503 closeConnection(response);
504 }
505 throw new WorkflowException("Unable to resume workflow instance " + workflowInstanceId);
506 }
507
508
509
510
511
512
513 @Override
514 public void update(WorkflowInstance workflowInstance) throws WorkflowDatabaseException {
515 HttpPost post = new HttpPost("/update");
516 try {
517 List<BasicNameValuePair> params = new ArrayList<>();
518 params.add(new BasicNameValuePair("workflow", XmlWorkflowParser.toXml(workflowInstance)));
519 post.setEntity(new UrlEncodedFormEntity(params, "UTF-8"));
520 } catch (UnsupportedEncodingException e) {
521 throw new IllegalStateException("Unable to assemble a remote workflow service request", e);
522 } catch (Exception e) {
523 throw new IllegalStateException("unable to serialize workflow instance to xml");
524 }
525
526 HttpResponse response = getResponse(post, SC_NO_CONTENT);
527 try {
528 if (response != null) {
529 logger.info("Workflow '{}' updated", workflowInstance);
530 return;
531 }
532 } finally {
533 closeConnection(response);
534 }
535 throw new WorkflowDatabaseException("Unable to update workflow instance " + workflowInstance.getId());
536 }
537
538
539
540
541
542
543 @Override
544 public void remove(long workflowInstanceId) throws WorkflowDatabaseException, NotFoundException {
545 remove(workflowInstanceId, false);
546 }
547
548
549
550
551
552
553 @Override
554 public void remove(long workflowInstanceId, boolean force) throws WorkflowDatabaseException, NotFoundException {
555 String deleteString = "/remove/" + Long.toString(workflowInstanceId);
556
557 if (force) {
558 List<NameValuePair> queryStringParams = new ArrayList<NameValuePair>();
559 queryStringParams.add(new BasicNameValuePair("force", "true"));
560 deleteString = deleteString + "?" + URLEncodedUtils.format(queryStringParams, "UTF_8");
561 }
562
563 HttpDelete delete = new HttpDelete(deleteString);
564
565 HttpResponse response = getResponse(delete, SC_NO_CONTENT, SC_NOT_FOUND);
566 try {
567 if (response != null) {
568 if (SC_NOT_FOUND == response.getStatusLine().getStatusCode()) {
569 throw new NotFoundException("Workflow id not found: " + workflowInstanceId);
570 } else {
571 logger.info("Workflow '{}' removed", workflowInstanceId);
572 return;
573 }
574 }
575 } finally {
576 closeConnection(response);
577 }
578 throw new WorkflowDatabaseException("Unable to remove workflow instance " + workflowInstanceId);
579 }
580
581
582
583
584
585
586 @Override
587 public List<WorkflowDefinition> listAvailableWorkflowDefinitions() throws WorkflowDatabaseException {
588 HttpGet get = new HttpGet("/definitions.xml");
589 HttpResponse response = getResponse(get);
590 try {
591 if (response != null) {
592 List<WorkflowDefinition> list = XmlWorkflowParser.parseWorkflowDefinitions(response.getEntity().getContent());
593 Collections.sort(list);
594 return list;
595 }
596 } catch (Exception e) {
597 throw new IllegalStateException("Unable to parse workflow definitions");
598 } finally {
599 closeConnection(response);
600 }
601 throw new WorkflowDatabaseException(
602 "Unable to read the registered workflow definitions from the remote workflow service");
603 }
604
605
606
607
608
609
610
611 @Override
612 public void addWorkflowListener(WorkflowListener listener) {
613 throw new UnsupportedOperationException("Adding workflow listeners to a remote workflow service is not supported");
614 }
615
616
617
618
619
620
621
622 @Override
623 public void removeWorkflowListener(WorkflowListener listener) {
624 throw new UnsupportedOperationException(
625 "Removing workflow listeners from a remote workflow service is not supported");
626 }
627
628 @Override
629 public void cleanupWorkflowInstances(int lifetime, WorkflowState state) throws WorkflowDatabaseException,
630 UnauthorizedException {
631 HttpPost post = new HttpPost("/cleanup");
632
633 List<BasicNameValuePair> params = new ArrayList<>();
634 params.add(new BasicNameValuePair("lifetime", String.valueOf(lifetime)));
635 if (state != null) {
636 params.add(new BasicNameValuePair("state", state.toString()));
637 }
638 try {
639 post.setEntity(new UrlEncodedFormEntity(params, "UTF-8"));
640 } catch (UnsupportedEncodingException e) {
641 throw new IllegalStateException("Unable to assemble a remote workflow service request", e);
642 }
643
644 HttpResponse response = getResponse(post, SC_OK, HttpStatus.SC_UNAUTHORIZED);
645 try {
646 if (response != null) {
647 if (HttpStatus.SC_UNAUTHORIZED == response.getStatusLine().getStatusCode()) {
648 throw new UnauthorizedException("You do not have permission to cleanup");
649 } else {
650 logger.info("Successful request to workflow cleanup endpoint");
651 return;
652 }
653 }
654 } finally {
655 closeConnection(response);
656 }
657 throw new WorkflowDatabaseException("Unable to successfully request the workflow cleanup endpoint");
658 }
659
660 @Override
661 public Map<String, Map<String, String>> getWorkflowStateMappings() {
662 HttpGet get = new HttpGet("/statemappings.json");
663 HttpResponse response = getResponse(get);
664 try {
665 if (response != null) {
666 return (Map<String, Map<String, String>>) new JSONParser().parse(
667 IOUtils.toString(response.getEntity().getContent(), "utf-8"));
668 }
669 } catch (Exception e) {
670 throw new IllegalStateException("Unable to parse workflow state mappings");
671 } finally {
672 closeConnection(response);
673 }
674 return new HashMap<>();
675 }
676 }