MattermostNotificationWorkflowOperationHandler.java
/*
* Licensed to The Apereo Foundation under one or more contributor license
* agreements. See the NOTICE file distributed with this work for additional
* information regarding copyright ownership.
*
*
* The Apereo Foundation licenses this file to you under the Educational
* Community License, Version 2.0 (the "License"); you may not use this file
* except in compliance with the License. You may obtain a copy of the License
* at:
*
* http://opensource.org/licenses/ecl2.txt
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*
*/
package org.opencastproject.workflow.handler.mattermost.notification;
import static java.lang.String.format;
import static org.apache.commons.lang3.StringUtils.join;
import static org.apache.http.HttpStatus.SC_ACCEPTED;
import static org.apache.http.HttpStatus.SC_NO_CONTENT;
import static org.apache.http.HttpStatus.SC_OK;
import org.opencastproject.job.api.JobContext;
import org.opencastproject.mediapackage.MediaPackage;
import org.opencastproject.workflow.api.AbstractWorkflowOperationHandler;
import org.opencastproject.workflow.api.WorkflowInstance;
import org.opencastproject.workflow.api.WorkflowOperationException;
import org.opencastproject.workflow.api.WorkflowOperationHandler;
import org.opencastproject.workflow.api.WorkflowOperationResult;
import org.opencastproject.workflow.api.WorkflowOperationResult.Action;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import com.google.gson.JsonObject;
import org.apache.commons.lang3.StringUtils;
import org.apache.http.HttpResponse;
import org.apache.http.client.ClientProtocolException;
import org.apache.http.client.config.RequestConfig;
import org.apache.http.client.entity.UrlEncodedFormEntity;
import org.apache.http.client.methods.HttpEntityEnclosingRequestBase;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.client.methods.HttpPut;
import org.apache.http.client.methods.HttpUriRequest;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClientBuilder;
import org.apache.http.message.BasicNameValuePair;
import org.osgi.service.component.annotations.Component;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.util.ArrayList;
import java.util.List;
/**
 * Workflow operation for notifying Mattermost about the status of the current workflow.
 *
 * <p>The configured message may contain the following placeholders, which are substituted with metadata of the
 * workflow instance and its media package before the message is sent as the {@code text} property of a JSON
 * payload:</p>
 * <ul>
 *   <li>{@code %t} workflow title, {@code %i} workflow id, {@code %s} workflow state,
 *       {@code %o} id of the current operation</li>
 *   <li>{@code %I} media package id, {@code %T} media package title, {@code %c} contributors,
 *       {@code %C} creators, {@code %D} date, {@code %d} duration, {@code %l} language, {@code %L} license,
 *       {@code %S} series title</li>
 * </ul>
 */
@Component(
    immediate = true,
    service = WorkflowOperationHandler.class,
    property = {
        "service.description=Mattermost Notification Operation Handler",
        "workflow.operation=mattermost-notify"
    }
)
public class MattermostNotificationWorkflowOperationHandler extends AbstractWorkflowOperationHandler {

  /**
   * Configuration key for the target URL of the notification request
   */
  public static final String OPT_URL_PATH = "url";

  /**
   * Configuration key for the notification message
   */
  public static final String OPT_NOTIFICATION_MESSAGE = "message";

  /**
   * Configuration key for the HTTP method to use (put or post)
   */
  public static final String OPT_METHOD = "method";

  /**
   * Configuration key for the maximal attempts for the notification request
   */
  public static final String OPT_MAX_RETRY = "max-retry";

  /**
   * Configuration key for the request timeout. The configured value is interpreted as seconds and converted to
   * milliseconds before it is applied to the HTTP client.
   */
  public static final String OPT_TIMEOUT = "timeout";

  /**
   * Name of the HTTP form parameter that carries the JSON payload
   */
  public static final String HTTP_PARAM_PAYLOAD = "payload";

  /**
   * HTTP method POST
   */
  public static final String POST = "post";

  /**
   * HTTP method PUT
   */
  public static final String PUT = "put";

  /**
   * The logging facility
   */
  private static final Logger logger = LoggerFactory.getLogger(MattermostNotificationWorkflowOperationHandler.class);

  /**
   * Default value for the number of attempts for a request
   */
  private static final int DEFAULT_MAX_RETRY = 5;

  /**
   * Default maximum time in milliseconds the client waits when trying to execute a request
   */
  private static final int DEFAULT_TIMEOUT = 10 * 1000;

  /**
   * Default time between two request attempts in milliseconds
   */
  public static final int INITIAL_SLEEP_TIME = 10 * 1000;

  /**
   * The scale factor to the sleep time between two notification attempts
   */
  public static final int SLEEP_SCALE_FACTOR = 2;

  /**
   * Gson instance for JSON serialization.
   */
  private static final Gson gson = new GsonBuilder().create();

  /**
   * Builds the notification request from the operation configuration and sends it.
   *
   * @param workflowInstance the workflow instance whose configuration and metadata are used
   * @param context the job context (not used by this operation)
   * @return a result that continues the workflow with the unchanged media package
   * @throws WorkflowOperationException if the configuration is invalid or the notification could not be delivered
   */
  @Override
  public WorkflowOperationResult start(WorkflowInstance workflowInstance, JobContext context)
          throws WorkflowOperationException {
    logger.debug("Running HTTP notification workflow operation on workflow {}", workflowInstance.getId());

    // Required configuration
    String urlPath = getConfig(workflowInstance, OPT_URL_PATH);

    // Optional configuration
    String notificationMessage = getConfig(workflowInstance, OPT_NOTIFICATION_MESSAGE, null);
    String method = getConfig(workflowInstance, OPT_METHOD, POST);
    String maxRetryOpt = getConfig(workflowInstance, OPT_MAX_RETRY, null);
    String timeoutOpt = getConfig(workflowInstance, OPT_TIMEOUT, null);

    // If set, convert the timeout (configured in seconds) to milliseconds
    int timeout = DEFAULT_TIMEOUT;
    if (timeoutOpt != null) {
      timeout = parseIntOption(OPT_TIMEOUT, timeoutOpt) * 1000;
    }

    // Is there a need to retry on failure?
    int maxRetry = DEFAULT_MAX_RETRY;
    if (maxRetryOpt != null) {
      maxRetry = parseIntOption(OPT_MAX_RETRY, maxRetryOpt);
    }

    // Figure out which request method to use
    HttpEntityEnclosingRequestBase request;
    try {
      if (StringUtils.equalsIgnoreCase(POST, method)) {
        request = new HttpPost(urlPath);
      } else if (StringUtils.equalsIgnoreCase(PUT, method)) {
        request = new HttpPut(urlPath);
      } else {
        throw new WorkflowOperationException("The configuration key '" + OPT_METHOD
                + "' only supports 'post' and 'put'");
      }
    } catch (IllegalArgumentException e) {
      // The request constructors reject syntactically invalid URIs with an unchecked exception
      throw new WorkflowOperationException(
              format("The configuration key '%s' does not contain a valid URL: %s", OPT_URL_PATH, urlPath), e);
    }
    logger.debug("Request will be sent using the '{}' method", method);

    // Add event parameters as form parameters
    MediaPackage mp = workflowInstance.getMediaPackage();
    try {
      List<BasicNameValuePair> params = new ArrayList<>();
      // Add the payload (if a message is specified)
      if (notificationMessage != null) {
        params.add(new BasicNameValuePair(HTTP_PARAM_PAYLOAD, makeJson(notificationMessage, workflowInstance, mp)));
      }
      request.setEntity(new UrlEncodedFormEntity(params, "UTF-8"));
    } catch (UnsupportedEncodingException e) {
      throw new WorkflowOperationException("Error encoding the event parameter as form parameter", e);
    }

    // Execute the request
    if (!executeRequest(request, maxRetry, timeout, INITIAL_SLEEP_TIME)) {
      throw new WorkflowOperationException(format("Notification could not be delivered to %s", urlPath));
    }
    return createResult(mp, Action.CONTINUE);
  }

  /**
   * Parses an integer configuration value.
   *
   * @param key the configuration key the value belongs to (used in the error message only)
   * @param value the configured value
   * @return the parsed integer
   * @throws WorkflowOperationException if the value is not a valid integer
   */
  private static int parseIntOption(String key, String value) throws WorkflowOperationException {
    try {
      return Integer.parseInt(value.trim());
    } catch (NumberFormatException e) {
      throw new WorkflowOperationException(
              format("The configuration key '%s' requires an integer value but was '%s'", key, value), e);
    }
  }

  /**
   * Substitutes the placeholders of a notification message with the corresponding metadata of the workflow instance
   * and media package and wraps the result as the {@code text} property of a JSON object.
   *
   * <p>The message is expanded in a single pass: text inserted for one placeholder is never scanned for further
   * placeholders, and metadata is only looked up for placeholders that actually occur in the message.</p>
   *
   * @param s The notification message to transform to Json-String
   * @param workflowInstance The workflowInstance to read the metadata from
   * @param mediaPackage The mediaPackage
   * @return JSON-String containing the expanded message
   */
  private String makeJson(String s, WorkflowInstance workflowInstance, MediaPackage mediaPackage) {
    StringBuilder text = new StringBuilder(s.length());
    int i = 0;
    while (i < s.length()) {
      char c = s.charAt(i);
      String value = null;
      if (c == '%' && i + 1 < s.length()) {
        value = resolvePlaceholder(s.charAt(i + 1), workflowInstance, mediaPackage);
      }
      if (value == null) {
        // Ordinary character or unknown placeholder: copy verbatim
        text.append(c);
        i++;
      } else {
        text.append(value);
        i += 2;
      }
    }
    JsonObject json = new JsonObject();
    json.addProperty("text", text.toString());
    return gson.toJson(json);
  }

  /**
   * Resolves the value for a single placeholder character (the character following {@code %}).
   *
   * @param key the placeholder character
   * @param workflowInstance the workflow instance to read metadata from
   * @param mediaPackage the media package to read metadata from
   * @return the replacement text or {@code null} if the character is not a known placeholder
   */
  private String resolvePlaceholder(char key, WorkflowInstance workflowInstance, MediaPackage mediaPackage) {
    switch (key) {
      case 't':
        return checkIfNull(workflowInstance.getTitle(), "Title");
      case 'i':
        return String.valueOf(workflowInstance.getId());
      case 's':
        return String.valueOf(workflowInstance.getState());
      case 'o':
        return String.valueOf(workflowInstance.getCurrentOperation().getId());
      case 'I':
        return checkIfNull(mediaPackage.getIdentifier(), "Mediapackage-ID");
      case 'T':
        return checkIfNull(mediaPackage.getTitle(), "Mediapackage-Title");
      case 'c':
        return checkIfNull(mediaPackage.getContributors(), "Contributors");
      case 'C':
        return checkIfNull(mediaPackage.getCreators(), "Creators");
      case 'D':
        return checkIfNull(mediaPackage.getDate(), "Date");
      case 'd':
        return checkIfNull(mediaPackage.getDuration(), "Duration");
      case 'l':
        return checkIfNull(mediaPackage.getLanguage(), "Language");
      case 'L':
        return checkIfNull(mediaPackage.getLicense(), "License");
      case 'S':
        return checkIfNull(mediaPackage.getSeriesTitle(), "Series-Title");
      default:
        return null;
    }
  }

  /**
   * Converts a metadata value to a String. If the value is null, the name of the metadata followed by
   * "not defined" is returned; a String array is joined with commas; any other object is converted with
   * {@code toString()}.
   *
   * @param o The object to check
   * @param s The name of metadata to check
   * @return String containing the transformed object
   */
  private String checkIfNull(Object o, String s) {
    if (o == null) {
      return s + " not defined";
    }
    if (o instanceof String[]) {
      return join((String[]) o, ',');
    }
    return o.toString();
  }

  /**
   * Execute the given notification request. If the target answers with a status code other than 200, 202 or 204,
   * the request is retried until maxAttempts attempts have been made, sleeping between two attempts. The sleep time
   * is multiplied by {@link #SLEEP_SCALE_FACTOR} after each failed attempt. Protocol and I/O errors are not retried.
   *
   * @param request The request to execute
   * @param maxAttempts The maximum number of attempts
   * @param timeout The timeout in milliseconds applied to connecting, leasing a connection and socket reads
   * @param sleepTime The sleep time in milliseconds before the first retry
   * @return true if the request has been executed successfully
   */
  private boolean executeRequest(HttpUriRequest request, int maxAttempts, int timeout, int sleepTime) {
    RequestConfig config = RequestConfig.custom()
            .setConnectTimeout(timeout)
            .setConnectionRequestTimeout(timeout)
            .setSocketTimeout(timeout)
            .build();

    int attemptsLeft = maxAttempts;
    // Tracked as long so that repeated scaling cannot overflow into a negative sleep time
    long delay = sleepTime;

    while (true) {
      logger.debug("Executing notification request on target {}, {} attempts left", request.getURI(), attemptsLeft);

      int statusCode;
      // A fresh client per attempt; closing it releases the connection held by the response
      try (CloseableHttpClient httpClient = HttpClientBuilder.create()
              .useSystemProperties()
              .setDefaultRequestConfig(config)
              .build()) {
        HttpResponse response = httpClient.execute(request);
        statusCode = response.getStatusLine().getStatusCode();
      } catch (ClientProtocolException e) {
        logger.error("Protocol error during execution of query on target {}", request.getURI(), e);
        return false;
      } catch (IOException e) {
        logger.error("I/O error during execution of query on target {}", request.getURI(), e);
        return false;
      }

      if (statusCode == SC_OK || statusCode == SC_NO_CONTENT || statusCode == SC_ACCEPTED) {
        logger.debug("Request successfully executed on target {}, status code: {}", request.getURI(), statusCode);
        return true;
      }

      if (attemptsLeft <= 1) {
        logger.error("Request failed on target {}, status code: {}, no more attempt.", request.getURI(), statusCode);
        return false;
      }

      logger.debug("Request failed on target {}, status code: {}, will retry in {} seconds", request.getURI(),
              statusCode, delay / 1000);
      try {
        Thread.sleep(delay);
      } catch (InterruptedException e) {
        // Restore the interrupt flag so callers further up the stack can react to the interruption
        Thread.currentThread().interrupt();
        logger.error("Error during sleep time before new notification request try", e);
        return false;
      }

      attemptsLeft--;
      delay *= SLEEP_SCALE_FACTOR;
    }
  }
}