CaptureAgentStateServiceImpl.java
/*
* Licensed to The Apereo Foundation under one or more contributor license
* agreements. See the NOTICE file distributed with this work for additional
* information regarding copyright ownership.
*
*
* The Apereo Foundation licenses this file to you under the Educational
* Community License, Version 2.0 (the "License"); you may not use this file
* except in compliance with the License. You may obtain a copy of the License
* at:
*
* http://opensource.org/licenses/ecl2.txt
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*
*/
package org.opencastproject.capture.admin.impl;
import static org.apache.commons.lang3.StringUtils.isBlank;
import static org.opencastproject.capture.admin.api.AgentState.KNOWN_STATES;
import static org.opencastproject.capture.admin.api.AgentState.UNKNOWN;
import static org.opencastproject.db.Queries.namedQuery;
import static org.opencastproject.util.OsgiUtil.getOptContextProperty;
import org.opencastproject.capture.admin.api.Agent;
import org.opencastproject.capture.admin.api.AgentState;
import org.opencastproject.capture.admin.api.CaptureAgentStateService;
import org.opencastproject.db.DBSession;
import org.opencastproject.db.DBSessionFactory;
import org.opencastproject.security.api.Organization;
import org.opencastproject.security.api.Role;
import org.opencastproject.security.api.SecurityConstants;
import org.opencastproject.security.api.SecurityService;
import org.opencastproject.security.api.User;
import org.opencastproject.util.NotFoundException;
import org.opencastproject.util.data.Tuple3;
import org.opencastproject.util.function.ThrowingFunction;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import com.google.common.cache.RemovalCause;
import com.google.common.cache.RemovalListener;
import com.google.common.cache.RemovalNotification;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.tuple.Pair;
import org.osgi.service.cm.ConfigurationException;
import org.osgi.service.cm.ManagedServiceFactory;
import org.osgi.service.component.ComponentContext;
import org.osgi.service.component.annotations.Activate;
import org.osgi.service.component.annotations.Component;
import org.osgi.service.component.annotations.Deactivate;
import org.osgi.service.component.annotations.Reference;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Arrays;
import java.util.Dictionary;
import java.util.Iterator;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import javax.persistence.EntityManager;
import javax.persistence.EntityManagerFactory;
import javax.persistence.NoResultException;
import javax.persistence.RollbackException;
import javax.persistence.TypedQuery;
/**
* IMPL for the capture-admin service (MH-1336, MH-1394, MH-1457, MH-1475 and MH-1476).
*/
@Component(
property = {
"service.description=Capture-Admin Service",
"service.pid=org.opencastproject.capture.agent"
},
immediate = true,
service = { CaptureAgentStateService.class , ManagedServiceFactory.class }
)
public class CaptureAgentStateServiceImpl implements CaptureAgentStateService, ManagedServiceFactory {
private static final Logger logger = LoggerFactory.getLogger(CaptureAgentStateServiceImpl.class);
/** The name of the persistence unit for this class */
public static final String PERSISTENCE_UNIT = "org.opencastproject.capture.admin.impl.CaptureAgentStateServiceImpl";
/** The delimiter for the CA configuration cache */
private static final String DELIMITER = ";==;";
/** The factory used to generate the entity manager */
protected EntityManagerFactory emf = null;
protected DBSessionFactory dbSessionFactory;
protected DBSession db;
/** The security service */
protected SecurityService securityService;
/** Maps the configuration PID to the agent ID, so agents can be updated via the configuration factory pattern */
protected Map<String, String> pidMap = new ConcurrentHashMap<>();
/** A cache of CA properties, which lightens the load on the SQL server */
private LoadingCache<String, Object> agentCache = null;
/** Configuration key for capture agent timeout in minutes before being marked offline */
public static final String CAPTURE_AGENT_TIMEOUT_KEY = "org.opencastproject.capture.admin.timeout";
/** A token to store in the miss cache */
protected Object nullToken = new Object();
/** OSGi DI */
@Reference(target = "(osgi.unit.name=org.opencastproject.capture.admin.impl.CaptureAgentStateServiceImpl)")
void setEntityManagerFactory(EntityManagerFactory emf) {
this.emf = emf;
}
@Reference
public void setDBSessionFactory(DBSessionFactory dbSessionFactory) {
this.dbSessionFactory = dbSessionFactory;
}
/**
* @param securityService
* the securityService to set
*/
@Reference
public void setSecurityService(SecurityService securityService) {
this.securityService = securityService;
}
public CaptureAgentStateServiceImpl() {
logger.info("CaptureAgentStateServiceImpl starting.");
}
@Activate
public void activate(ComponentContext cc) {
db = dbSessionFactory.createSession(emf);
// Set up the agent cache
int timeoutInMinutes = 120;
Optional<String> timeout = getOptContextProperty(cc, CAPTURE_AGENT_TIMEOUT_KEY);
if (timeout.isPresent()) {
try {
timeoutInMinutes = Integer.parseInt(timeout.get());
} catch (NumberFormatException e) {
logger.warn("Invalid configuration for capture agent status timeout (minutes) ({}={})",
CAPTURE_AGENT_TIMEOUT_KEY, timeout.get());
}
}
setupAgentCache(timeoutInMinutes, TimeUnit.MINUTES);
logger.info("Capture agent status timeout is {} minutes", timeoutInMinutes);
}
@Deactivate
public void deactivate() {
agentCache.invalidateAll();
db.close();
}
/**
* {@inheritDoc}
*
* @see org.opencastproject.capture.admin.api.CaptureAgentStateService#getAgent(java.lang.String)
*/
@Override
public Agent getAgent(String name) throws NotFoundException {
String org = securityService.getOrganization().getId();
Agent agent = getAgent(name, org);
return updateCachedLastHeardFrom(agent, org);
}
/**
* Gets an agent by name and organization.
*
* @param name
* the unique agent name
* @param org
* the organization identifier
* @return the agent
*/
protected AgentImpl getAgent(String name, String org) throws NotFoundException {
return db.execChecked(getAgentEntityQuery(name, org));
}
/**
* Gets an agent by name and organization, using an open entitymanager.
*
* @param name
* the unique agent name
* @param organization
* the organization
* @return the agent or <code>null</code> if no agent has been found
*/
protected ThrowingFunction<EntityManager, AgentImpl, NotFoundException> getAgentEntityQuery(String name, String organization) {
return em -> {
try {
TypedQuery<AgentImpl> q = em.createNamedQuery("Agent.get", AgentImpl.class);
q.setParameter("id", name);
q.setParameter("org", organization);
return q.getSingleResult();
} catch (NoResultException e) {
throw new NotFoundException(e);
}
};
}
/**
* Mix in the last-seen timestamp from the agent cache
*
* @param agent
* The Agent you wish to update
* @param org
* the organization
* @return the agent
*/
protected Agent updateCachedLastHeardFrom(Agent agent, String org) {
String agentKey = agent.getName().concat(DELIMITER).concat(org);
Tuple3<String, Properties, Long> cachedAgent = (Tuple3) agentCache.getUnchecked(agentKey);
if (cachedAgent != null) {
agent.setLastHeardFrom(cachedAgent.getC());
}
return agent;
}
/**
* {@inheritDoc}
*
* @see org.opencastproject.capture.admin.api.CaptureAgentStateService#getAgentState(java.lang.String)
*/
@Override
public String getAgentState(String agentName) throws NotFoundException {
String orgId = securityService.getOrganization().getId();
Tuple3<String, Properties, Long> agent = getAgentFromCache(agentName, orgId);
return agent.getA();
}
/**
* {@inheritDoc}
*
* @see org.opencastproject.capture.admin.api.CaptureAgentStateService#setAgentState(java.lang.String,
* java.lang.String)
*/
@Override
public boolean setAgentState(String agentName, String state) {
if (StringUtils.isBlank(agentName))
throw new IllegalArgumentException("Unable to set agent state, agent name is blank or null.");
if (StringUtils.isBlank(state))
throw new IllegalArgumentException("Unable to set agent state, state is blank or null.");
if (!KNOWN_STATES.contains(state))
throw new IllegalArgumentException("Can not set agent to an invalid state: ".concat(state));
logger.debug("Agent '{}' state set to '{}'", agentName, state);
AgentImpl agent;
String orgId = securityService.getOrganization().getId();
try {
//Check the return code, if it's false then we don't need to update the DB, and we should also return false
if (!updateAgentInCache(agentName, state, orgId)) {
return false;
}
agent = (AgentImpl) getAgent(agentName);
// the agent is known, so set the state
logger.debug("Setting Agent {} to state {}.", agentName, state);
agent.setState(state);
if (!AgentState.UNKNOWN.equals(state)) {
agent.setLastHeardFrom(System.currentTimeMillis());
}
} catch (NotFoundException e) {
// If the agent doesn't exists, but the name is not null nor empty, create a new one.
logger.debug("Creating Agent {} with state {}.", agentName, state);
agent = new AgentImpl(agentName, orgId, state, "", new Properties());
}
updateAgentInDatabase(agent);
return true;
}
/**
* Updates the agent cache, and tells you whether you need to update the database as well
*
* @param agentName
* The name of the agent in thecache
* @param state
* The new state for the agent
* @param orgId
* The organization the agent is a part of
* @return
* True if the agent state database needs to be updated, false otherwise
*/
private boolean updateAgentInCache(String agentName, String state, String orgId) {
return updateAgentInCache(agentName, state, orgId, null);
}
/**
* Updates the agent cache, and tells you whether you need to update the database as well
*
* @param agentName
* The name of the agent in thecache
* @param state
* The new state for the agent
* @param orgId
* The organization the agent is a part of
* @param configuration
* The agent's configuration
* @return
* True if the agent state database needs to be updated, false otherwise
*/
private boolean updateAgentInCache(String agentName, String state, String orgId, Properties configuration) {
try {
String agentState = getAgentFromCache(agentName, orgId).getA();
Properties config = getAgentConfiguration(agentName);
if (configuration != null) {
config = configuration;
}
if (!AgentState.UNKNOWN.equals(state)) {
agentCache.put(agentName.concat(DELIMITER).concat(orgId),
Tuple3.tuple3(state, config, Long.valueOf(System.currentTimeMillis())));
} else {
//If we're putting the agent into an unknown state we're assuming that we didn't get a check in
// therefore we don't update the timestamp and persist to the DB
agentCache.put(agentName.concat(DELIMITER).concat(orgId),
Tuple3.tuple3(state, config, getAgentFromCache(agentName, orgId).getC()));
}
if (agentState.equals(state)) {
return false;
}
return true;
} catch (NotFoundException e) {
agentCache.put(agentName.concat(DELIMITER).concat(orgId),
Tuple3.tuple3(state, configuration, Long.valueOf(System.currentTimeMillis())));
return true;
}
}
/**
* {@inheritDoc}
*
* @see org.opencastproject.capture.admin.api.CaptureAgentStateService#setAgentUrl(String, String)
*/
@Override
public boolean setAgentUrl(String agentName, String agentUrl) throws NotFoundException {
Agent agent = getAgent(agentName);
if (agent.getUrl().equals(agentUrl))
return false;
agent.setUrl(agentUrl);
updateAgentInDatabase((AgentImpl) agent);
return true;
}
/**
* {@inheritDoc}
*
* @see org.opencastproject.capture.admin.api.CaptureAgentStateService#removeAgent(java.lang.String)
*/
@Override
public void removeAgent(String agentName) throws NotFoundException {
deleteAgentFromDatabase(agentName);
}
/**
* {@inheritDoc}
*
* @see org.opencastproject.capture.admin.api.CaptureAgentStateService#getKnownAgents()
*/
@Override
public Map<String, Agent> getKnownAgents() {
agentCache.cleanUp();
User user = securityService.getUser();
Organization org = securityService.getOrganization();
String orgAdmin = org.getAdminRole();
Set<Role> roles = user.getRoles();
List<AgentImpl> agents = db.exec(namedQuery.findAll(
"Agent.byOrganization",
AgentImpl.class,
Pair.of("org", securityService.getOrganization().getId())
));
// Filter the results in memory if this user is not an administrator
if (!user.hasRole(SecurityConstants.GLOBAL_ADMIN_ROLE) && !user.hasRole(orgAdmin)) {
for (Iterator<AgentImpl> iter = agents.iterator(); iter.hasNext();) {
AgentImpl agent = iter.next();
Set<String> schedulerRoles = agent.getSchedulerRoles();
// If there are no roles associated with this capture agent, it is available to anyone who can pass the
// coarse-grained web layer security
if (schedulerRoles == null || schedulerRoles.isEmpty()) {
continue;
}
boolean hasSchedulerRole = false;
for (Role role : roles) {
if (schedulerRoles.contains(role.getName())) {
hasSchedulerRole = true;
break;
}
}
if (!hasSchedulerRole) {
iter.remove();
}
}
}
// Build the map that the API defines as agent name->agent
Map<String, Agent> map = new TreeMap<>();
for (AgentImpl agent : agents) {
map.put(agent.getName(), updateCachedLastHeardFrom(agent, org.getId()));
}
return map;
}
/**
* {@inheritDoc}
*
* @see org.opencastproject.capture.admin.api.CaptureAgentStateService#getAgentCapabilities(java.lang.String)
*/
@Override
public Properties getAgentCapabilities(String agentName) throws NotFoundException {
return getAgent(agentName).getCapabilities();
}
/**
* {@inheritDoc}
*
* @see org.opencastproject.capture.admin.api.CaptureAgentStateService#getAgentConfiguration(java.lang.String)
*/
@Override
public Properties getAgentConfiguration(String agentName) throws NotFoundException {
String orgId = securityService.getOrganization().getId();
Tuple3<String, Properties, Long> agent = getAgentFromCache(agentName, orgId);
return agent.getB();
}
@SuppressWarnings("unchecked")
private Tuple3<String, Properties, Long> getAgentFromCache(String agentName, String orgId) throws NotFoundException {
Object agent = agentCache.getUnchecked(agentName.concat(DELIMITER).concat(orgId));
if (agent == nullToken) {
throw new NotFoundException();
} else {
return (Tuple3<String, Properties, Long>) agent;
}
}
/**
* {@inheritDoc}
*
* @see org.opencastproject.capture.admin.api.CaptureAgentStateService#setAgentConfiguration
*/
@Override
public boolean setAgentConfiguration(String agentName, Properties configuration) {
if (StringUtils.isBlank(agentName))
throw new IllegalArgumentException("Unable to set agent state, agent name is blank or null.");
String orgId = securityService.getOrganization().getId();
AgentImpl agent;
try {
Properties agentConfig = getAgentFromCache(agentName, orgId).getB();
if (agentConfig.equals(configuration)) {
agentCache.put(agentName.concat(DELIMITER).concat(orgId),
Tuple3.tuple3(getAgentState(agentName), agentConfig, Long.valueOf(System.currentTimeMillis())));
return false;
}
agent = (AgentImpl) getAgent(agentName);
logger.debug("Setting Agent {}'s capabilities", agentName);
agent.setConfiguration(configuration);
} catch (NotFoundException e) {
// If the agent doesn't exists, but the name is not null nor empty, create a new one.
logger.debug("Creating Agent {} with state {}.", agentName, UNKNOWN);
agent = new AgentImpl(agentName, orgId, UNKNOWN, "", configuration);
}
updateAgentInDatabase(agent);
return true;
}
/**
* Updates or adds an agent to the database.
*
* @param agent
* The Agent you wish to modify or add in the database.
*/
protected void updateAgentInDatabase(AgentImpl agent) {
updateAgentInDatabase(agent, true, 10);
}
/**
* Updates or adds an agent to the database.
*
* @param agent
* The Agent you wish to modify or add in the database.
* @param updateFromCache
* True to update the last heard from timestamp from the agentCache, false to avoid this.
* Note that you should nearly always update the cache, this was added to avoid deadlocks when removing agents from the cache.
*/
private void updateAgentInDatabase(AgentImpl agent, boolean updateFromCache, int retries) {
try {
db.execTx(retries, em -> {
//This is the cached last-heard-from time
Long cachedLastHeardFrom = -1L;
// Update the last seen property from the agent cache
if (updateFromCache) {
try {
cachedLastHeardFrom = getAgentFromCache(agent.getName(), agent.getOrganization()).getC();
} catch (NotFoundException e) {
// That's fine
}
}
try {
AgentImpl existing = getAgentEntityQuery(agent.getName(), agent.getOrganization()).apply(em);
existing.setConfiguration(agent.getConfiguration());
if (!AgentState.UNKNOWN.equals(agent.getState())) {
existing.setLastHeardFrom(Math.max(cachedLastHeardFrom, agent.getLastHeardFrom()));
}
existing.setState(agent.getState());
existing.setSchedulerRoles(agent.getSchedulerRoles());
existing.setUrl(agent.getUrl());
em.merge(existing);
} catch (NotFoundException e) {
em.persist(agent);
}
});
if (updateFromCache) {
updateAgentInCache(agent.getName(), agent.getState(), agent.getOrganization(), agent.getConfiguration());
}
} catch (RollbackException e) {
throw new RollbackException("Maximum number of retries exceeded", e);
}
}
/**
* Removes an agent from the database.
*
* @param agentName
* The name of the agent you wish to remove.
*/
private void deleteAgentFromDatabase(String agentName) throws NotFoundException {
try {
String org = securityService.getOrganization().getId();
db.execTxChecked(em -> {
Agent existing = getAgentEntityQuery(agentName, org).apply(em);
if (existing == null)
throw new NotFoundException();
em.remove(existing);
});
agentCache.invalidate(agentName.concat(DELIMITER).concat(org));
} catch (RollbackException e) {
logger.warn("Unable to commit to DB in deleteAgent.");
}
}
// // ManagedServiceFactory Methods ////
/**
* {@inheritDoc}
*
* @see org.osgi.service.cm.ManagedServiceFactory#getName()
*/
@Override
public String getName() {
return "org.opencastproject.capture.agent";
}
protected void setupAgentCache(int count, TimeUnit unit) {
// Setup the agent cache
RemovalListener<String, Object> removalListener = new RemovalListener<String, Object>() {
private Set<String> ignoredStates = new LinkedHashSet<>(Arrays.asList(AgentState.UNKNOWN, AgentState.OFFLINE));
@Override
public void onRemoval(RemovalNotification<String, Object> removal) {
if (RemovalCause.EXPIRED.equals(removal.getCause())) {
String org = securityService.getOrganization().getId();
try {
String agentName = removal.getKey().split(DELIMITER)[0];
AgentImpl agent = getAgent(agentName, org);
if (!ignoredStates.contains(agent.getState())) {
agent.setState(AgentState.OFFLINE);
updateAgentInDatabase(agent, false, 2);
}
} catch (NotFoundException e) {
//Ignore this
//It should not happen, and if it does we just don't update the non-existant agent in the DB
}
}
}
};
agentCache = CacheBuilder.newBuilder().expireAfterWrite(count, unit).removalListener(removalListener).build(new CacheLoader<String, Object>() {
@Override
public Object load(String id) {
String[] key = id.split(DELIMITER);
AgentImpl agent;
try {
agent = getAgent(key[0], key[1]);
} catch (NotFoundException e) {
return nullToken;
}
return Tuple3.tuple3(agent.getState(), agent.getConfiguration(), agent.getLastHeardFrom());
}
});
}
/**
* {@inheritDoc}
*
* @see org.osgi.service.cm.ManagedServiceFactory#updated(java.lang.String, java.util.Dictionary)
*/
@Override
public void updated(String pid, Dictionary<String, ?> properties) throws ConfigurationException {
// Get the agent properties
String nameConfig = (String) properties.get("id");
if (isBlank(nameConfig))
throw new ConfigurationException("id", "must be specified");
nameConfig = nameConfig.trim();
String urlConfig = (String) properties.get("url");
if (isBlank(urlConfig))
throw new ConfigurationException("url", "must be specified");
urlConfig = urlConfig.trim();
String orgConfig = (String) properties.get("organization");
if (isBlank(orgConfig))
throw new ConfigurationException("organization", "must be specified");
orgConfig = orgConfig.trim();
String schedulerRolesConfig = (String) properties.get("schedulerRoles");
if (isBlank(schedulerRolesConfig))
throw new ConfigurationException("schedulerRoles", "must be specified");
String[] schedulerRoles = schedulerRolesConfig.trim().split(",");
// If we don't already have a mapping for this PID, create one
if (!pidMap.containsKey(pid)) {
pidMap.put(pid, nameConfig);
}
AgentImpl agent;
try {
agent = getAgent(nameConfig, orgConfig);
agent.setUrl(urlConfig);
agent.setState(UNKNOWN);
} catch (NotFoundException e) {
agent = new AgentImpl(nameConfig, orgConfig, UNKNOWN, urlConfig, new Properties());
}
for (String role : schedulerRoles) {
agent.schedulerRoles.add(role.trim());
}
// Update the database
logger.info("Roles '{}' may schedule '{}'", schedulerRolesConfig, agent.name);
updateAgentInDatabase(agent);
}
/**
* {@inheritDoc}
*
* @see org.osgi.service.cm.ManagedServiceFactory#deleted(java.lang.String)
*/
@Override
public void deleted(String pid) {
String agentId = pidMap.remove(pid);
if (agentId == null) {
logger.warn("{} was not a managed capture agent pid", pid);
} else {
try {
deleteAgentFromDatabase(agentId);
} catch (NotFoundException e) {
logger.warn("Unable to delete capture agent '{}'", agentId);
}
}
}
}