View Javadoc
1   /*
2    * Licensed to The Apereo Foundation under one or more contributor license
3    * agreements. See the NOTICE file distributed with this work for additional
4    * information regarding copyright ownership.
5    *
6    *
7    * The Apereo Foundation licenses this file to you under the Educational
8    * Community License, Version 2.0 (the "License"); you may not use this file
9    * except in compliance with the License. You may obtain a copy of the License
10   * at:
11   *
12   *   http://opensource.org/licenses/ecl2.txt
13   *
14   * Unless required by applicable law or agreed to in writing, software
15   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
16   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the
17   * License for the specific language governing permissions and limitations under
18   * the License.
19   *
20   */
21  
22  
23  package org.opencastproject.workflow.handler.notification;
24  
25  import static java.lang.String.format;
26  import static org.apache.http.HttpStatus.SC_ACCEPTED;
27  import static org.apache.http.HttpStatus.SC_NO_CONTENT;
28  import static org.apache.http.HttpStatus.SC_OK;
29  
30  import org.opencastproject.job.api.JobContext;
31  import org.opencastproject.serviceregistry.api.ServiceRegistry;
32  import org.opencastproject.workflow.api.AbstractWorkflowOperationHandler;
33  import org.opencastproject.workflow.api.WorkflowInstance;
34  import org.opencastproject.workflow.api.WorkflowOperationException;
35  import org.opencastproject.workflow.api.WorkflowOperationHandler;
36  import org.opencastproject.workflow.api.WorkflowOperationResult;
37  import org.opencastproject.workflow.api.WorkflowOperationResult.Action;
38  
39  import org.apache.commons.lang3.StringUtils;
40  import org.apache.http.HttpResponse;
41  import org.apache.http.client.ClientProtocolException;
42  import org.apache.http.client.config.RequestConfig;
43  import org.apache.http.client.entity.UrlEncodedFormEntity;
44  import org.apache.http.client.methods.HttpEntityEnclosingRequestBase;
45  import org.apache.http.client.methods.HttpPost;
46  import org.apache.http.client.methods.HttpPut;
47  import org.apache.http.client.methods.HttpUriRequest;
48  import org.apache.http.impl.client.CloseableHttpClient;
49  import org.apache.http.impl.client.HttpClientBuilder;
50  import org.apache.http.message.BasicNameValuePair;
51  import org.osgi.service.component.annotations.Component;
52  import org.osgi.service.component.annotations.Reference;
53  import org.slf4j.Logger;
54  import org.slf4j.LoggerFactory;
55  
56  import java.io.IOException;
57  import java.io.UnsupportedEncodingException;
58  import java.util.ArrayList;
59  import java.util.List;
60  
61  /**
62   * Workflow operation handler that will send HTTP POST or PUT requests to a specified address.
63   * <p>
64   * The request can contain a message type and a message body and automatically includes the workflow instance id. Should
65   * the notification fail, a retry strategy is implemented.
66   * <p>
67   * Requests will be send using the POST method by default, PUT is a supported alternative method.
68   *
69   */
70  @Component(
71      immediate = true,
72      service = WorkflowOperationHandler.class,
73      property = {
74          "service.description=Http Notification Operation Handler",
75          "workflow.operation=http-notify"
76      }
77  )
78  public class HttpNotificationWorkflowOperationHandler extends AbstractWorkflowOperationHandler {
79  
80    /** Configuration key for the target URL of the notification request */
81    public static final String OPT_URL_PATH = "url";
82  
83    /** Configuration key for the notification event */
84    public static final String OPT_NOTIFICATION_SUBJECT = "subject";
85  
86    /** Configuration key for the notification message */
87    public static final String OPT_NOTIFICATION_MESSAGE = "message";
88  
89    /** Configuration key for the HTTP method to use (put or post) */
90    public static final String OPT_METHOD = "method";
91  
92    /** Configuration key for the maximal attempts for the notification request */
93    public static final String OPT_MAX_RETRY = "max-retry";
94  
95    /** Configuration key for the request timeout */
96    public static final String OPT_TIMEOUT = "timeout";
97  
98    /** Name of the subject HTTP parameter */
99    public static final String HTTP_PARAM_SUBJECT = "subject";
100 
101   /** Name of the message HTTP parameter */
102   public static final String HTTP_PARAM_MESSAGE = "message";
103 
104   /** Name of the workflow instance id HTTP parameter */
105   public static final String HTTP_PARAM_WORKFLOW = "workflowInstanceId";
106 
107   /** The logging facility */
108   private static final Logger logger = LoggerFactory.getLogger(HttpNotificationWorkflowOperationHandler.class);
109 
110   /** Default value for the number of attempts for a request */
111   private static final int DEFAULT_MAX_RETRY = 5;
112 
113   /** Default maximum wait time the client when trying to execute a request */
114   private static final int DEFAULT_TIMEOUT = 10 * 1000;
115 
116   /** Default time between two request attempts */
117   public static final int INITIAL_SLEEP_TIME = 10 * 1000;
118 
119   /** The scale factor to the sleep time between two notification attempts */
120   public static final int SLEEP_SCALE_FACTOR = 2;
121 
122   /**
123    * {@inheritDoc}
124    */
125   @Override
126   public WorkflowOperationResult start(WorkflowInstance workflowInstance, JobContext context)
127           throws WorkflowOperationException {
128     logger.debug("Running HTTP notification workflow operation on workflow {}", workflowInstance.getId());
129 
130     int maxRetry = DEFAULT_MAX_RETRY;
131     int timeout = DEFAULT_TIMEOUT;
132 
133     // Required configuration
134     String urlPath = getConfig(workflowInstance, OPT_URL_PATH);
135 
136     // Optional configuration
137     String notificationSubject = getConfig(workflowInstance, OPT_NOTIFICATION_SUBJECT, null);
138     String notificationMessage = getConfig(workflowInstance, OPT_NOTIFICATION_MESSAGE, null);
139     String method = getConfig(workflowInstance, OPT_METHOD, "post");
140     String maxRetryOpt = getConfig(workflowInstance, OPT_MAX_RETRY, null);
141     String timeoutOpt = getConfig(workflowInstance, OPT_TIMEOUT, null);
142 
143 
144     // If set, convert the timeout to milliseconds
145     if (timeoutOpt != null) {
146       timeout = Integer.parseInt(timeoutOpt) * 1000;
147     }
148 
149     // Is there a need to retry on failure?
150     if (maxRetryOpt != null) {
151       maxRetry = Integer.parseInt(maxRetryOpt);
152     }
153 
154     // Figure out which request method to use
155     HttpEntityEnclosingRequestBase request = null;
156     if (StringUtils.equalsIgnoreCase("post", method)) {
157       request = new HttpPost(urlPath);
158     } else if (StringUtils.equalsIgnoreCase("put", method)) {
159       request = new HttpPut(urlPath);
160     } else {
161       throw new WorkflowOperationException("The configuration key '" + OPT_METHOD + "' only supports 'post' and 'put'");
162     }
163     logger.debug("Request will be sent using the '{}' method", method);
164 
165     // Add event parameters as form parameters
166     try {
167       List<BasicNameValuePair> params = new ArrayList<>();
168 
169       // Add the subject (if specified)
170       if (notificationSubject != null) {
171         params.add(new BasicNameValuePair(HTTP_PARAM_SUBJECT, notificationSubject));
172       }
173 
174       // Add the message (if specified)
175       if (notificationMessage != null) {
176         params.add(new BasicNameValuePair(HTTP_PARAM_MESSAGE, notificationMessage));
177       }
178 
179       // Add the workflow instance id
180       params.add(new BasicNameValuePair(HTTP_PARAM_WORKFLOW, Long.toString(workflowInstance.getId())));
181 
182       request.setEntity(new UrlEncodedFormEntity(params, "UTF-8"));
183     } catch (UnsupportedEncodingException e) {
184       throw new WorkflowOperationException(
185               "Error happened during the encoding of the event parameter as form parameter:", e);
186     }
187 
188     // Execute the request
189     if (!executeRequest(request, maxRetry, timeout, INITIAL_SLEEP_TIME)) {
190       throw new WorkflowOperationException(format("Notification could not be delivered to %s", urlPath));
191     }
192 
193     return createResult(workflowInstance.getMediaPackage(), Action.CONTINUE);
194   }
195 
196   /**
197    * Execute the given notification request. If the target is not responding, retry as many time as the maxAttampts
198    * parameter with in between each try a sleep time.
199    *
200    * @param request
201    *          The request to execute
202    * @param maxAttempts
203    *          The number of attempts in case of error
204    * @param timeout
205    *          The wait time in milliseconds at which a connection attempt will throw
206    * @return true if the request has been executed successfully
207    */
208   private boolean executeRequest(HttpUriRequest request, int maxAttempts, int timeout, int sleepTime) {
209 
210     logger.debug("Executing notification request on target {}, {} attempts left", request.getURI(), maxAttempts);
211 
212     RequestConfig config = RequestConfig.custom()
213             .setConnectTimeout(timeout)
214             .setConnectionRequestTimeout(timeout)
215             .setSocketTimeout(timeout).build();
216     CloseableHttpClient httpClient = HttpClientBuilder.create().setDefaultRequestConfig(config).build();
217 
218     HttpResponse response;
219     try {
220       response = httpClient.execute(request);
221     } catch (ClientProtocolException e) {
222       logger.error("Protocol error during execution of query on target {}: {}", request.getURI(), e.getMessage());
223       return false;
224     } catch (IOException e) {
225       logger.error("I/O error during execution of query on target {}: {}", request.getURI(), e.getMessage());
226       return false;
227     }
228 
229     Integer statusCode = response.getStatusLine().getStatusCode();
230     if (statusCode == SC_OK || statusCode == SC_NO_CONTENT || statusCode == SC_ACCEPTED) {
231       logger.debug("Request successfully executed on target {}, status code: {}", request.getURI(), statusCode);
232       return true;
233     } else if (maxAttempts > 1) {
234       logger.debug("Request failed on target {}, status code: {}, will retry in {} seconds", request.getURI(),
235               statusCode, sleepTime / 1000);
236       try {
237         Thread.sleep(sleepTime);
238         return executeRequest(request, --maxAttempts, timeout, sleepTime * SLEEP_SCALE_FACTOR);
239       } catch (InterruptedException e) {
240         logger.error("Error during sleep time before new notification request try: {}", e.getMessage());
241         return false;
242       }
243     } else {
244       logger.warn("Request failed on target {}, status code: {}, no more attempt.", request.getURI(), statusCode);
245       return false;
246     }
247   }
248 
249   @Reference
250   @Override
251   public void setServiceRegistry(ServiceRegistry serviceRegistry) {
252     super.setServiceRegistry(serviceRegistry);
253   }
254 
255 }