1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21 package org.opencastproject.workflow.handler.mattermost.notification;
22
23 import static java.lang.String.format;
24 import static org.apache.commons.lang3.StringUtils.join;
25 import static org.apache.http.HttpStatus.SC_ACCEPTED;
26 import static org.apache.http.HttpStatus.SC_NO_CONTENT;
27 import static org.apache.http.HttpStatus.SC_OK;
28
29 import org.opencastproject.job.api.JobContext;
30 import org.opencastproject.mediapackage.MediaPackage;
31 import org.opencastproject.workflow.api.AbstractWorkflowOperationHandler;
32 import org.opencastproject.workflow.api.WorkflowInstance;
33 import org.opencastproject.workflow.api.WorkflowOperationException;
34 import org.opencastproject.workflow.api.WorkflowOperationHandler;
35 import org.opencastproject.workflow.api.WorkflowOperationResult;
36 import org.opencastproject.workflow.api.WorkflowOperationResult.Action;
37
38 import com.google.gson.Gson;
39 import com.google.gson.GsonBuilder;
40 import com.google.gson.JsonObject;
41
42 import org.apache.commons.lang3.StringUtils;
43 import org.apache.http.HttpResponse;
44 import org.apache.http.client.ClientProtocolException;
45 import org.apache.http.client.config.RequestConfig;
46 import org.apache.http.client.entity.UrlEncodedFormEntity;
47 import org.apache.http.client.methods.HttpEntityEnclosingRequestBase;
48 import org.apache.http.client.methods.HttpPost;
49 import org.apache.http.client.methods.HttpPut;
50 import org.apache.http.client.methods.HttpUriRequest;
51 import org.apache.http.impl.client.CloseableHttpClient;
52 import org.apache.http.impl.client.HttpClientBuilder;
53 import org.apache.http.message.BasicNameValuePair;
54 import org.osgi.service.component.annotations.Component;
55 import org.slf4j.Logger;
56 import org.slf4j.LoggerFactory;
57
58 import java.io.IOException;
59 import java.io.UnsupportedEncodingException;
60 import java.util.ArrayList;
61 import java.util.List;
62
63
64
65
66 @Component(
67 immediate = true,
68 service = WorkflowOperationHandler.class,
69 property = {
70 "service.description=Mattermost Notification Operation Handler",
71 "workflow.operation=mattermost-notify"
72 }
73 )
74 public class MattermostNotificationWorkflowOperationHandler extends AbstractWorkflowOperationHandler {
75
76
77
78 public static final String OPT_URL_PATH = "url";
79
80
81
82
83 public static final String OPT_NOTIFICATION_MESSAGE = "message";
84
85
86
87
88 public static final String OPT_METHOD = "method";
89
90
91
92
93 public static final String OPT_MAX_RETRY = "max-retry";
94
95
96
97
98 public static final String OPT_TIMEOUT = "timeout";
99
100
101
102
103 public static final String HTTP_PARAM_PAYLOAD = "payload";
104
105
106
107
108 public static final String POST = "post";
109
110
111
112
113 public static final String PUT = "put";
114
115
116
117
118 private static final Logger logger = LoggerFactory.getLogger(MattermostNotificationWorkflowOperationHandler.class);
119
120
121
122
123 private static final int DEFAULT_MAX_RETRY = 5;
124
125
126
127
128 private static final int DEFAULT_TIMEOUT = 10 * 1000;
129
130
131
132
133 public static final int INITIAL_SLEEP_TIME = 10 * 1000;
134
135
136
137
138 public static final int SLEEP_SCALE_FACTOR = 2;
139
140
141
142
143 private static Gson gson = new GsonBuilder().create();
144
145
146
147
148 @Override
149 public WorkflowOperationResult start(WorkflowInstance workflowInstance, JobContext context)
150 throws WorkflowOperationException {
151 logger.debug("Running HTTP notification workflow operation on workflow {}", workflowInstance.getId());
152 int maxRetry = DEFAULT_MAX_RETRY;
153 int timeout = DEFAULT_TIMEOUT;
154
155
156 String urlPath = getConfig(workflowInstance, OPT_URL_PATH);
157
158
159 String notificationMessage = getConfig(workflowInstance, OPT_NOTIFICATION_MESSAGE, null);
160 String method = getConfig(workflowInstance, OPT_METHOD, POST);
161 String maxRetryOpt = getConfig(workflowInstance, OPT_MAX_RETRY, null);
162 String timeoutOpt = getConfig(workflowInstance, OPT_TIMEOUT, null);
163
164
165 if (timeoutOpt != null) {
166 timeout = Integer.parseInt(timeoutOpt) * 1000;
167 }
168
169
170 if (maxRetryOpt != null) {
171 maxRetry = Integer.parseInt(maxRetryOpt);
172 }
173
174
175 HttpEntityEnclosingRequestBase request;
176 if (StringUtils.equalsIgnoreCase(POST, method)) {
177 request = new HttpPost(urlPath);
178 } else if (StringUtils.equalsIgnoreCase(PUT, method)) {
179 request = new HttpPut(urlPath);
180 } else {
181 throw new WorkflowOperationException("The configuration key '" + OPT_METHOD + "' only supports 'post' and 'put'");
182 }
183 logger.debug("Request will be sent using the '{}' method", method);
184
185
186 MediaPackage mp = workflowInstance.getMediaPackage();
187 try {
188 List<BasicNameValuePair> params = new ArrayList<>();
189
190
191 if (notificationMessage != null) {
192 params.add(new BasicNameValuePair(HTTP_PARAM_PAYLOAD, makeJson(notificationMessage, workflowInstance, mp)));
193 }
194
195 request.setEntity(new UrlEncodedFormEntity(params, "UTF-8"));
196 } catch (UnsupportedEncodingException e) {
197 throw new WorkflowOperationException("Error encoding the event parameter as form parameter", e);
198 }
199
200
201 if (!executeRequest(request, maxRetry, timeout, INITIAL_SLEEP_TIME)) {
202 throw new WorkflowOperationException(format("Notification could not be delivered to %s", urlPath));
203 }
204
205 return createResult(mp, Action.CONTINUE);
206 }
207
208
209
210
211
212
213
214
215
216
217 private String makeJson(String s, WorkflowInstance workflowInstance, MediaPackage mediaPackage) {
218 s = s.replace("%t", checkIfNull(workflowInstance.getTitle(), "Title"));
219 s = s.replace("%i", String.valueOf(workflowInstance.getId()));
220 s = s.replace("%s", String.valueOf(workflowInstance.getState()));
221 s = s.replace("%o", String.valueOf(workflowInstance.getCurrentOperation().getId()));
222 s = s.replace("%I", checkIfNull(mediaPackage.getIdentifier(), "Mediapackage-ID"));
223 s = s.replace("%T", checkIfNull(mediaPackage.getTitle(), "Mediapackage-Title"));
224 s = s.replace("%c", checkIfNull(mediaPackage.getContributors(), "Contributors"));
225 s = s.replace("%C", checkIfNull(mediaPackage.getCreators(), "Creators"));
226 s = s.replace("%D", checkIfNull(mediaPackage.getDate(), "Date"));
227 s = s.replace("%d", checkIfNull(mediaPackage.getDuration(), "Duration"));
228 s = s.replace("%l", checkIfNull(mediaPackage.getLanguage(), "Language"));
229 s = s.replace("%L", checkIfNull(mediaPackage.getLicense(), "License"));
230 s = s.replace("%S", checkIfNull(mediaPackage.getSeriesTitle(), "Series-Title"));
231
232 JsonObject json = new JsonObject();
233 json.addProperty("text", s);
234 return gson.toJson(json);
235 }
236
237
238
239
240
241
242
243
244
245 private String checkIfNull(Object o, String s) {
246
247 if (o == null) {
248 return s + "not defined";
249 }
250 if (o instanceof String[]) {
251 return join((String[]) o, ',');
252 }
253 return o.toString();
254
255 }
256
257
258
259
260
261
262
263
264
265
266
267 private boolean executeRequest(HttpUriRequest request, int maxAttempts, int timeout, int sleepTime) {
268
269 logger.debug("Executing notification request on target {}, {} attempts left", request.getURI(), maxAttempts);
270
271 RequestConfig config = RequestConfig.custom()
272 .setConnectTimeout(timeout)
273 .setConnectionRequestTimeout(timeout)
274 .setSocketTimeout(timeout)
275 .build();
276 CloseableHttpClient httpClient = HttpClientBuilder.create()
277 .useSystemProperties()
278 .setDefaultRequestConfig(config)
279 .build();
280
281 HttpResponse response;
282 try {
283 response = httpClient.execute(request);
284 } catch (ClientProtocolException e) {
285 logger.error("Protocol error during execution of query on target {}", request.getURI(), e);
286 return false;
287 } catch (IOException e) {
288 logger.error("I/O error during execution of query on target {}", request.getURI(), e);
289 return false;
290 }
291
292 Integer statusCode = response.getStatusLine().getStatusCode();
293 if (statusCode == SC_OK || statusCode == SC_NO_CONTENT || statusCode == SC_ACCEPTED) {
294 logger.debug("Request successfully executed on target {}, status code: {}", request.getURI(), statusCode);
295 return true;
296 } else if (maxAttempts > 1) {
297 logger.debug("Request failed on target {}, status code: {}, will retry in {} seconds", request.getURI(),
298 statusCode, sleepTime / 1000);
299 try {
300 Thread.sleep(sleepTime);
301 return executeRequest(request, --maxAttempts, timeout, sleepTime * SLEEP_SCALE_FACTOR);
302 } catch (InterruptedException e) {
303 logger.error("Error during sleep time before new notification request try", e);
304 return false;
305 }
306 } else {
307 logger.error("Request failed on target {}, status code: {}, no more attempt.", request.getURI(), statusCode);
308 return false;
309 }
310 }
311 }