MatrixNotificationWorkflowOperationHandler.java
/*
* Licensed to The Apereo Foundation under one or more contributor license
* agreements. See the NOTICE file distributed with this work for additional
* information regarding copyright ownership.
*
*
* The Apereo Foundation licenses this file to you under the Educational
* Community License, Version 2.0 (the "License"); you may not use this file
* except in compliance with the License. You may obtain a copy of the License
* at:
*
* http://opensource.org/licenses/ecl2.txt
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*
*/
package org.opencastproject.workflow.handler.matrix;
import static java.lang.String.format;
import static org.apache.http.HttpStatus.SC_ACCEPTED;
import static org.apache.http.HttpStatus.SC_NO_CONTENT;
import static org.apache.http.HttpStatus.SC_OK;
import org.opencastproject.job.api.JobContext;
import org.opencastproject.workflow.api.AbstractWorkflowOperationHandler;
import org.opencastproject.workflow.api.WorkflowInstance;
import org.opencastproject.workflow.api.WorkflowOperationException;
import org.opencastproject.workflow.api.WorkflowOperationHandler;
import org.opencastproject.workflow.api.WorkflowOperationResult;
import org.opencastproject.workflow.api.WorkflowOperationResult.Action;
import com.google.gson.Gson;
import org.apache.commons.lang3.math.NumberUtils;
import org.apache.http.client.config.RequestConfig;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.entity.StringEntity;
import org.apache.http.impl.client.HttpClientBuilder;
import org.osgi.service.component.ComponentContext;
import org.osgi.service.component.annotations.Activate;
import org.osgi.service.component.annotations.Component;
import org.osgi.service.component.annotations.Modified;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.Map;
import java.util.Objects;
/**
* The Matrix notification workflow operation handler can send notifications to Matrix rooms.
*
* <p>
* This workflow operation sends a message to a specified Matrix room using the Matrix Client-Server API.
* It requires the following configuration parameters:
* </p>
* <ul>
* <li>{@code room-id} - The Matrix room ID to send the notification to</li>
* <li>{@code message} - The message body to send</li>
* </ul>
*
* <p>
* The handler also requires the following configuration properties to be set:
* </p>
* <ul>
* <li>{@code matrix.server.url} - The Matrix server URL (default: https://matrix.org)</li>
* <li>{@code matrix.access.token} - The Matrix access token for authentication</li>
* <li>{@code matrix.max.retry} - Maximum number of retry attempts (default: 5)</li>
* <li>{@code matrix.timeout} - Request timeout in seconds (default: 10)</li>
* </ul>
*/
@Component(
property = {
"service.description=Matrix Notification Workflow Operation Handler",
"workflow.operation=matrix-notify",
"service.pid=org.opencastproject.workflow.handler.matrix.MatrixNotificationWorkflowOperationHandler"
},
immediate = true,
service = WorkflowOperationHandler.class
)
public class MatrixNotificationWorkflowOperationHandler extends AbstractWorkflowOperationHandler {
/**
* Configuration key for the Matrix room ID
*/
public static final String OPT_ROOM_ID = "room-id";
/**
* Configuration key for the Matrix message body
*/
public static final String OPT_MESSAGE = "message";
/**
* The logging facility
*/
private static final Logger logger = LoggerFactory.getLogger(MatrixNotificationWorkflowOperationHandler.class);
/**
* Default value for the number of attempts for a request
*/
private static final int DEFAULT_MAX_RETRY = 5;
/**
* Default maximum time in seconds the client should wait when trying to execute a request
*/
private static final int DEFAULT_TIMEOUT = 10;
/**
* Default time between two request attempts
*/
public static final int INITIAL_SLEEP_TIME = 10;
/**
* The scale factor to the sleep time between two notification attempts
*/
public static final int SLEEP_SCALE_FACTOR = 2;
/**
* Configuration key for the Matrix server URL
*/
private static final String CFG_SERVER_URL = "matrix.server.url";
/**
* Configuration key for the Matrix access token
*/
private static final String CFG_ACCESS_TOKEN = "matrix.access.token";
/**
* Configuration key for the maximal attempts for the notification request
*/
private static final String CFG_MAX_RETRY = "matrix.max.retry";
/**
* Configuration key for the request timeout
*/
private static final String CFG_TIMEOUT = "matrix.timeout";
/**
* The default Matrix server URL
*/
private static final String DEFAULT_SERVER_URL = "https://matrix.org";
/**
* The configured Matrix server URL
*/
private String serverUrl = DEFAULT_SERVER_URL;
/**
* The configured Matrix access token
*/
private String accessToken = "";
/**
* The maximal attempts for the notification request
*/
private int maxRetry = DEFAULT_MAX_RETRY;
/**
* The request timeout in seconds
*/
private int timeout = DEFAULT_TIMEOUT;
/**
* JSON parser
*/
private final Gson gson = new Gson();
/**
* Activates this component and initializes the configuration properties.
*
* @param cc the component context
*/
@Activate
@Modified
protected void activate(ComponentContext cc) {
var properties = cc.getProperties();
serverUrl = Objects.toString(properties.get(CFG_SERVER_URL), DEFAULT_SERVER_URL);
accessToken = Objects.toString(properties.get(CFG_ACCESS_TOKEN));
maxRetry = NumberUtils.toInt((String) properties.get(CFG_MAX_RETRY), DEFAULT_MAX_RETRY);
timeout = NumberUtils.toInt((String) properties.get(CFG_TIMEOUT), DEFAULT_TIMEOUT);
}
/**
* Starts the Matrix notification workflow operation.
*
* @param workflow the workflow instance
* @param context the job context
* @return the workflow operation result
* @throws WorkflowOperationException if the operation fails
*/
@Override
public WorkflowOperationResult start(WorkflowInstance workflow, JobContext context)
throws WorkflowOperationException {
var mediapackage = workflow.getMediaPackage();
logger.debug("Running Matrix notification workflow operation on media package {}", mediapackage);
// Required configuration
var roomId = getConfig(workflow, OPT_ROOM_ID);
var message = getConfig(workflow, OPT_MESSAGE);
// Create the JSON payload
var payload = gson.toJson(Map.of("msgtype", "m.text", "body", message));
// Execute the request
executeMatrixRequest(roomId, payload, maxRetry, INITIAL_SLEEP_TIME);
// Continue the workflow, passing the possibly modified media package to the next operation
return createResult(mediapackage, Action.CONTINUE);
}
/**
* Executes a Matrix request to send a notification to a room.
*
* @param roomId the Matrix room ID to send the notification to
* @param payload the JSON payload to send
* @param attempts the number of remaining attempts
* @param sleepTime the time to sleep between attempts (in seconds)
* @throws WorkflowOperationException if the operation fails
*/
private void executeMatrixRequest(String roomId, String payload, int attempts, int sleepTime)
throws WorkflowOperationException {
// Build the full URL for the Matrix API endpoint
var apiUrl = format("%s/_matrix/client/r0/rooms/%s/send/m.room.message", serverUrl, roomId);
logger.debug("Executing Matrix notification request on target {}, {} attempts left", apiUrl, maxRetry);
var config = RequestConfig.custom()
.setConnectTimeout(timeout * 1000)
.setConnectionRequestTimeout(timeout * 1000)
.setSocketTimeout(timeout * 1000)
.build();
try (var httpClient = HttpClientBuilder.create().setDefaultRequestConfig(config).build()) {
var request = new HttpPost(apiUrl);
request.setHeader("Authorization", "Bearer " + accessToken);
request.setHeader("Content-Type", "application/json");
request.setEntity(new StringEntity(payload));
var response = httpClient.execute(request);
var statusCode = response.getStatusLine().getStatusCode();
if (statusCode == SC_OK || statusCode == SC_NO_CONTENT || statusCode == SC_ACCEPTED) {
logger.debug("Matrix request successfully executed on target {}, status code: {}", apiUrl, statusCode);
return;
} else if (attempts > 0) {
logger.debug("Matrix request to {} failed (status: {}), retry in {} seconds", apiUrl, statusCode, sleepTime);
Thread.sleep(sleepTime * 1000L);
executeMatrixRequest(roomId, payload, --attempts, sleepTime * SLEEP_SCALE_FACTOR);
return;
}
throw new WorkflowOperationException(
format("Matrix request to %s failed (status: %s), no more attempt.", apiUrl, statusCode));
} catch (IOException | InterruptedException e) {
throw new WorkflowOperationException(e);
}
}
}