1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23 package org.opencastproject.workflow.handler.notification;
24
25 import static java.lang.String.format;
26 import static org.apache.http.HttpStatus.SC_ACCEPTED;
27 import static org.apache.http.HttpStatus.SC_NO_CONTENT;
28 import static org.apache.http.HttpStatus.SC_OK;
29
30 import org.opencastproject.job.api.JobContext;
31 import org.opencastproject.serviceregistry.api.ServiceRegistry;
32 import org.opencastproject.workflow.api.AbstractWorkflowOperationHandler;
33 import org.opencastproject.workflow.api.WorkflowInstance;
34 import org.opencastproject.workflow.api.WorkflowOperationException;
35 import org.opencastproject.workflow.api.WorkflowOperationHandler;
36 import org.opencastproject.workflow.api.WorkflowOperationResult;
37 import org.opencastproject.workflow.api.WorkflowOperationResult.Action;
38
39 import org.apache.commons.lang3.StringUtils;
40 import org.apache.http.HttpResponse;
41 import org.apache.http.client.ClientProtocolException;
42 import org.apache.http.client.config.RequestConfig;
43 import org.apache.http.client.entity.UrlEncodedFormEntity;
44 import org.apache.http.client.methods.HttpEntityEnclosingRequestBase;
45 import org.apache.http.client.methods.HttpPost;
46 import org.apache.http.client.methods.HttpPut;
47 import org.apache.http.client.methods.HttpUriRequest;
48 import org.apache.http.impl.client.CloseableHttpClient;
49 import org.apache.http.impl.client.HttpClientBuilder;
50 import org.apache.http.message.BasicNameValuePair;
51 import org.osgi.service.component.annotations.Component;
52 import org.osgi.service.component.annotations.Reference;
53 import org.slf4j.Logger;
54 import org.slf4j.LoggerFactory;
55
56 import java.io.IOException;
57 import java.io.UnsupportedEncodingException;
58 import java.util.ArrayList;
59 import java.util.List;
60
61
62
63
64
65
66
67
68
69
70 @Component(
71 immediate = true,
72 service = WorkflowOperationHandler.class,
73 property = {
74 "service.description=Http Notification Operation Handler",
75 "workflow.operation=http-notify"
76 }
77 )
78 public class HttpNotificationWorkflowOperationHandler extends AbstractWorkflowOperationHandler {
79
80
81 public static final String OPT_URL_PATH = "url";
82
83
84 public static final String OPT_NOTIFICATION_SUBJECT = "subject";
85
86
87 public static final String OPT_NOTIFICATION_MESSAGE = "message";
88
89
90 public static final String OPT_METHOD = "method";
91
92
93 public static final String OPT_MAX_RETRY = "max-retry";
94
95
96 public static final String OPT_TIMEOUT = "timeout";
97
98
99 public static final String HTTP_PARAM_SUBJECT = "subject";
100
101
102 public static final String HTTP_PARAM_MESSAGE = "message";
103
104
105 public static final String HTTP_PARAM_WORKFLOW = "workflowInstanceId";
106
107
108 private static final Logger logger = LoggerFactory.getLogger(HttpNotificationWorkflowOperationHandler.class);
109
110
111 private static final int DEFAULT_MAX_RETRY = 5;
112
113
114 private static final int DEFAULT_TIMEOUT = 10 * 1000;
115
116
117 public static final int INITIAL_SLEEP_TIME = 10 * 1000;
118
119
120 public static final int SLEEP_SCALE_FACTOR = 2;
121
122
123
124
125 @Override
126 public WorkflowOperationResult start(WorkflowInstance workflowInstance, JobContext context)
127 throws WorkflowOperationException {
128 logger.debug("Running HTTP notification workflow operation on workflow {}", workflowInstance.getId());
129
130 int maxRetry = DEFAULT_MAX_RETRY;
131 int timeout = DEFAULT_TIMEOUT;
132
133
134 String urlPath = getConfig(workflowInstance, OPT_URL_PATH);
135
136
137 String notificationSubject = getConfig(workflowInstance, OPT_NOTIFICATION_SUBJECT, null);
138 String notificationMessage = getConfig(workflowInstance, OPT_NOTIFICATION_MESSAGE, null);
139 String method = getConfig(workflowInstance, OPT_METHOD, "post");
140 String maxRetryOpt = getConfig(workflowInstance, OPT_MAX_RETRY, null);
141 String timeoutOpt = getConfig(workflowInstance, OPT_TIMEOUT, null);
142
143
144
145 if (timeoutOpt != null) {
146 timeout = Integer.parseInt(timeoutOpt) * 1000;
147 }
148
149
150 if (maxRetryOpt != null) {
151 maxRetry = Integer.parseInt(maxRetryOpt);
152 }
153
154
155 HttpEntityEnclosingRequestBase request = null;
156 if (StringUtils.equalsIgnoreCase("post", method)) {
157 request = new HttpPost(urlPath);
158 } else if (StringUtils.equalsIgnoreCase("put", method)) {
159 request = new HttpPut(urlPath);
160 } else {
161 throw new WorkflowOperationException("The configuration key '" + OPT_METHOD + "' only supports 'post' and 'put'");
162 }
163 logger.debug("Request will be sent using the '{}' method", method);
164
165
166 try {
167 List<BasicNameValuePair> params = new ArrayList<>();
168
169
170 if (notificationSubject != null) {
171 params.add(new BasicNameValuePair(HTTP_PARAM_SUBJECT, notificationSubject));
172 }
173
174
175 if (notificationMessage != null) {
176 params.add(new BasicNameValuePair(HTTP_PARAM_MESSAGE, notificationMessage));
177 }
178
179
180 params.add(new BasicNameValuePair(HTTP_PARAM_WORKFLOW, Long.toString(workflowInstance.getId())));
181
182 request.setEntity(new UrlEncodedFormEntity(params, "UTF-8"));
183 } catch (UnsupportedEncodingException e) {
184 throw new WorkflowOperationException(
185 "Error happened during the encoding of the event parameter as form parameter:", e);
186 }
187
188
189 if (!executeRequest(request, maxRetry, timeout, INITIAL_SLEEP_TIME)) {
190 throw new WorkflowOperationException(format("Notification could not be delivered to %s", urlPath));
191 }
192
193 return createResult(workflowInstance.getMediaPackage(), Action.CONTINUE);
194 }
195
196
197
198
199
200
201
202
203
204
205
206
207
208 private boolean executeRequest(HttpUriRequest request, int maxAttempts, int timeout, int sleepTime) {
209
210 logger.debug("Executing notification request on target {}, {} attempts left", request.getURI(), maxAttempts);
211
212 RequestConfig config = RequestConfig.custom()
213 .setConnectTimeout(timeout)
214 .setConnectionRequestTimeout(timeout)
215 .setSocketTimeout(timeout).build();
216 CloseableHttpClient httpClient = HttpClientBuilder.create().setDefaultRequestConfig(config).build();
217
218 HttpResponse response;
219 try {
220 response = httpClient.execute(request);
221 } catch (ClientProtocolException e) {
222 logger.error("Protocol error during execution of query on target {}: {}", request.getURI(), e.getMessage());
223 return false;
224 } catch (IOException e) {
225 logger.error("I/O error during execution of query on target {}: {}", request.getURI(), e.getMessage());
226 return false;
227 }
228
229 Integer statusCode = response.getStatusLine().getStatusCode();
230 if (statusCode == SC_OK || statusCode == SC_NO_CONTENT || statusCode == SC_ACCEPTED) {
231 logger.debug("Request successfully executed on target {}, status code: {}", request.getURI(), statusCode);
232 return true;
233 } else if (maxAttempts > 1) {
234 logger.debug("Request failed on target {}, status code: {}, will retry in {} seconds", request.getURI(),
235 statusCode, sleepTime / 1000);
236 try {
237 Thread.sleep(sleepTime);
238 return executeRequest(request, --maxAttempts, timeout, sleepTime * SLEEP_SCALE_FACTOR);
239 } catch (InterruptedException e) {
240 logger.error("Error during sleep time before new notification request try: {}", e.getMessage());
241 return false;
242 }
243 } else {
244 logger.warn("Request failed on target {}, status code: {}, no more attempt.", request.getURI(), statusCode);
245 return false;
246 }
247 }
248
249 @Reference
250 @Override
251 public void setServiceRegistry(ServiceRegistry serviceRegistry) {
252 super.setServiceRegistry(serviceRegistry);
253 }
254
255 }