1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21 package org.opencastproject.external.endpoint;
22
23 import static com.entwinemedia.fn.data.json.Jsons.BLANK;
24 import static com.entwinemedia.fn.data.json.Jsons.ZERO;
25 import static com.entwinemedia.fn.data.json.Jsons.arr;
26 import static com.entwinemedia.fn.data.json.Jsons.f;
27 import static com.entwinemedia.fn.data.json.Jsons.obj;
28 import static com.entwinemedia.fn.data.json.Jsons.v;
29 import static java.time.ZoneOffset.UTC;
30 import static org.apache.commons.lang3.StringUtils.isBlank;
31 import static org.apache.commons.lang3.StringUtils.isNoneBlank;
32 import static org.opencastproject.util.RestUtil.getEndpointUrl;
33 import static org.opencastproject.util.doc.rest.RestParameter.Type.BOOLEAN;
34 import static org.opencastproject.util.doc.rest.RestParameter.Type.INTEGER;
35 import static org.opencastproject.util.doc.rest.RestParameter.Type.STRING;
36
37 import org.opencastproject.elasticsearch.index.ElasticsearchIndex;
38 import org.opencastproject.elasticsearch.index.objects.event.Event;
39 import org.opencastproject.external.common.ApiMediaType;
40 import org.opencastproject.external.common.ApiResponseBuilder;
41 import org.opencastproject.external.common.ApiVersion;
42 import org.opencastproject.index.service.api.IndexService;
43 import org.opencastproject.mediapackage.MediaPackage;
44 import org.opencastproject.rest.RestConstants;
45 import org.opencastproject.security.api.UnauthorizedException;
46 import org.opencastproject.systems.OpencastConstants;
47 import org.opencastproject.util.NotFoundException;
48 import org.opencastproject.util.RestUtil;
49 import org.opencastproject.util.UrlSupport;
50 import org.opencastproject.util.data.Tuple;
51 import org.opencastproject.util.doc.rest.RestParameter;
52 import org.opencastproject.util.doc.rest.RestQuery;
53 import org.opencastproject.util.doc.rest.RestResponse;
54 import org.opencastproject.util.doc.rest.RestService;
55 import org.opencastproject.workflow.api.RetryStrategy;
56 import org.opencastproject.workflow.api.WorkflowDefinition;
57 import org.opencastproject.workflow.api.WorkflowInstance;
58 import org.opencastproject.workflow.api.WorkflowOperationInstance;
59 import org.opencastproject.workflow.api.WorkflowService;
60 import org.opencastproject.workflow.api.WorkflowStateException;
61
62 import com.entwinemedia.fn.data.Opt;
63 import com.entwinemedia.fn.data.json.Field;
64 import com.entwinemedia.fn.data.json.JValue;
65
66 import org.json.simple.JSONObject;
67 import org.json.simple.parser.JSONParser;
68 import org.json.simple.parser.ParseException;
69 import org.osgi.service.component.ComponentContext;
70 import org.osgi.service.component.annotations.Activate;
71 import org.osgi.service.component.annotations.Component;
72 import org.osgi.service.component.annotations.Reference;
73 import org.osgi.service.jaxrs.whiteboard.propertytypes.JaxrsResource;
74 import org.slf4j.Logger;
75 import org.slf4j.LoggerFactory;
76
import java.net.URI;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.stream.Collectors;
84
85 import javax.servlet.http.HttpServletResponse;
86 import javax.ws.rs.DELETE;
87 import javax.ws.rs.FormParam;
88 import javax.ws.rs.GET;
89 import javax.ws.rs.HeaderParam;
90 import javax.ws.rs.POST;
91 import javax.ws.rs.PUT;
92 import javax.ws.rs.Path;
93 import javax.ws.rs.PathParam;
94 import javax.ws.rs.Produces;
95 import javax.ws.rs.QueryParam;
96 import javax.ws.rs.core.Response;
97
98 @Path("/api/workflows")
99 @Produces({ ApiMediaType.JSON, ApiMediaType.VERSION_1_1_0, ApiMediaType.VERSION_1_2_0, ApiMediaType.VERSION_1_3_0,
100 ApiMediaType.VERSION_1_4_0, ApiMediaType.VERSION_1_5_0, ApiMediaType.VERSION_1_6_0,
101 ApiMediaType.VERSION_1_7_0, ApiMediaType.VERSION_1_8_0, ApiMediaType.VERSION_1_9_0,
102 ApiMediaType.VERSION_1_10_0, ApiMediaType.VERSION_1_11_0 })
103 @RestService(name = "externalapiworkflowinstances", title = "External API Workflow Instances Service", notes = {},
104 abstractText = "Provides resources and operations related to the workflow instances")
105 @Component(
106 immediate = true,
107 service = WorkflowsEndpoint.class,
108 property = {
109 "service.description=External API - Workflow Instances Endpoint",
110 "opencast.service.type=org.opencastproject.external.workflows.instances",
111 "opencast.service.path=/api/workflows"
112 }
113 )
114 @JaxrsResource
115 public class WorkflowsEndpoint {
116
117 private static final Logger logger = LoggerFactory.getLogger(WorkflowsEndpoint.class);
118
119
120 protected String endpointBaseUrl;
121
122
123 private WorkflowService workflowService;
124 private ElasticsearchIndex elasticsearchIndex;
125 private IndexService indexService;
126
127
128 @Reference
129 public void setWorkflowService(WorkflowService workflowService) {
130 this.workflowService = workflowService;
131 }
132
133
134 @Reference
135 public void setElasticsearchIndex(ElasticsearchIndex elasticsearchIndex) {
136 this.elasticsearchIndex = elasticsearchIndex;
137 }
138
139
140 @Reference
141 public void setIndexService(IndexService indexService) {
142 this.indexService = indexService;
143 }
144
145
146
147
148 @Activate
149 void activate(ComponentContext cc) {
150 logger.info("Activating External API - Workflow Instances Endpoint");
151
152 final Tuple<String, String> endpointUrl = getEndpointUrl(cc, OpencastConstants.EXTERNAL_API_URL_ORG_PROPERTY,
153 RestConstants.SERVICE_PATH_PROPERTY);
154 endpointBaseUrl = UrlSupport.concat(endpointUrl.getA(), endpointUrl.getB());
155 logger.debug("Configured service endpoint is {}", endpointBaseUrl);
156 }
157
158 @POST
159 @Path("")
160 @RestQuery(name = "createworkflowinstance", description = "Creates a workflow instance.", returnDescription = "", restParameters = {
161 @RestParameter(name = "event_identifier", description = "The event identifier this workflow should run against", isRequired = true, type = STRING),
162 @RestParameter(name = "workflow_definition_identifier", description = "The identifier of the workflow definition to use", isRequired = true, type = STRING),
163 @RestParameter(name = "configuration", description = "The optional configuration for this workflow", isRequired = false, type = STRING),
164 @RestParameter(name = "withoperations", description = "Whether the workflow operations should be included in the response", isRequired = false, type = BOOLEAN),
165 @RestParameter(name = "withconfiguration", description = "Whether the workflow configuration should be included in the response", isRequired = false, type = BOOLEAN), }, responses = {
166 @RestResponse(description = "A new workflow is created and its identifier is returned in the Location header.", responseCode = HttpServletResponse.SC_CREATED),
167 @RestResponse(description = "The request is invalid or inconsistent.", responseCode = HttpServletResponse.SC_BAD_REQUEST),
168 @RestResponse(description = "The event or workflow definition could not be found.", responseCode = HttpServletResponse.SC_NOT_FOUND) })
169 public Response createWorkflowInstance(@HeaderParam("Accept") String acceptHeader,
170 @FormParam("event_identifier") String eventId,
171 @FormParam("workflow_definition_identifier") String workflowDefinitionIdentifier,
172 @FormParam("configuration") String configuration, @QueryParam("withoperations") boolean withOperations,
173 @QueryParam("withconfiguration") boolean withConfiguration) {
174 if (isBlank(eventId)) {
175 return RestUtil.R.badRequest("Required parameter 'event_identifier' is missing or invalid");
176 }
177
178 if (isBlank(workflowDefinitionIdentifier)) {
179 return RestUtil.R.badRequest("Required parameter 'workflow_definition_identifier' is missing or invalid");
180 }
181
182 try {
183
184 Opt<Event> event = indexService.getEvent(eventId, elasticsearchIndex);
185 if (event.isNone()) {
186 return ApiResponseBuilder.notFound("Cannot find an event with id '%s'.", eventId);
187 }
188 MediaPackage mp = indexService.getEventMediapackage(event.get());
189
190
191 WorkflowDefinition wd;
192 try {
193 wd = workflowService.getWorkflowDefinitionById(workflowDefinitionIdentifier);
194 } catch (NotFoundException e) {
195 return ApiResponseBuilder.notFound("Cannot find a workflow definition with id '%s'.", workflowDefinitionIdentifier);
196 }
197
198
199 Map<String, String> properties = new HashMap<>();
200 if (isNoneBlank(configuration)) {
201 JSONParser parser = new JSONParser();
202 try {
203 properties.putAll((JSONObject) parser.parse(configuration));
204 } catch (ParseException e) {
205 return RestUtil.R.badRequest("Passed parameter 'configuration' is invalid JSON.");
206 }
207 }
208
209
210 WorkflowInstance wi = workflowService.start(wd, mp, null, properties);
211 return ApiResponseBuilder.Json.created(acceptHeader, URI.create(getWorkflowUrl(wi.getId())),
212 workflowInstanceToJSON(wi, withOperations, withConfiguration));
213 } catch (IllegalStateException e) {
214 final ApiVersion requestedVersion = ApiMediaType.parse(acceptHeader).getVersion();
215 return ApiResponseBuilder.Json.conflict(requestedVersion, obj(f("message", v(e.getMessage(), BLANK))));
216 } catch (Exception e) {
217 logger.error("Could not create workflow instances", e);
218 return ApiResponseBuilder.serverError("Could not create workflow instances, reason: '%s'", e.getMessage());
219 }
220 }
221
222 @GET
223 @Path("{workflowInstanceId}")
224 @RestQuery(name = "getworkflowinstance", description = "Returns a single workflow instance.", returnDescription = "", pathParameters = {
225 @RestParameter(name = "workflowInstanceId", description = "The workflow instance id", isRequired = true, type = INTEGER) }, restParameters = {
226 @RestParameter(name = "withoperations", description = "Whether the workflow operations should be included in the response", isRequired = false, type = BOOLEAN),
227 @RestParameter(name = "withconfiguration", description = "Whether the workflow configuration should be included in the response", isRequired = false, type = BOOLEAN) }, responses = {
228 @RestResponse(description = "The workflow instance is returned.", responseCode = HttpServletResponse.SC_OK),
229 @RestResponse(description = "The user doesn't have the rights to make this request.", responseCode = HttpServletResponse.SC_FORBIDDEN),
230 @RestResponse(description = "The specified workflow instance does not exist.", responseCode = HttpServletResponse.SC_NOT_FOUND) })
231 public Response getWorkflowInstance(@HeaderParam("Accept") String acceptHeader,
232 @PathParam("workflowInstanceId") Long id, @QueryParam("withoperations") boolean withOperations,
233 @QueryParam("withconfiguration") boolean withConfiguration) {
234 WorkflowInstance wi;
235 try {
236 wi = workflowService.getWorkflowById(id);
237 } catch (NotFoundException e) {
238 return ApiResponseBuilder.notFound("Cannot find workflow instance with id '%d'.", id);
239 } catch (UnauthorizedException e) {
240 return Response.status(Response.Status.FORBIDDEN).build();
241 } catch (Exception e) {
242 logger.error("The workflow service was not able to get the workflow instance", e);
243 return ApiResponseBuilder.serverError("Could not retrieve workflow instance, reason: '%s'", e.getMessage());
244 }
245
246 return ApiResponseBuilder.Json.ok(acceptHeader, workflowInstanceToJSON(wi, withOperations, withConfiguration));
247 }
248
249 @PUT
250 @Path("{workflowInstanceId}")
251 @RestQuery(name = "updateworkflowinstance", description = "Creates a workflow instance.", returnDescription = "", pathParameters = {
252 @RestParameter(name = "workflowInstanceId", description = "The workflow instance id", isRequired = true, type = INTEGER) }, restParameters = {
253 @RestParameter(name = "configuration", description = "The optional configuration for this workflow", isRequired = false, type = STRING),
254 @RestParameter(name = "state", description = "The optional state transition for this workflow", isRequired = false, type = STRING),
255 @RestParameter(name = "withoperations", description = "Whether the workflow operations should be included in the response", isRequired = false, type = BOOLEAN),
256 @RestParameter(name = "withconfiguration", description = "Whether the workflow configuration should be included in the response", isRequired = false, type = BOOLEAN), }, responses = {
257 @RestResponse(description = "The workflow instance is updated.", responseCode = HttpServletResponse.SC_OK),
258 @RestResponse(description = "The request is invalid or inconsistent.", responseCode = HttpServletResponse.SC_BAD_REQUEST),
259 @RestResponse(description = "The user doesn't have the rights to make this request.", responseCode = HttpServletResponse.SC_FORBIDDEN),
260 @RestResponse(description = "The workflow instance could not be found.", responseCode = HttpServletResponse.SC_NOT_FOUND),
261 @RestResponse(description = "The workflow instance cannot transition to this state.", responseCode = HttpServletResponse.SC_CONFLICT) })
262 public Response updateWorkflowInstance(@HeaderParam("Accept") String acceptHeader,
263 @PathParam("workflowInstanceId") Long id, @FormParam("configuration") String configuration,
264 @FormParam("state") String stateStr, @QueryParam("withoperations") boolean withOperations,
265 @QueryParam("withconfiguration") boolean withConfiguration) {
266 try {
267 boolean changed = false;
268 WorkflowInstance wi = workflowService.getWorkflowById(id);
269
270
271 if (isNoneBlank(configuration)) {
272 JSONParser parser = new JSONParser();
273 try {
274 Map<String, String> properties = new HashMap<>((JSONObject) parser.parse(configuration));
275
276
277 wi.getConfigurationKeys().forEach(wi::removeConfiguration);
278
279 properties.forEach(wi::setConfiguration);
280
281 changed = true;
282 } catch (ParseException e) {
283 return RestUtil.R.badRequest("Passed parameter 'configuration' is invalid JSON.");
284 }
285 }
286
287
288
289 if (changed) {
290 workflowService.update(wi);
291 }
292
293
294 if (isNoneBlank(stateStr)) {
295 WorkflowInstance.WorkflowState state;
296 try {
297 state = jsonToEnum(WorkflowInstance.WorkflowState.class, stateStr);
298 } catch (IllegalArgumentException e) {
299 return RestUtil.R.badRequest(String.format("Invalid workflow state '%s'", stateStr));
300 }
301
302 WorkflowInstance.WorkflowState currentState = wi.getState();
303 if (state != currentState) {
304
305
306
307
308
309
310
311
312
313 switch (state) {
314 case PAUSED:
315 workflowService.suspend(wi.getId());
316 break;
317 case STOPPED:
318 workflowService.stop(wi.getId());
319 break;
320 case RUNNING:
321 if (currentState == WorkflowInstance.WorkflowState.INSTANTIATED
322 || currentState == WorkflowInstance.WorkflowState.PAUSED) {
323 workflowService.resume(wi.getId());
324 } else {
325 return RestUtil.R.conflict(
326 String.format("Cannot resume from workflow state '%s'", currentState.toString().toLowerCase()));
327 }
328 break;
329 default:
330 return RestUtil.R.conflict(
331 String.format("Cannot transition state from '%s' to '%s'", currentState.toString().toLowerCase(),
332 stateStr));
333 }
334 }
335 }
336
337 wi = workflowService.getWorkflowById(id);
338 return ApiResponseBuilder.Json.ok(acceptHeader, workflowInstanceToJSON(wi, withOperations, withConfiguration));
339 } catch (NotFoundException e) {
340 return ApiResponseBuilder.notFound("Cannot find workflow instance with id '%d'.", id);
341 } catch (UnauthorizedException e) {
342 return Response.status(Response.Status.FORBIDDEN).build();
343 } catch (Exception e) {
344 logger.error("The workflow service was not able to get the workflow instance", e);
345 return ApiResponseBuilder.serverError("Could not retrieve workflow instance, reason: '%s'", e.getMessage());
346 }
347 }
348
349 @DELETE
350 @Path("{workflowInstanceId}")
351 @RestQuery(name = "deleteworkflowinstance", description = "Deletes a workflow instance.", returnDescription = "", pathParameters = {
352 @RestParameter(name = "workflowInstanceId", description = "The workflow instance id", isRequired = true, type = INTEGER) }, responses = {
353 @RestResponse(description = "The workflow instance has been deleted.", responseCode = HttpServletResponse.SC_NO_CONTENT),
354 @RestResponse(description = "The user doesn't have the rights to make this request.", responseCode = HttpServletResponse.SC_FORBIDDEN),
355 @RestResponse(description = "The specified workflow instance does not exist.", responseCode = HttpServletResponse.SC_NOT_FOUND),
356 @RestResponse(description = "The workflow instance cannot be deleted in this state.", responseCode = HttpServletResponse.SC_CONFLICT) })
357 public Response deleteWorkflowInstance(@HeaderParam("Accept") String acceptHeader,
358 @PathParam("workflowInstanceId") Long id) {
359 try {
360 workflowService.remove(id);
361 } catch (WorkflowStateException e) {
362 return RestUtil.R.conflict("Cannot delete workflow instance in this workflow state");
363 } catch (NotFoundException e) {
364 return ApiResponseBuilder.notFound("Cannot find workflow instance with id '%d'.", id);
365 } catch (UnauthorizedException e) {
366 return Response.status(Response.Status.FORBIDDEN).build();
367 } catch (Exception e) {
368 logger.error("Could not delete workflow instances", e);
369 return ApiResponseBuilder.serverError("Could not delete workflow instances, reason: '%s'", e.getMessage());
370 }
371
372 return Response.noContent().build();
373 }
374
375 private JValue workflowInstanceToJSON(WorkflowInstance wi, boolean withOperations, boolean withConfiguration) {
376 List<Field> fields = new ArrayList<>();
377
378 fields.add(f("identifier", v(wi.getId())));
379 fields.add(f("title", v(wi.getTitle(), BLANK)));
380 fields.add(f("description", v(wi.getDescription(), BLANK)));
381 fields.add(f("workflow_definition_identifier", v(wi.getTemplate(), BLANK)));
382 fields.add(f("event_identifier", v(wi.getMediaPackage().getIdentifier().toString())));
383 fields.add(f("creator", v(wi.getCreatorName())));
384 fields.add(f("state", enumToJSON(wi.getState())));
385 if (withOperations) {
386 fields.add(f("operations", arr(wi.getOperations()
387 .stream()
388 .map(this::workflowOperationInstanceToJSON)
389 .collect(Collectors.toList()))));
390 }
391 if (withConfiguration) {
392 fields.add(f("configuration", obj(wi.getConfigurationKeys()
393 .stream()
394 .map(key -> f(key, wi.getConfiguration(key)))
395 .collect(Collectors.toList()))));
396 }
397
398 return obj(fields);
399 }
400
401 private JValue workflowOperationInstanceToJSON(WorkflowOperationInstance woi) {
402 List<Field> fields = new ArrayList<>();
403 DateTimeFormatter dateFormatter = DateTimeFormatter.ISO_DATE_TIME;
404
405
406 fields.add(f("identifier", v(woi.getId(), BLANK)));
407 fields.add(f("operation", v(woi.getTemplate())));
408 fields.add(f("description", v(woi.getDescription(), BLANK)));
409 fields.add(f("state", enumToJSON(woi.getState())));
410 fields.add(f("time_in_queue", v(woi.getTimeInQueue(), ZERO)));
411 fields.add(f("host", v(woi.getExecutionHost(), BLANK)));
412 fields.add(f("if", v(woi.getExecutionCondition(), BLANK)));
413 fields.add(f("fail_workflow_on_error", v(woi.isFailOnError())));
414 fields.add(f("error_handler_workflow", v(woi.getExceptionHandlingWorkflow(), BLANK)));
415 fields.add(f("retry_strategy", v(new RetryStrategy.Adapter().marshal(woi.getRetryStrategy()), BLANK)));
416 fields.add(f("max_attempts", v(woi.getMaxAttempts())));
417 fields.add(f("failed_attempts", v(woi.getFailedAttempts())));
418 fields.add(f("configuration", obj(woi.getConfigurationKeys()
419 .stream()
420 .map(key -> f(key, woi.getConfiguration(key)))
421 .collect(Collectors.toList()))));
422 if (woi.getDateStarted() != null) {
423 fields.add(f("start", v(dateFormatter.format(woi.getDateStarted().toInstant().atZone(UTC)))));
424 } else {
425 fields.add(f("start", BLANK));
426 }
427 if (woi.getDateCompleted() != null) {
428 fields.add(f("completion", v(dateFormatter.format(woi.getDateCompleted().toInstant().atZone(UTC)))));
429 } else {
430 fields.add(f("completion", BLANK));
431 }
432
433 return obj(fields);
434 }
435
436 private JValue enumToJSON(Enum e) {
437 return e == null ? null : v(e.toString().toLowerCase());
438 }
439
440 private <T extends Enum<T>> T jsonToEnum(Class<T> enumType, String name) {
441 return Enum.valueOf(enumType, name.toUpperCase());
442 }
443
444 private String getWorkflowUrl(long workflowInstanceId) {
445 return UrlSupport.concat(endpointBaseUrl, Long.toString(workflowInstanceId));
446 }
447 }