View Javadoc
1   /*
2    * Licensed to The Apereo Foundation under one or more contributor license
3    * agreements. See the NOTICE file distributed with this work for additional
4    * information regarding copyright ownership.
5    *
6    *
7    * The Apereo Foundation licenses this file to you under the Educational
8    * Community License, Version 2.0 (the "License"); you may not use this file
9    * except in compliance with the License. You may obtain a copy of the License
10   * at:
11   *
12   *   http://opensource.org/licenses/ecl2.txt
13   *
14   * Unless required by applicable law or agreed to in writing, software
15   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
16   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the
17   * License for the specific language governing permissions and limitations under
18   * the License.
19   *
20   */
21  package org.opencastproject.workflow.handler.mattermost.notification;
22  
23  import static java.lang.String.format;
24  import static org.apache.commons.lang3.StringUtils.join;
25  import static org.apache.http.HttpStatus.SC_ACCEPTED;
26  import static org.apache.http.HttpStatus.SC_NO_CONTENT;
27  import static org.apache.http.HttpStatus.SC_OK;
28  
29  import org.opencastproject.job.api.JobContext;
30  import org.opencastproject.mediapackage.MediaPackage;
31  import org.opencastproject.workflow.api.AbstractWorkflowOperationHandler;
32  import org.opencastproject.workflow.api.WorkflowInstance;
33  import org.opencastproject.workflow.api.WorkflowOperationException;
34  import org.opencastproject.workflow.api.WorkflowOperationHandler;
35  import org.opencastproject.workflow.api.WorkflowOperationResult;
36  import org.opencastproject.workflow.api.WorkflowOperationResult.Action;
37  
38  import com.google.gson.Gson;
39  import com.google.gson.GsonBuilder;
40  import com.google.gson.JsonObject;
41  
42  import org.apache.commons.lang3.StringUtils;
43  import org.apache.http.HttpResponse;
44  import org.apache.http.client.ClientProtocolException;
45  import org.apache.http.client.config.RequestConfig;
46  import org.apache.http.client.entity.UrlEncodedFormEntity;
47  import org.apache.http.client.methods.HttpEntityEnclosingRequestBase;
48  import org.apache.http.client.methods.HttpPost;
49  import org.apache.http.client.methods.HttpPut;
50  import org.apache.http.client.methods.HttpUriRequest;
51  import org.apache.http.impl.client.CloseableHttpClient;
52  import org.apache.http.impl.client.HttpClientBuilder;
53  import org.apache.http.message.BasicNameValuePair;
54  import org.osgi.service.component.annotations.Component;
55  import org.slf4j.Logger;
56  import org.slf4j.LoggerFactory;
57  
58  import java.io.IOException;
59  import java.io.UnsupportedEncodingException;
60  import java.util.ArrayList;
61  import java.util.List;
62  
63  /**
64   * Workflow operation for notifying Mattermost about the status of the current workflow.
65   */
66  @Component(
67      immediate = true,
68      service = WorkflowOperationHandler.class,
69      property = {
70          "service.description=Mattermost Notification Operation Handler",
71          "workflow.operation=mattermost-notify"
72      }
73  )
74  public class MattermostNotificationWorkflowOperationHandler extends AbstractWorkflowOperationHandler {
75    /**
76     * Configuration key for the target URL of the notification request
77     */
78    public static final String OPT_URL_PATH = "url";
79  
80    /**
81     * Configuration key for the notification message
82     */
83    public static final String OPT_NOTIFICATION_MESSAGE = "message";
84  
85    /**
86     * Configuration key for the HTTP method to use (put or post)
87     */
88    public static final String OPT_METHOD = "method";
89  
90    /**
91     * Configuration key for the maximal attempts for the notification request
92     */
93    public static final String OPT_MAX_RETRY = "max-retry";
94  
95    /**
96     * Configuration key for the request timeout in milliseconds
97     */
98    public static final String OPT_TIMEOUT = "timeout";
99  
100   /**
101    * Name of the subject HTTP parameter
102    */
103   public static final String HTTP_PARAM_PAYLOAD = "payload";
104 
105   /**
106    * HTTP method POST
107    */
108   public static final String POST = "post";
109 
110   /**
111    * HTTP method PUT
112    */
113   public static final String PUT = "put";
114 
115   /**
116    * The logging facility
117    */
118   private static final Logger logger = LoggerFactory.getLogger(MattermostNotificationWorkflowOperationHandler.class);
119 
120   /**
121    * Default value for the number of attempts for a request
122    */
123   private static final int DEFAULT_MAX_RETRY = 5;
124 
125   /**
126    * Default maximum wait time the client when trying to execute a request in milliseconds
127    */
128   private static final int DEFAULT_TIMEOUT = 10 * 1000;
129 
130   /**
131    * Default time between two request attempts in milliseconds
132    */
133   public static final int INITIAL_SLEEP_TIME = 10 * 1000;
134 
135   /**
136    * The scale factor to the sleep time between two notification attempts
137    */
138   public static final int SLEEP_SCALE_FACTOR = 2;
139 
140   /**
141    * Gson instance for JSON serialization.
142    */
143   private static Gson gson = new GsonBuilder().create();
144 
145   /**
146    * {@inheritDoc}
147    */
148   @Override
149   public WorkflowOperationResult start(WorkflowInstance workflowInstance, JobContext context)
150           throws WorkflowOperationException {
151     logger.debug("Running HTTP notification workflow operation on workflow {}", workflowInstance.getId());
152     int maxRetry = DEFAULT_MAX_RETRY;
153     int timeout = DEFAULT_TIMEOUT;
154 
155     // Required configuration
156     String urlPath = getConfig(workflowInstance, OPT_URL_PATH);
157 
158     // Optional configuration
159     String notificationMessage = getConfig(workflowInstance, OPT_NOTIFICATION_MESSAGE, null);
160     String method = getConfig(workflowInstance, OPT_METHOD, POST);
161     String maxRetryOpt = getConfig(workflowInstance, OPT_MAX_RETRY, null);
162     String timeoutOpt = getConfig(workflowInstance, OPT_TIMEOUT, null);
163 
164     // If set, convert the timeout to milliseconds
165     if (timeoutOpt != null) {
166       timeout = Integer.parseInt(timeoutOpt) * 1000;
167     }
168 
169     // Is there a need to retry on failure?
170     if (maxRetryOpt != null) {
171       maxRetry = Integer.parseInt(maxRetryOpt);
172     }
173 
174     // Figure out which request method to use
175     HttpEntityEnclosingRequestBase request;
176     if (StringUtils.equalsIgnoreCase(POST, method)) {
177       request = new HttpPost(urlPath);
178     } else if (StringUtils.equalsIgnoreCase(PUT, method)) {
179       request = new HttpPut(urlPath);
180     } else {
181       throw new WorkflowOperationException("The configuration key '" + OPT_METHOD + "' only supports 'post' and 'put'");
182     }
183     logger.debug("Request will be sent using the '{}' method", method);
184 
185     // Add event parameters as form parameters
186     MediaPackage mp = workflowInstance.getMediaPackage();
187     try {
188       List<BasicNameValuePair> params = new ArrayList<>();
189 
190       // Add the subject (if specified)
191       if (notificationMessage != null) {
192         params.add(new BasicNameValuePair(HTTP_PARAM_PAYLOAD, makeJson(notificationMessage, workflowInstance, mp)));
193       }
194 
195       request.setEntity(new UrlEncodedFormEntity(params, "UTF-8"));
196     } catch (UnsupportedEncodingException e) {
197       throw new WorkflowOperationException("Error encoding the event parameter as form parameter", e);
198     }
199 
200     // Execute the request
201     if (!executeRequest(request, maxRetry, timeout, INITIAL_SLEEP_TIME)) {
202       throw new WorkflowOperationException(format("Notification could not be delivered to %s", urlPath));
203     }
204 
205     return createResult(mp, Action.CONTINUE);
206   }
207 
208   /**
209    * Gets a notification message with placeholders and substitute them with corresponding meta-data of workflowInstance.
210    * The resulting String is transformed to a Json-String
211    *
212    * @param s                The notification message to transform to Json-String
213    * @param workflowInstance The workflowInstance which getting metadata from
214    * @param mediaPackage     The mediaPackage
215    * @return JSON-String containing the information of the workflowInstance
216    */
217   private String makeJson(String s, WorkflowInstance workflowInstance, MediaPackage mediaPackage) {
218     s = s.replace("%t", checkIfNull(workflowInstance.getTitle(), "Title"));
219     s = s.replace("%i", String.valueOf(workflowInstance.getId()));
220     s = s.replace("%s", String.valueOf(workflowInstance.getState()));
221     s = s.replace("%o", String.valueOf(workflowInstance.getCurrentOperation().getId()));
222     s = s.replace("%I", checkIfNull(mediaPackage.getIdentifier(), "Mediapackage-ID"));
223     s = s.replace("%T", checkIfNull(mediaPackage.getTitle(), "Mediapackage-Title"));
224     s = s.replace("%c", checkIfNull(mediaPackage.getContributors(), "Contributors"));
225     s = s.replace("%C", checkIfNull(mediaPackage.getCreators(), "Creators"));
226     s = s.replace("%D", checkIfNull(mediaPackage.getDate(), "Date"));
227     s = s.replace("%d", checkIfNull(mediaPackage.getDuration(), "Duration"));
228     s = s.replace("%l", checkIfNull(mediaPackage.getLanguage(), "Language"));
229     s = s.replace("%L", checkIfNull(mediaPackage.getLicense(), "License"));
230     s = s.replace("%S", checkIfNull(mediaPackage.getSeriesTitle(), "Series-Title"));
231 
232     JsonObject json = new JsonObject();
233     json.addProperty("text", s);
234     return gson.toJson(json);
235   }
236 
237   /**
238    * Checks if an object is null. If an object is null, then method returns not defined, else it returns object as a
239    * String
240    *
241    * @param o The object to check
242    * @param s The name of metadata to check
243    * @return String containing the transformed object
244    */
245   private String checkIfNull(Object o, String s) {
246 
247     if (o == null) {
248       return s + "not defined";
249     }
250     if (o instanceof String[]) {
251       return join((String[]) o, ',');
252     }
253     return o.toString();
254 
255   }
256 
257   /**
258    * Execute the given notification request. If the target is not responding, retry as many time as the maxAttampts
259    * parameter with in between each try a sleep time.
260    *
261    * @param request     The request to execute
262    * @param maxAttempts The number of attempts in case of error
263    * @param timeout     The wait time in milliseconds at which a connection attempt will throw
264    * @param sleepTime   The sleep time in milliseconds of a connection
265    * @return true if the request has been executed successfully
266    */
267   private boolean executeRequest(HttpUriRequest request, int maxAttempts, int timeout, int sleepTime) {
268 
269     logger.debug("Executing notification request on target {}, {} attempts left", request.getURI(), maxAttempts);
270 
271     RequestConfig config = RequestConfig.custom()
272                                         .setConnectTimeout(timeout)
273                                         .setConnectionRequestTimeout(timeout)
274                                         .setSocketTimeout(timeout)
275                                         .build();
276     CloseableHttpClient httpClient = HttpClientBuilder.create()
277                                                       .useSystemProperties()
278                                                       .setDefaultRequestConfig(config)
279                                                       .build();
280 
281     HttpResponse response;
282     try {
283       response = httpClient.execute(request);
284     } catch (ClientProtocolException e) {
285       logger.error("Protocol error during execution of query on target {}", request.getURI(), e);
286       return false;
287     } catch (IOException e) {
288       logger.error("I/O error during execution of query on target {}", request.getURI(), e);
289       return false;
290     }
291 
292     Integer statusCode = response.getStatusLine().getStatusCode();
293     if (statusCode == SC_OK || statusCode == SC_NO_CONTENT || statusCode == SC_ACCEPTED) {
294       logger.debug("Request successfully executed on target {}, status code: {}", request.getURI(), statusCode);
295       return true;
296     } else if (maxAttempts > 1) {
297       logger.debug("Request failed on target {}, status code: {}, will retry in {} seconds", request.getURI(),
298               statusCode, sleepTime / 1000);
299       try {
300         Thread.sleep(sleepTime);
301         return executeRequest(request, --maxAttempts, timeout, sleepTime * SLEEP_SCALE_FACTOR);
302       } catch (InterruptedException e) {
303         logger.error("Error during sleep time before new notification request try", e);
304         return false;
305       }
306     } else {
307       logger.error("Request failed on target {}, status code: {}, no more attempt.", request.getURI(), statusCode);
308       return false;
309     }
310   }
311 }