ServiceRegistryJpaImpl.java
/*
* Licensed to The Apereo Foundation under one or more contributor license
* agreements. See the NOTICE file distributed with this work for additional
* information regarding copyright ownership.
*
*
* The Apereo Foundation licenses this file to you under the Educational
* Community License, Version 2.0 (the "License"); you may not use this file
* except in compliance with the License. You may obtain a copy of the License
* at:
*
* http://opensource.org/licenses/ecl2.txt
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*
*/
package org.opencastproject.serviceregistry.impl;
import static java.lang.String.format;
import static org.apache.commons.lang3.StringUtils.isBlank;
import static org.opencastproject.db.Queries.namedQuery;
import static org.opencastproject.job.api.AbstractJobProducer.ACCEPT_JOB_LOADS_EXCEEDING_PROPERTY;
import static org.opencastproject.job.api.AbstractJobProducer.DEFAULT_ACCEPT_JOB_LOADS_EXCEEDING;
import static org.opencastproject.job.api.Job.FailureReason.DATA;
import static org.opencastproject.job.api.Job.Status.FAILED;
import static org.opencastproject.serviceregistry.api.ServiceState.ERROR;
import static org.opencastproject.serviceregistry.api.ServiceState.NORMAL;
import static org.opencastproject.serviceregistry.api.ServiceState.WARNING;
import static org.opencastproject.util.OsgiUtil.getOptContextProperty;
import org.opencastproject.db.DBSession;
import org.opencastproject.db.DBSessionFactory;
import org.opencastproject.job.api.Job;
import org.opencastproject.job.api.Job.Status;
import org.opencastproject.job.jpa.JpaJob;
import org.opencastproject.security.api.Organization;
import org.opencastproject.security.api.SecurityService;
import org.opencastproject.security.api.TrustedHttpClient;
import org.opencastproject.security.api.TrustedHttpClientException;
import org.opencastproject.security.api.User;
import org.opencastproject.serviceregistry.api.HostRegistration;
import org.opencastproject.serviceregistry.api.HostStatistics;
import org.opencastproject.serviceregistry.api.IncidentService;
import org.opencastproject.serviceregistry.api.Incidents;
import org.opencastproject.serviceregistry.api.JaxbServiceStatistics;
import org.opencastproject.serviceregistry.api.ServiceRegistration;
import org.opencastproject.serviceregistry.api.ServiceRegistry;
import org.opencastproject.serviceregistry.api.ServiceRegistryException;
import org.opencastproject.serviceregistry.api.ServiceStatistics;
import org.opencastproject.serviceregistry.api.SystemLoad;
import org.opencastproject.serviceregistry.api.SystemLoad.NodeLoad;
import org.opencastproject.serviceregistry.impl.jmx.HostsStatistics;
import org.opencastproject.serviceregistry.impl.jmx.JobsStatistics;
import org.opencastproject.serviceregistry.impl.jmx.ServicesStatistics;
import org.opencastproject.serviceregistry.impl.jpa.HostRegistrationJpaImpl;
import org.opencastproject.serviceregistry.impl.jpa.ServiceRegistrationJpaImpl;
import org.opencastproject.systems.OpencastConstants;
import org.opencastproject.util.NotFoundException;
import org.opencastproject.util.UrlSupport;
import org.opencastproject.util.data.functions.Strings;
import org.opencastproject.util.function.ThrowingConsumer;
import org.opencastproject.util.jmx.JmxUtil;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.time.DateUtils;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.http.HttpResponse;
import org.apache.http.HttpStatus;
import org.apache.http.client.methods.HttpHead;
import org.osgi.service.cm.ConfigurationException;
import org.osgi.service.cm.ManagedService;
import org.osgi.service.component.ComponentContext;
import org.osgi.service.component.annotations.Activate;
import org.osgi.service.component.annotations.Component;
import org.osgi.service.component.annotations.Deactivate;
import org.osgi.service.component.annotations.Modified;
import org.osgi.service.component.annotations.Reference;
import org.osgi.service.component.annotations.ReferenceCardinality;
import org.osgi.service.component.annotations.ReferencePolicy;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.net.InetAddress;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.Date;
import java.util.Dictionary;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Collectors;
import javax.management.ObjectInstance;
import javax.persistence.EntityManager;
import javax.persistence.EntityManagerFactory;
import javax.persistence.LockModeType;
import javax.persistence.NoResultException;
import javax.persistence.TypedQuery;
/** JPA implementation of the {@link ServiceRegistry} */
@Component(
property = {
"service.description=Service registry"
},
immediate = true,
service = { ManagedService.class, ServiceRegistry.class, ServiceRegistryJpaImpl.class }
)
public class ServiceRegistryJpaImpl implements ServiceRegistry, ManagedService {
/** JPA persistence unit name */
public static final String PERSISTENCE_UNIT = "org.opencastproject.common";
/** Id of the workflow's start operation, needs to match the corresponding enum value in WorkflowServiceImpl */
public static final String START_OPERATION = "START_OPERATION";
/** Id of the workflow's start workflow operation, need to match the corresponding enum value in WorkflowServiceImpl */
public static final String START_WORKFLOW = "START_WORKFLOW";
/** Id of the workflow's resume operation, need to match the corresponding enum value in WorkflowServiceImpl */
public static final String RESUME = "RESUME";
/** Identifier for the workflow service */
public static final String TYPE_WORKFLOW = "org.opencastproject.workflow";
static final Logger logger = LoggerFactory.getLogger(ServiceRegistryJpaImpl.class);
/** The list of registered JMX beans */
protected List<ObjectInstance> jmxBeans = new ArrayList<>();
/** Hosts statistics JMX type */
private static final String JMX_HOSTS_STATISTICS_TYPE = "HostsStatistics";
/** Services statistics JMX type */
private static final String JMX_SERVICES_STATISTICS_TYPE = "ServicesStatistics";
/** Jobs statistics JMX type */
private static final String JMX_JOBS_STATISTICS_TYPE = "JobsStatistics";
/** The JMX business object for hosts statistics */
private HostsStatistics hostsStatistics;
/** The JMX business object for services statistics */
private ServicesStatistics servicesStatistics;
/** The JMX business object for jobs statistics */
private JobsStatistics jobsStatistics;
/** Current job used to process job in the service registry */
private static final ThreadLocal<Job> currentJob = new ThreadLocal<>();
/** Configuration key for the maximum load */
protected static final String OPT_MAXLOAD = "org.opencastproject.server.maxload";
/** Configuration key for the interval to check whether the hosts in the service registry are still alive, in seconds */
protected static final String OPT_HEARTBEATINTERVAL = "heartbeat.interval";
/** Configuration key for the collection of job statistics */
protected static final String OPT_JOBSTATISTICS = "jobstats.collect";
/** Configuration key for the retrieval of service statistics: Do not consider jobs older than max_job_age (in days) */
protected static final String OPT_SERVICE_STATISTICS_MAX_JOB_AGE = "org.opencastproject.statistics.services.max_job_age";
/** Configuration key for the encoding preferred worker nodes */
protected static final String OPT_ENCODING_WORKERS = "org.opencastproject.encoding.workers";
/** Configuration key for the encoding workers load threshold */
protected static final String OPT_ENCODING_THRESHOLD = "org.opencastproject.encoding.workers.threshold";
/** The http client to use when connecting to remote servers */
protected TrustedHttpClient client = null;
/** Default jobs limit during dispatching
* (larger value will fetch more entries from the database at the same time and increase RAM usage) */
static final int DEFAULT_DISPATCH_JOBS_LIMIT = 100;
/** Default setting on job statistics collection */
static final boolean DEFAULT_JOB_STATISTICS = false;
/** Default setting on service statistics retrieval */
static final int DEFAULT_SERVICE_STATISTICS_MAX_JOB_AGE = 14;
static final List<String> DEFAULT_ENCODING_WORKERS = new ArrayList<String>();
static final double DEFAULT_ENCODING_THRESHOLD = 0.0;
/** The configuration key for setting {@link #maxAttemptsBeforeErrorState} */
static final String MAX_ATTEMPTS_CONFIG_KEY = "max.attempts";
/** The configuration key for setting {@link #noErrorStateServiceTypes} */
static final String NO_ERROR_STATE_SERVICE_TYPES_CONFIG_KEY = "no.error.state.service.types";
/** Default value for {@link #maxAttemptsBeforeErrorState} */
private static final int DEFAULT_MAX_ATTEMPTS_BEFORE_ERROR_STATE = 10;
/** Default value for {@link #errorStatesEnabled} */
private static final boolean DEFAULT_ERROR_STATES_ENABLED = true;
/** Number of failed jobs on a service before it is set to error state. A negative value disables error states completely. */
protected int maxAttemptsBeforeErrorState = DEFAULT_MAX_ATTEMPTS_BEFORE_ERROR_STATE;
private boolean errorStatesEnabled = DEFAULT_ERROR_STATES_ENABLED;
/** Services for which error state is disabled */
private List<String> noErrorStateServiceTypes = new ArrayList<>();
/** Default delay between checking if hosts are still alive in seconds * */
static final long DEFAULT_HEART_BEAT = 60;
/** Default job load when not passed by service creating the job * */
static final float DEFAULT_JOB_LOAD = 0.1f;
/** This host's base URL */
protected String hostName;
/** This host's descriptive node name eg admin, worker01 */
protected String nodeName;
/** The base URL for job URLs */
protected String jobHost;
/** List with URLs of encoding specialised workers (configured as a comma-separated list) */
protected static List<String> encodingWorkers = DEFAULT_ENCODING_WORKERS;
/** Threshold value under which defined workers get preferred when dispatching encoding jobs */
protected static double encodingThreshold = DEFAULT_ENCODING_THRESHOLD;
/** The factory used to generate the entity manager */
protected EntityManagerFactory emf = null;
protected DBSessionFactory dbSessionFactory;
protected DBSession db;
/** The thread pool to use for dispatching queued jobs and checking on phantom services. */
protected ScheduledExecutorService scheduledExecutor = null;
/** The security service */
protected SecurityService securityService = null;
protected IncidentService incidentService = null;
protected Incidents incidents;
/** Whether to collect detailed job statistics */
protected boolean collectJobstats = DEFAULT_JOB_STATISTICS;
/** Maximum age (in days) of jobs being considered for service statistics */
protected int maxJobAge = DEFAULT_SERVICE_STATISTICS_MAX_JOB_AGE;
/** A static list of statuses that influence how load balancing is calculated */
protected static final List<Status> JOB_STATUSES_INFLUENCING_LOAD_BALANCING;
/** All job statuses for which {@link Status#isActive()} returns true */
private static final Status[] activeJobStatus =
Arrays.stream(Status.values()).filter(Status::isActive).collect(Collectors.toList()).toArray(new Status[0]);
/** Local load cache: maps a job id to the job load that was added to {@link #localSystemLoad} for that job */
protected static HashMap<Long, Float> jobCache = new HashMap<>();
// Only jobs in status RUNNING are counted towards the load
static {
JOB_STATUSES_INFLUENCING_LOAD_BALANCING = new ArrayList<>();
JOB_STATUSES_INFLUENCING_LOAD_BALANCING.add(Status.RUNNING);
}
/** Whether to accept a job whose load exceeds the host’s max load */
protected Boolean acceptJobLoadsExeedingMaxLoad = true;
// Current system load
protected float localSystemLoad = 0.0f;
/**
 * OSGi DI callback that stores the entity manager factory. {@link #activate(ComponentContext)} passes it to the
 * database session factory to create the database session.
 *
 * @param emf
 *          the entity manager factory selected by the reference's target filter
 */
@Reference(target = "(osgi.unit.name=org.opencastproject.common)")
void setEntityManagerFactory(EntityManagerFactory emf) {
this.emf = emf;
}
/**
 * OSGi DI callback that stores the database session factory. {@link #activate(ComponentContext)} uses it to
 * create the database session.
 *
 * @param dbSessionFactory
 *          the database session factory
 */
@Reference
public void setDBSessionFactory(DBSessionFactory dbSessionFactory) {
this.dbSessionFactory = dbSessionFactory;
}
/**
 * OSGi callback on component activation.
 * <p>
 * Creates the database session, determines this host's URL, jobs URL and node name from the bundle context
 * properties (falling back to defaults when the context or a property is missing), cleans undispatchable jobs for
 * this host, registers the JMX statistics beans and finally registers this host in the service registry.
 *
 * @param cc
 *          the OSGi component context; may be {@code null}, in which case the defaults are used
 * @throws IllegalStateException
 *           if this host could not be registered in the service registry
 */
@Activate
public void activate(ComponentContext cc) {
logger.info("Activate service registry");
db = dbSessionFactory.createSession(emf);
// Find this host's url
if (cc == null || StringUtils.isBlank(cc.getBundleContext().getProperty(OpencastConstants.SERVER_URL_PROPERTY))) {
hostName = UrlSupport.DEFAULT_BASE_URL;
} else {
hostName = cc.getBundleContext().getProperty(OpencastConstants.SERVER_URL_PROPERTY);
}
// Check hostname for sanity. It should be the hosts URL with protocol but without any part of the service paths.
if (hostName.endsWith("/")) {
logger.warn("The configured value of {} ends with '/'. This is very likely a configuration error which could "
+ "lead to services not working properly. Note that this configuration should not contain any part of "
+ "the service paths.", OpencastConstants.SERVER_URL_PROPERTY);
}
// Clean all undispatchable jobs that were orphaned when this host was last deactivated
cleanUndispatchableJobs(hostName);
// Register JMX beans with statistics
// A failure to collect the statistics is only logged; activation continues without the JMX beans
try {
List<ServiceStatistics> serviceStatistics = getServiceStatistics();
hostsStatistics = new HostsStatistics(serviceStatistics);
servicesStatistics = new ServicesStatistics(hostName, serviceStatistics);
jobsStatistics = new JobsStatistics(hostName);
jmxBeans.add(JmxUtil.registerMXBean(hostsStatistics, JMX_HOSTS_STATISTICS_TYPE));
jmxBeans.add(JmxUtil.registerMXBean(servicesStatistics, JMX_SERVICES_STATISTICS_TYPE));
jmxBeans.add(JmxUtil.registerMXBean(jobsStatistics, JMX_JOBS_STATISTICS_TYPE));
} catch (ServiceRegistryException e) {
logger.error("Error registering JMX statistic beans", e);
}
// Find the jobs URL
if (cc == null || StringUtils.isBlank(cc.getBundleContext().getProperty("org.opencastproject.jobs.url"))) {
jobHost = hostName;
} else {
jobHost = cc.getBundleContext().getProperty("org.opencastproject.jobs.url");
}
// Register this host
try {
if (cc == null || StringUtils.isBlank(cc.getBundleContext().getProperty(OpencastConstants.NODE_NAME_PROPERTY))) {
nodeName = null;
} else {
nodeName = cc.getBundleContext().getProperty(OpencastConstants.NODE_NAME_PROPERTY);
}
// The maximum load defaults to the number of available processors unless configured explicitly
float maxLoad = Runtime.getRuntime().availableProcessors();
if (cc != null && StringUtils.isNotBlank(cc.getBundleContext().getProperty(OPT_MAXLOAD))) {
try {
maxLoad = Float.parseFloat(cc.getBundleContext().getProperty(OPT_MAXLOAD));
logger.info("Max load has been set manually to {}", maxLoad);
} catch (NumberFormatException e) {
// NOTE(review): the value is parsed as a float, although the message below asks for an integer
logger.warn("Configuration key '{}' is not an integer. Falling back to the number of cores ({})",
OPT_MAXLOAD, maxLoad);
}
}
logger.info("Node maximum load set to {}", maxLoad);
// Resolve the IP address of the host part of this host's URL
String address = InetAddress.getByName(URI.create(hostName).getHost()).getHostAddress();
long maxMemory = Runtime.getRuntime().maxMemory();
int cores = Runtime.getRuntime().availableProcessors();
registerHost(hostName, address, nodeName, maxMemory, cores, maxLoad);
} catch (Exception e) {
throw new IllegalStateException("Unable to register host " + hostName + " in the service registry", e);
}
// Whether a service accepts a job whose load exceeds the host’s max load
if (cc != null) {
acceptJobLoadsExeedingMaxLoad = getOptContextProperty(cc, ACCEPT_JOB_LOADS_EXCEEDING_PROPERTY).map(Strings.toBool)
.getOrElse(DEFAULT_ACCEPT_JOB_LOADS_EXCEEDING);
}
// Start with an empty local load
localSystemLoad = 0;
logger.info("Activated");
}
/**
 * Returns the locally tracked load of this host.
 *
 * @return the current value of the local system load
 */
@Override
public float getOwnLoad() {
return localSystemLoad;
}
/**
 * Returns this host's base URL as determined during activation.
 *
 * @return the base URL of the host this registry is running on
 */
@Override
public String getRegistryHostname() {
return hostName;
}
/**
 * OSGi callback on component deactivation.
 * <p>
 * Stops the scheduled executor (waiting up to 10 seconds for running tasks to terminate), unregisters the JMX
 * beans and unregisters this host from the service registry.
 *
 * @throws IllegalStateException
 *           if this host could not be unregistered from the service registry
 */
@Deactivate
public void deactivate() {
  logger.info("deactivate service registry");

  boolean interrupted = false;

  // Wait for job dispatcher to stop before unregistering hosts and requeuing jobs
  if (scheduledExecutor != null) {
    scheduledExecutor.shutdownNow();
    try {
      // isShutdown() is always true after shutdownNow(); isTerminated() tells whether tasks are still running
      if (!scheduledExecutor.isTerminated()) {
        logger.info("Waiting for Dispatcher to terminate");
        scheduledExecutor.awaitTermination(10, TimeUnit.SECONDS);
      }
    } catch (InterruptedException e) {
      logger.error("Error shutting down the Dispatcher", e);
      // Remember the interrupt, but finish the cleanup below before restoring the flag
      interrupted = true;
    }
  }

  try {
    for (ObjectInstance mbean : jmxBeans) {
      JmxUtil.unregisterMXBean(mbean);
    }

    try {
      unregisterHost(hostName);
    } catch (ServiceRegistryException e) {
      throw new IllegalStateException("Unable to unregister host " + hostName + " from the service registry", e);
    }
  } finally {
    if (interrupted) {
      Thread.currentThread().interrupt();
    }
  }
}
/**
 * {@inheritDoc}
 * <p>
 * The job is created on this host as a dispatchable job without arguments or payload, with the calling thread's
 * current job as parent and the default job load.
 *
 * @see org.opencastproject.serviceregistry.api.ServiceRegistry#createJob(java.lang.String, java.lang.String)
 */
@Override
public Job createJob(String type, String operation) throws ServiceRegistryException {
return createJob(this.hostName, type, operation, null, null, true, getCurrentJob(), DEFAULT_JOB_LOAD);
}
/**
 * {@inheritDoc}
 * <p>
 * The job is created on this host as a dispatchable job without payload, with the calling thread's current job as
 * parent and the default job load.
 *
 * @see org.opencastproject.serviceregistry.api.ServiceRegistry#createJob(java.lang.String, java.lang.String,
 * java.util.List)
 */
@Override
public Job createJob(String type, String operation, List<String> arguments) throws ServiceRegistryException {
return createJob(this.hostName, type, operation, arguments, null, true, getCurrentJob(), DEFAULT_JOB_LOAD);
}
/**
 * {@inheritDoc}
 * <p>
 * The job is created on this host as a dispatchable job without payload and with the calling thread's current job
 * as parent. If {@code jobLoad} is {@code null}, the default job load is used.
 *
 * @see org.opencastproject.serviceregistry.api.ServiceRegistry#createJob(java.lang.String, java.lang.String,
 * java.util.List, Float)
 */
@Override
public Job createJob(String type, String operation, List<String> arguments, Float jobLoad)
        throws ServiceRegistryException {
  // Unboxing a null Float into the float parameter would throw a NullPointerException
  final float effectiveLoad = jobLoad != null ? jobLoad : DEFAULT_JOB_LOAD;
  return createJob(this.hostName, type, operation, arguments, null, true, getCurrentJob(), effectiveLoad);
}
/**
 * {@inheritDoc}
 * <p>
 * The job is created on this host with the calling thread's current job as parent and the default job load.
 *
 * @see org.opencastproject.serviceregistry.api.ServiceRegistry#createJob(java.lang.String, java.lang.String,
 * java.util.List, String, boolean)
 */
@Override
public Job createJob(String type, String operation, List<String> arguments, String payload, boolean dispatchable)
throws ServiceRegistryException {
return createJob(this.hostName, type, operation, arguments, payload, dispatchable, getCurrentJob(),
DEFAULT_JOB_LOAD);
}
/**
 * {@inheritDoc}
 * <p>
 * The job is created on this host with the calling thread's current job as parent. If {@code jobLoad} is
 * {@code null}, the default job load is used.
 *
 * @see org.opencastproject.serviceregistry.api.ServiceRegistry#createJob(java.lang.String, java.lang.String,
 * java.util.List, java.lang.String, boolean, Float)
 */
@Override
public Job createJob(String type, String operation, List<String> arguments, String payload, boolean dispatchable,
        Float jobLoad) throws ServiceRegistryException {
  // Unboxing a null Float into the float parameter would throw a NullPointerException
  final float effectiveLoad = jobLoad != null ? jobLoad : DEFAULT_JOB_LOAD;
  return createJob(this.hostName, type, operation, arguments, payload, dispatchable, getCurrentJob(),
          effectiveLoad);
}
/**
 * Creates a job on this host with an explicit parent job and the default job load.
 *
 * @param type
 *          the job type
 * @param operation
 *          the operation of the job
 * @param arguments
 *          the job arguments
 * @param payload
 *          the job payload
 * @param dispatchable
 *          whether the job may be dispatched
 * @param parentJob
 *          the parent job, may be {@code null}
 * @return the created job
 * @throws ServiceRegistryException
 *           if the job could not be created
 */
public Job createJob(String type, String operation, List<String> arguments, String payload, boolean dispatchable,
Job parentJob) throws ServiceRegistryException {
return createJob(this.hostName, type, operation, arguments, payload, dispatchable, parentJob, DEFAULT_JOB_LOAD);
}
/**
 * {@inheritDoc}
 * <p>
 * The job is created on this host. If {@code jobLoad} is {@code null}, the default job load is used.
 *
 * @see org.opencastproject.serviceregistry.api.ServiceRegistry#createJob(java.lang.String, java.lang.String,
 * java.util.List, java.lang.String, boolean, org.opencastproject.job.api.Job, Float)
 */
@Override
public Job createJob(String type, String operation, List<String> arguments, String payload, boolean dispatchable,
        Job parentJob, Float jobLoad) throws ServiceRegistryException {
  // Unboxing a null Float into the float parameter would throw a NullPointerException
  final float effectiveLoad = jobLoad != null ? jobLoad : DEFAULT_JOB_LOAD;
  return createJob(this.hostName, type, operation, arguments, payload, dispatchable, parentJob, effectiveLoad);
}
/**
 * Creates a job on the given host with the default job load ({@link #DEFAULT_JOB_LOAD}).
 *
 * @param host
 *          the host on which the creating service is registered
 * @param serviceType
 *          the type of the creating service
 * @param operation
 *          the operation of the job
 * @param arguments
 *          the job arguments
 * @param payload
 *          the job payload
 * @param dispatchable
 *          whether the job may be dispatched
 * @param parentJob
 *          the parent job, may be {@code null}
 * @return the created job
 * @throws ServiceRegistryException
 *           if the job could not be created
 */
public Job createJob(String host, String serviceType, String operation, List<String> arguments, String payload,
boolean dispatchable, Job parentJob) throws ServiceRegistryException {
return createJob(host, serviceType, operation, arguments, payload, dispatchable, parentJob, DEFAULT_JOB_LOAD);
}
/**
 * Creates a job for the service of the given type that is registered on the given host.
 * <p>
 * The job is persisted in a single transaction. If a parent job is given, the new job is linked to the persisted
 * parent and to its root job (the parent itself if the parent has no root job id). Dispatchable jobs are queued,
 * non-dispatchable jobs get the creating service assigned as processor.
 *
 * @param host
 *          the host on which the creating service is registered, must not be blank
 * @param serviceType
 *          the type of the creating service, must not be blank
 * @param operation
 *          the operation of the job, must not be blank
 * @param arguments
 *          the job arguments
 * @param payload
 *          the job payload
 * @param dispatchable
 *          whether the job may be dispatched
 * @param parentJob
 *          the parent job, may be {@code null}
 * @param jobLoad
 *          the load caused by the job
 * @return the created job
 * @throws IllegalArgumentException
 *           if host, service type or operation is blank
 * @throws ServiceRegistryException
 *           if no matching service registration exists or the parent or root job is not found
 */
public Job createJob(String host, String serviceType, String operation, List<String> arguments, String payload,
        boolean dispatchable, Job parentJob, float jobLoad) throws ServiceRegistryException {
  if (StringUtils.isBlank(host)) {
    throw new IllegalArgumentException("Host can't be null");
  }
  if (StringUtils.isBlank(serviceType)) {
    throw new IllegalArgumentException("Service type can't be null");
  }
  if (StringUtils.isBlank(operation)) {
    throw new IllegalArgumentException("Operation can't be null");
  }

  JpaJob jpaJob = db.execTxChecked(em -> {
    ServiceRegistrationJpaImpl creatingService = getServiceRegistrationQuery(serviceType, host).apply(em)
        .orElseThrow(() -> new ServiceRegistryException("No service registration exists for type '" + serviceType
            + "' on host '" + host + "'"));

    // Creating jobs from a host in maintenance or an inactive host is allowed, but worth a warning
    if (creatingService.getHostRegistration().isMaintenanceMode()) {
      logger.warn("Creating a job from {}, which is currently in maintenance mode.", creatingService.getHost());
    } else if (!creatingService.getHostRegistration().isActive()) {
      logger.warn("Creating a job from {}, which is currently inactive.", creatingService.getHost());
    }

    User currentUser = securityService.getUser();
    Organization currentOrganization = securityService.getOrganization();

    JpaJob job = new JpaJob(currentUser, currentOrganization, creatingService, operation, arguments, payload,
        dispatchable, jobLoad);

    // Bind the given parent job to the new job
    if (parentJob != null) {
      // Get the JPA instance of the parent job
      JpaJob jpaParentJob = findPersistedJobOrThrow(em, parentJob.getId());
      job.setParentJob(jpaParentJob);

      // Get the JPA instance of the root job; without a root job id the parent is the root
      JpaJob jpaRootJob = jpaParentJob;
      if (parentJob.getRootJobId() != null) {
        jpaRootJob = findPersistedJobOrThrow(em, parentJob.getRootJobId());
      }
      job.setRootJob(jpaRootJob);
    }

    // if this job is not dispatchable, it must be handled by the host that has created it
    if (dispatchable) {
      logger.trace("Queuing dispatchable '{}'", job);
      job.setStatus(Status.QUEUED);
    } else {
      logger.trace("Giving new non-dispatchable '{}' its creating service as processor '{}'", job, creatingService);
      job.setProcessorServiceRegistration(creatingService);
    }

    em.persist(job);
    return job;
  });

  setJobUri(jpaJob);
  return jpaJob.toJob();
}

/**
 * Loads the persisted job with the given id, or fails if it does not exist.
 *
 * @param em
 *          the entity manager to use for the lookup
 * @param jobId
 *          the job identifier
 * @return the persisted job
 * @throws ServiceRegistryException
 *           wrapping a {@link NotFoundException} if the job does not exist
 */
private JpaJob findPersistedJobOrThrow(EntityManager em, long jobId) throws ServiceRegistryException {
  return getJpaJobQuery(jobId).apply(em).orElseThrow(() -> {
    logger.error("job with id {} not found in the persistence context", jobId);
    // We don't want to leave the deleted job in the load cache if it is there
    removeFromLoadCache(jobId);
    return new ServiceRegistryException(new NotFoundException());
  });
}
/**
 * Deletes the jobs with the given identifiers, including their child jobs, in one transaction and drops them from
 * the local load cache.
 *
 * @param jobIds
 *          the identifiers of the jobs to delete; every identifier must be greater than zero
 * @throws NotFoundException
 *           if an identifier is smaller than one or no job exists for an identifier
 * @throws ServiceRegistryException
 *           if deleting the jobs fails for any other reason
 */
@Override
public void removeJobs(List<Long> jobIds) throws NotFoundException, ServiceRegistryException {
  // Reject the whole request before touching the database if any identifier is invalid
  if (jobIds.stream().anyMatch(id -> id < 1)) {
    throw new NotFoundException("Job ID must be greater than zero (0)");
  }

  logger.debug("Start deleting jobs with IDs '{}'", jobIds);
  try {
    db.execTxChecked(em -> {
      for (long jobId : jobIds) {
        JpaJob job = em.find(JpaJob.class, jobId);
        if (job != null) {
          // Children first, then the job itself
          deleteChildJobsQuery(jobId).accept(em);
          em.remove(job);
          removeFromLoadCache(jobId);
          continue;
        }
        logger.error("Job with Id {} cannot be deleted: Not found.", jobId);
        removeFromLoadCache(jobId);
        throw new NotFoundException("Job with ID '" + jobId + "' not found");
      }
    });
  } catch (NotFoundException | ServiceRegistryException known) {
    throw known;
  } catch (Exception other) {
    throw new ServiceRegistryException(other);
  }

  logger.info("Jobs with IDs '{}' deleted", jobIds);
}
/**
 * Builds a consumer that deletes all child jobs of the given job, last child first, and drops them from the local
 * load cache.
 *
 * @param jobId
 *          the identifier of the job whose children are to be deleted
 * @return a consumer running the deletion with the entity manager it is given; a failure while deleting is
 *         reported as {@link ServiceRegistryException}
 */
private ThrowingConsumer<EntityManager, Exception> deleteChildJobsQuery(long jobId) {
  return em -> {
    List<Job> children = getChildJobs(jobId);
    if (!children.isEmpty()) {
      logger.debug("Start deleting child jobs of job '{}'", jobId);
      try {
        // Walk the list backwards
        int index = children.size();
        while (--index >= 0) {
          Job child = children.get(index);
          JpaJob persistedChild = em.find(JpaJob.class, child.getId());
          em.remove(persistedChild);
          removeFromLoadCache(child.getId());
          logger.debug("{} deleted", child);
        }
        logger.debug("Deleted all child jobs of job '{}'", jobId);
      } catch (Exception e) {
        throw new ServiceRegistryException("Unable to remove child jobs from " + jobId, e);
      }
    } else {
      logger.trace("No child jobs of job '{}' found to delete.", jobId);
    }
  };
}
/**
 * Removes terminated jobs without a parent that were created more than {@code lifetime} days ago. Jobs
 * representing workflow instances and operations (start operation, start workflow, resume) are never removed.
 *
 * @param lifetime
 *          the minimum age in days a job must have to be removed
 * @throws ServiceRegistryException
 *           if removing the jobs fails
 */
@Override
public void removeParentlessJobs(int lifetime) throws ServiceRegistryException {
  // Compute the cutoff once so that every job is compared against the same instant
  final Date cutoff = DateUtils.addDays(new Date(), -lifetime);

  int count = db.execTxChecked(em -> {
    int c = 0;

    List<Job> jobs = namedQuery.findAll("Job.withoutParent", JpaJob.class).apply(em).stream()
        .map(JpaJob::toJob)
        .filter(j -> j.getDateCreated().before(cutoff))
        // DO NOT DELETE workflow instances and operations!
        .filter(j -> !START_OPERATION.equals(j.getOperation())
            && !START_WORKFLOW.equals(j.getOperation())
            && !RESUME.equals(j.getOperation()))
        .filter(j -> j.getStatus().isTerminated())
        .collect(Collectors.toList());

    for (Job job : jobs) {
      try {
        removeJobs(Collections.singletonList(job.getId()));
        logger.debug("Parentless '{}' removed", job);
        c++;
      } catch (NotFoundException e) {
        // The job vanished in the meantime; nothing left to remove
        logger.debug("Parentless '{} ' not found in database", job, e);
      }
    }

    return c;
  });

  if (count > 0) {
    logger.info("Successfully removed {} parentless jobs", count);
  } else {
    logger.trace("No parentless jobs found to remove");
  }
}
/**
 * {@inheritDoc}
 * <p>
 * Reads the error state, heartbeat, job statistics, encoding worker and service statistics settings and then
 * (re)starts the scheduled executor that runs the service heartbeat. A {@code null} dictionary is treated like an
 * empty configuration. An executor created by a previous call is shut down before a new one is created.
 *
 * @see org.osgi.service.cm.ManagedService#updated(java.util.Dictionary)
 */
@Override
public void updated(Dictionary properties) throws ConfigurationException {
  logger.info("Updating service registry properties");

  // Error state handling
  maxAttemptsBeforeErrorState = DEFAULT_MAX_ATTEMPTS_BEFORE_ERROR_STATE;
  errorStatesEnabled = DEFAULT_ERROR_STATES_ENABLED;
  String maxAttempts = StringUtils.trimToNull(configValue(properties, MAX_ATTEMPTS_CONFIG_KEY));
  if (maxAttempts != null) {
    try {
      maxAttemptsBeforeErrorState = Integer.parseInt(maxAttempts);
      if (maxAttemptsBeforeErrorState < 0) {
        errorStatesEnabled = false;
        logger.info("Error states of services disabled");
      } else {
        errorStatesEnabled = true;
        logger.info("Set max attempts before error state to {}", maxAttempts);
      }
    } catch (NumberFormatException e) {
      logger.warn("Can not set max attempts before error state to {}. {} must be an integer", maxAttempts,
          MAX_ATTEMPTS_CONFIG_KEY);
    }
  }

  noErrorStateServiceTypes = new ArrayList<>();
  String noErrorStateServiceTypesStr = StringUtils.trimToNull(
      configValue(properties, NO_ERROR_STATE_SERVICE_TYPES_CONFIG_KEY));
  if (noErrorStateServiceTypesStr != null) {
    noErrorStateServiceTypes = Arrays.asList(noErrorStateServiceTypesStr.split("\\s*,\\s*"));
    if (!noErrorStateServiceTypes.isEmpty()) {
      logger.info("Set service types without error state to {}", String.join(", ", noErrorStateServiceTypes));
    }
  }

  // Heartbeat interval: 0 disables the heartbeat, malformed or negative values fall back to the default
  long heartbeatInterval = DEFAULT_HEART_BEAT;
  String heartbeatIntervalString = StringUtils.trimToNull(configValue(properties, OPT_HEARTBEATINTERVAL));
  if (heartbeatIntervalString != null) {
    try {
      heartbeatInterval = Long.parseLong(heartbeatIntervalString);
    } catch (NumberFormatException e) {
      logger.warn("Heartbeat interval '{}' is malformed, setting to {}", heartbeatIntervalString, DEFAULT_HEART_BEAT);
      heartbeatInterval = DEFAULT_HEART_BEAT;
    }
    if (heartbeatInterval == 0) {
      logger.info("Heartbeat disabled");
    } else if (heartbeatInterval < 0) {
      logger.warn("Heartbeat interval {} seconds too low, adjusting to {}", heartbeatInterval, DEFAULT_HEART_BEAT);
      heartbeatInterval = DEFAULT_HEART_BEAT;
    } else {
      logger.info("Heartbeat interval set to {} seconds", heartbeatInterval);
    }
  }

  // Job statistics collection (Boolean.parseBoolean never throws, anything but "true" yields false)
  String jobStatsString = StringUtils.trimToNull(configValue(properties, OPT_JOBSTATISTICS));
  if (jobStatsString != null) {
    collectJobstats = Boolean.parseBoolean(jobStatsString);
  }

  // get the encoding worker nodes defined in the configuration file and parse the comma-separated list
  String encodingWorkersString = configValue(properties, OPT_ENCODING_WORKERS);
  if (StringUtils.isNotBlank(encodingWorkersString)) {
    encodingWorkers = Arrays.asList(encodingWorkersString.split("\\s*,\\s*"));
  } else {
    encodingWorkers = DEFAULT_ENCODING_WORKERS;
  }

  // get the encoding worker load threshold defined in the configuration file and parse the double
  String encodingThersholdString = StringUtils.trimToNull(configValue(properties, OPT_ENCODING_THRESHOLD));
  if (encodingThersholdString != null) {
    try {
      double encodingThresholdTmp = Double.parseDouble(encodingThersholdString);
      if (encodingThresholdTmp >= 0 && encodingThresholdTmp <= 1) {
        encodingThreshold = encodingThresholdTmp;
      } else {
        encodingThreshold = DEFAULT_ENCODING_THRESHOLD;
        logger.warn("org.opencastproject.encoding.workers.threshold is not between 0 and 1");
      }
    } catch (NumberFormatException e) {
      logger.warn("Can not set encoding threshold to {}. {} must be an parsable double", encodingThersholdString,
          OPT_ENCODING_THRESHOLD);
    }
  } else {
    encodingThreshold = DEFAULT_ENCODING_THRESHOLD;
  }

  String maxJobAgeString = StringUtils.trimToNull(configValue(properties, OPT_SERVICE_STATISTICS_MAX_JOB_AGE));
  if (maxJobAgeString != null) {
    try {
      maxJobAge = Integer.parseInt(maxJobAgeString);
      logger.info("Set service statistics max job age to {}", maxJobAgeString);
    } catch (NumberFormatException e) {
      logger.warn("Can not set service statistics max job age to {}. {} must be an integer", maxJobAgeString,
          OPT_SERVICE_STATISTICS_MAX_JOB_AGE);
    }
  }

  // Stop the executor of a previous configuration update, otherwise its thread and heartbeat would leak
  if (scheduledExecutor != null) {
    scheduledExecutor.shutdownNow();
  }
  scheduledExecutor = Executors.newScheduledThreadPool(1);

  // Schedule the service heartbeat if the interval is > 0
  if (heartbeatInterval > 0) {
    logger.debug("Starting service heartbeat at a custom interval of {}s", heartbeatInterval);
    scheduledExecutor.scheduleWithFixedDelay(new JobProducerHeartbeat(), heartbeatInterval, heartbeatInterval,
        TimeUnit.SECONDS);
  }
}

/**
 * Reads a string value from the configuration.
 *
 * @param properties
 *          the configuration, may be {@code null}
 * @param key
 *          the configuration key
 * @return the configured value, or {@code null} if there is no configuration or no value for the key
 */
private static String configValue(Dictionary<?, ?> properties, String key) {
  return properties == null ? null : (String) properties.get(key);
}
/**
 * OSGI callback when the configuration is updated. This method is only here to prevent the
 * configuration admin service from calling the service deactivate and activate methods
 * for a config update. It does not have to do anything as the updates are handled by updated().
 *
 * @param config
 *          the updated configuration; not used here
 */
@Modified
public void modified(Map<String, Object> config) throws ConfigurationException {
logger.debug("Modified serviceregistry");
}
/**
 * Builds a query that looks up the job with the given identifier. A job that is found is refreshed from the
 * database and gets its URI set before it is returned.
 *
 * @param id
 *          the job identifier
 * @return a function yielding the job, or an empty optional if no such job exists
 */
private Function<EntityManager, Optional<JpaJob>> getJpaJobQuery(long id) {
  return em -> {
    Optional<JpaJob> found = namedQuery.findByIdOpt(JpaJob.class, id).apply(em);
    if (found.isPresent()) {
      JpaJob jpaJob = found.get();
      // JPA's caches can be out of date if external changes (e.g. another node in the cluster) have been made
      // to this row in the database
      em.refresh(jpaJob);
      setJobUri(jpaJob);
    }
    return found;
  };
}
/**
 * Loads the job with the given identifier from the database.
 *
 * @param id
 *          the job identifier
 * @return the job
 * @throws NotFoundException
 *           if no job with this identifier exists
 * @throws ServiceRegistryException
 *           if the lookup fails for any other reason
 */
@Override
public Job getJob(long id) throws NotFoundException, ServiceRegistryException {
  try {
    final Optional<Job> found = db.exec(getJpaJobQuery(id)).map(jpaJob -> jpaJob.toJob());
    return found.orElseThrow(() -> new NotFoundException());
  } catch (NotFoundException missing) {
    // Pass through untouched so callers can tell "not found" from other failures
    throw missing;
  } catch (Exception failure) {
    throw new ServiceRegistryException(failure);
  }
}
/**
 * {@inheritDoc}
 * <p>
 * The current job is kept in a thread-local, so the returned value is the one set by the calling thread.
 *
 * @see org.opencastproject.serviceregistry.api.ServiceRegistry#getCurrentJob()
 */
@Override
public Job getCurrentJob() {
return currentJob.get();
}
/**
 * {@inheritDoc}
 * <p>
 * The job is stored in a thread-local, so it is only visible to the calling thread.
 *
 * @see org.opencastproject.serviceregistry.api.ServiceRegistry#setCurrentJob(Job)
 */
@Override
public void setCurrentJob(Job job) {
currentJob.set(job);
}
/**
 * Persists the given job, adjusts the local load cache if the job is processed on this host, and updates the
 * processing service's failover state when the job status has changed. Workflow service jobs are excluded from
 * both the load cache and the failover handling.
 *
 * @param job
 *          the job to update
 * @return the updated job
 * @throws ServiceRegistryException
 *           if the job does not exist or the update fails
 */
JpaJob updateJob(JpaJob job) throws ServiceRegistryException {
  try {
    // The transactional contexts are opened further down, in updateInternal and updateServiceForFailover
    return db.execChecked(em -> {
      final Job previous = getJob(job.getId());
      final JpaJob updated = updateInternal(job);

      if (!TYPE_WORKFLOW.equals(job.getJobType()) && job.getJobLoad() > 0.0f) {
        // Only jobs processed on this very host count towards the local load
        if (job.getProcessorServiceRegistration() != null
            && job.getProcessorServiceRegistration().getHost().equals(getRegistryHostname())) {
          processCachedLoadChange(job);
        }
      }

      // All WorkflowService Jobs will be ignored
      final boolean statusChanged = previous.getStatus() != job.getStatus();
      if (statusChanged && !TYPE_WORKFLOW.equals(job.getJobType())) {
        updateServiceForFailover(job);
      }

      return updated;
    });
  } catch (ServiceRegistryException registryFailure) {
    throw registryFailure;
  } catch (NotFoundException missing) {
    // Just in case, remove from cache if there
    removeFromLoadCache(job.getId());
    throw new ServiceRegistryException(missing);
  } catch (Exception other) {
    throw new ServiceRegistryException(other);
  }
}
/**
 * Converts the given job to its JPA representation, attaches the processor service registration looked up from the
 * job's type and processing host, and persists it.
 *
 * @param job
 *          the job to update
 * @return the updated job
 * @throws ServiceRegistryException
 *           if the update fails
 */
@Override
public Job updateJob(Job job) throws ServiceRegistryException {
  final JpaJob jpaJob = JpaJob.from(job);
  final ServiceRegistrationJpaImpl processor =
      (ServiceRegistrationJpaImpl) getServiceRegistration(job.getJobType(), job.getProcessingHost());
  jpaJob.setProcessorServiceRegistration(processor);
  final JpaJob updated = updateJob(jpaJob);
  return updated.toJob();
}
/**
 * Processes the job load changes for the *local* load cache.
 * <p>
 * A job whose status influences load balancing and that is not cached yet is added to the cache and its load is added
 * to the local system load. A cached job that is finished, failed or waiting is removed again and its load is
 * subtracted. Everything else is ignored. Whenever the cache ends up empty, the local system load is reset to zero.
 *
 * @param job
 *          The job to apply to the load cache
 */
private synchronized void processCachedLoadChange(JpaJob job) {
  final Status status = job.getStatus();
  final boolean cached = jobCache.get(job.getId()) != null;
  final boolean releasesLoad = Status.FINISHED.equals(status) || Status.FAILED.equals(status)
      || Status.WAITING.equals(status);

  if (JOB_STATUSES_INFLUENCING_LOAD_BALANCING.contains(status) && !cached) {
    logger.debug("Adding to load cache: {}, type {}, load {}, status {}",
        job, job.getJobType(), job.getJobLoad(), status);
    localSystemLoad += job.getJobLoad();
    jobCache.put(job.getId(), job.getJobLoad());
  } else if (cached && releasesLoad) {
    // The cache check must guard all three statuses: only a job that was actually added may give its load back.
    // Without the grouping, a failed or waiting job that was never cached would still reduce the local load.
    logger.debug("Removing from load cache: {}, type {}, load {}, status {}",
        job, job.getJobType(), job.getJobLoad(), status);
    localSystemLoad -= job.getJobLoad();
    jobCache.remove(job.getId());
  } else {
    logger.debug("Ignoring for load cache: {}, type {}, status {}",
        job, job.getJobType(), status);
  }
  logger.debug("Current host load: {}, job load cache size: {}", format("%.1f", localSystemLoad), jobCache.size());

  if (jobCache.isEmpty()) {
    // Floating point additions and subtractions may leave a residue; an empty cache always means no load
    if (Math.abs(localSystemLoad) > 0.0000001F) {
      logger.warn("No jobs in the job load cache, but load is {}: setting job load to 0", localSystemLoad);
    }
    localSystemLoad = 0.0F;
  }
}
/**
 * Drops a job from the local load cache, if present, and subtracts its cached load from the local system load.
 *
 * @param jobId
 *          the identifier of the job to remove
 */
private synchronized void removeFromLoadCache(Long jobId) {
  if (jobCache.get(jobId) == null) {
    // Nothing cached for this job
    return;
  }
  float jobLoad = jobCache.get(jobId);
  logger.debug("Removing deleted job from load cache: Job {}, load {}", jobId, jobLoad);
  localSystemLoad -= jobLoad;
  jobCache.remove(jobId);
}
/**
 * Sets the URI of the given job to {@code <jobHost>/services/job/<id>.xml}. If that string is not a valid URI, a
 * warning is logged and the job is left as it is.
 *
 * @param job
 *          the job to set the URI on
 * @return the very same job instance
 */
protected JpaJob setJobUri(JpaJob job) {
  final String location = jobHost + "/services/job/" + job.getId() + ".xml";
  try {
    job.setUri(new URI(location));
  } catch (URISyntaxException e) {
    logger.warn("Can not set the job URI", e);
  }
  return job;
}
/**
 * Internal method to update a job, throwing unwrapped JPA exceptions.
 * <p>
 * The persistent job is loaded, updated from the given in-memory job and merged within one transaction. Afterwards
 * the in-memory job takes over the persisted version and gets its URI set.
 *
 * @param job
 *          the job to update
 * @return the updated job, i.e. the instance passed in
 * @throws NotFoundException
 *           if no job with this identifier is stored
 */
protected JpaJob updateInternal(JpaJob job) throws NotFoundException {
  final JpaJob persisted = db.execTxChecked(em -> {
    final JpaJob stored = em.find(JpaJob.class, job.getId());
    if (stored == null) {
      throw new NotFoundException();
    }
    update(stored, job);
    em.merge(stored);
    return stored;
  });

  final Job persistedJob = persisted.toJob();
  job.setVersion(persistedJob.getVersion());
  setJobUri(job);
  return job;
}
/**
 * Feeds the job statistics with the results of the average-operation query and of the count-per-host-and-service
 * query.
 */
public void updateStatisticsJobData() {
  final List<Object[]> averages = db.exec(getAvgOperationsQuery());
  jobsStatistics.updateAvg(averages);

  final List<Object[]> counts = db.exec(getCountPerHostServiceQuery());
  jobsStatistics.updateJobCount(counts);
}
/**
 * Internal method to update the service registration state, throwing unwrapped JPA exceptions.
 * <p>
 * The service state, the state change date and both state triggers are copied onto the persistent registration
 * within a transaction. The service statistics are then updated with the registration passed in.
 *
 * @param registration
 *          the service registration to update
 * @return the service registration passed in
 * @throws NotFoundException
 *           if no registration with this identifier is stored
 */
private ServiceRegistration updateServiceState(ServiceRegistrationJpaImpl registration) throws NotFoundException {
  db.execTxChecked(em -> {
    final ServiceRegistrationJpaImpl persisted = em.find(ServiceRegistrationJpaImpl.class, registration.getId());
    if (persisted == null) {
      throw new NotFoundException();
    }
    persisted.setServiceState(registration.getServiceState());
    persisted.setStateChanged(registration.getStateChanged());
    persisted.setWarningStateTrigger(registration.getWarningStateTrigger());
    persisted.setErrorStateTrigger(registration.getErrorStateTrigger());
  });

  servicesStatistics.updateService(registration);
  return registration;
}
/**
* Sets the queue and runtimes and other elements of a persistent job based on a job that's been modified in memory.
* Times on both the objects must be modified, since the in-memory job must not be stale.
*
* Three objects are kept in step here: the persistent entity ({@code fromDb}), the in-memory JPA job
* ({@code jpaJob}) and the {@link Job} obtained from {@code jpaJob.toJob()}, which is the one all values are read
* from.
*
* @param fromDb
* The job from the database
* @param jpaJob
* The in-memory job
*/
private void update(JpaJob fromDb, JpaJob jpaJob) {
final Job job = jpaJob.toJob();
// A single timestamp is used for every date set during this update
final Date now = new Date();
final Status status = job.getStatus();
// Remember the persisted status before it gets overwritten below
final Status fromDbStatus = fromDb.getStatus();
fromDb.setPayload(job.getPayload());
fromDb.setStatus(job.getStatus());
fromDb.setDispatchable(job.isDispatchable());
fromDb.setVersion(job.getVersion());
fromDb.setOperation(job.getOperation());
fromDb.setArguments(job.getArguments());
// Default the creation date to now on all three objects
if (job.getDateCreated() == null) {
jpaJob.setDateCreated(now);
fromDb.setDateCreated(now);
job.setDateCreated(now);
}
// The processor service registration follows the processing host of the in-memory job
if (job.getProcessingHost() != null) {
ServiceRegistrationJpaImpl processingService = (ServiceRegistrationJpaImpl) getServiceRegistration(
job.getJobType(), job.getProcessingHost());
logger.debug("{} has host '{}': setting processor service to '{}'", job, job.getProcessingHost(), processingService);
fromDb.setProcessorServiceRegistration(processingService);
} else {
logger.debug("Unsetting previous processor service registration for {}", job);
fromDb.setProcessorServiceRegistration(null);
}
// A job becoming RUNNING gets its start date and queue time, unless the persisted job was WAITING before
if (Status.RUNNING.equals(status) && !Status.WAITING.equals(fromDbStatus)) {
if (job.getDateStarted() == null) {
jpaJob.setDateStarted(now);
jpaJob.setQueueTime(now.getTime() - job.getDateCreated().getTime());
fromDb.setDateStarted(now);
fromDb.setQueueTime(now.getTime() - job.getDateCreated().getTime());
job.setDateStarted(now);
job.setQueueTime(now.getTime() - job.getDateCreated().getTime());
}
} else if (Status.FAILED.equals(status)) {
// failed jobs may not have even started properly
if (job.getDateCompleted() == null) {
fromDb.setDateCompleted(now);
jpaJob.setDateCompleted(now);
job.setDateCompleted(now);
// The run time can only be computed if a start date is known
if (job.getDateStarted() != null) {
jpaJob.setRunTime(now.getTime() - job.getDateStarted().getTime());
fromDb.setRunTime(now.getTime() - job.getDateStarted().getTime());
job.setRunTime(now.getTime() - job.getDateStarted().getTime());
}
}
} else if (Status.FINISHED.equals(status)) {
if (job.getDateStarted() == null) {
// Some services (e.g. ingest) don't use job dispatching, since they start immediately and handle their own
// lifecycle. In these cases, if the start date isn't set, use the date created as the start date
// NOTE(review): unlike the other branches, the start date is not copied onto fromDb here - confirm this is intended
jpaJob.setDateStarted(job.getDateCreated());
job.setDateStarted(job.getDateCreated());
}
if (job.getDateCompleted() == null) {
jpaJob.setDateCompleted(now);
jpaJob.setRunTime(now.getTime() - job.getDateStarted().getTime());
fromDb.setDateCompleted(now);
fromDb.setRunTime(now.getTime() - job.getDateStarted().getTime());
job.setDateCompleted(now);
job.setRunTime(now.getTime() - job.getDateStarted().getTime());
}
}
}
/**
* Builds the query that fetches a host registration from persistence using the
* {@code HostRegistration.byHostName} named query.
*
* @param host
* the host name
* @return a function yielding the host registration wrapped in an {@link Optional} (rather than {@code null});
* presumably empty if none exists
*/
protected Function<EntityManager, Optional<HostRegistrationJpaImpl>> fetchHostRegistrationQuery(String host) {
return namedQuery.findOpt(
"HostRegistration.byHostName",
HostRegistrationJpaImpl.class,
Pair.of("host", host)
);
}
/**
 * {@inheritDoc}
 *
 * An existing registration for the host is updated and set online; otherwise a new registration is persisted. The
 * host statistics are updated afterwards.
 *
 * @see org.opencastproject.serviceregistry.api.ServiceRegistry#registerHost(String, String, String, long, int, float)
 */
@Override
public void registerHost(String host, String address, String nodeName, long memory, int cores, float maxLoad)
        throws ServiceRegistryException {
  try {
    final HostRegistrationJpaImpl registered = db.execTxChecked(em -> {
      final Optional<HostRegistrationJpaImpl> existing = fetchHostRegistrationQuery(host).apply(em);
      final HostRegistrationJpaImpl registration;
      if (existing.isPresent()) {
        // The host is already known: bring its registration up to date
        registration = existing.get();
        registration.setIpAddress(address);
        registration.setNodeName(nodeName);
        registration.setMemory(memory);
        registration.setCores(cores);
        registration.setMaxLoad(maxLoad);
        registration.setOnline(true);
        em.merge(registration);
      } else {
        registration = new HostRegistrationJpaImpl(host, address, nodeName, memory, cores, maxLoad, true, false);
        em.persist(registration);
      }
      logger.info("Registering {} with a maximum load of {}", host, maxLoad);
      return registration;
    });

    hostsStatistics.updateHost(registered);
  } catch (Exception e) {
    throw new ServiceRegistryException(e);
  }
}
/**
 * {@inheritDoc}
 *
 * The host is set offline and every service registered on it is unregistered. A host that is not registered results
 * in a {@link ServiceRegistryException} wrapping an {@link IllegalArgumentException}.
 *
 * @see org.opencastproject.serviceregistry.api.ServiceRegistry#unregisterHost(java.lang.String)
 */
@Override
public void unregisterHost(String host) throws ServiceRegistryException {
  try {
    final HostRegistrationJpaImpl unregistered = db.execTxChecked(em -> {
      final HostRegistrationJpaImpl registration = fetchHostRegistrationQuery(host).apply(em).orElseThrow(
          () -> new IllegalArgumentException("Host '" + host + "' is not registered, so it can not be unregistered"));
      registration.setOnline(false);
      for (ServiceRegistration service : getServiceRegistrationsByHost(host)) {
        unRegisterService(service.getServiceType(), service.getHost());
      }
      em.merge(registration);
      logger.info("Unregistering {}", host);
      return registration;
    });

    logger.info("Host {} unregistered", host);
    hostsStatistics.updateHost(unregistered);
  } catch (Exception e) {
    throw new ServiceRegistryException(e);
  }
}
/**
 * {@inheritDoc}
 *
 * The host registration is activated in a first transaction; all service registrations of that host are activated
 * in a second one. Host and service statistics are updated along the way.
 *
 * @see org.opencastproject.serviceregistry.api.ServiceRegistry#enableHost(String)
 */
@Override
public void enableHost(String host) throws ServiceRegistryException, NotFoundException {
  try {
    final HostRegistrationJpaImpl enabledHost = db.execTxChecked(em -> {
      // Only an existing registration can be enabled
      final HostRegistrationJpaImpl registration = fetchHostRegistrationQuery(host).apply(em).orElseThrow(
          () -> new NotFoundException("Host '" + host + "' is currently not registered, so it can not be enabled"));
      registration.setActive(true);
      em.merge(registration);
      logger.info("Enabling {}", host);
      return registration;
    });

    db.execTxChecked(em -> {
      for (ServiceRegistration service : getServiceRegistrationsByHost(host)) {
        ((ServiceRegistrationJpaImpl) service).setActive(true);
        em.merge(service);
        servicesStatistics.updateService(service);
      }
    });

    hostsStatistics.updateHost(enabledHost);
  } catch (NotFoundException e) {
    throw e;
  } catch (Exception e) {
    throw new ServiceRegistryException(e);
  }
}
/**
 * {@inheritDoc}
 *
 * The host registration and all service registrations of that host are deactivated within a single transaction.
 * Host and service statistics are updated along the way.
 *
 * @see org.opencastproject.serviceregistry.api.ServiceRegistry#disableHost(String)
 */
@Override
public void disableHost(String host) throws ServiceRegistryException, NotFoundException {
  try {
    final HostRegistrationJpaImpl disabledHost = db.execTxChecked(em -> {
      final HostRegistrationJpaImpl registration = fetchHostRegistrationQuery(host).apply(em).orElseThrow(
          () -> new NotFoundException("Host '" + host + "' is not currently registered, so it can not be disabled"));
      registration.setActive(false);
      for (ServiceRegistration service : getServiceRegistrationsByHost(host)) {
        ((ServiceRegistrationJpaImpl) service).setActive(false);
        em.merge(service);
        servicesStatistics.updateService(service);
      }
      em.merge(registration);
      logger.info("Disabling {}", host);
      return registration;
    });

    hostsStatistics.updateHost(disabledHost);
  } catch (NotFoundException e) {
    throw e;
  } catch (Exception e) {
    throw new ServiceRegistryException(e);
  }
}
/**
* {@inheritDoc}
*
* Delegates to the four-argument variant, passing {@code false} as the job producer flag.
*
* @see org.opencastproject.serviceregistry.api.ServiceRegistry#registerService(java.lang.String, java.lang.String,
* java.lang.String)
*/
@Override
public ServiceRegistration registerService(String serviceType, String baseUrl, String path)
throws ServiceRegistryException {
return registerService(serviceType, baseUrl, path, false);
}
/**
* {@inheritDoc}
*
* Jobs still attributed to this service and host are cleaned via {@code cleanRunningJobs} before the service is
* set online via {@code setOnlineStatus}.
*
* @see org.opencastproject.serviceregistry.api.ServiceRegistry#registerService(java.lang.String, java.lang.String,
* java.lang.String, boolean)
*/
@Override
public ServiceRegistration registerService(String serviceType, String baseUrl, String path, boolean jobProducer)
throws ServiceRegistryException {
// Clean first, then set online
cleanRunningJobs(serviceType, baseUrl);
return setOnlineStatus(serviceType, baseUrl, path, true, jobProducer);
}
/**
* Builds the query that fetches a service registration using the {@code ServiceRegistration.getRegistration} named
* query.
*
* @param serviceType
* the service type, bound to the {@code serviceType} query parameter
* @param host
* the host, bound to the {@code host} query parameter
* @return a function yielding the service registration wrapped in an {@link Optional}
*/
protected Function<EntityManager, Optional<ServiceRegistrationJpaImpl>> getServiceRegistrationQuery(
String serviceType, String host) {
return namedQuery.findOpt(
"ServiceRegistration.getRegistration",
ServiceRegistrationJpaImpl.class,
Pair.of("serviceType", serviceType),
Pair.of("host", host)
);
}
/**
 * Sets the online status of a service registration, creating the registration if it does not exist yet.
 * <p>
 * Note that a newly created registration does not have the given online status applied explicitly; only existing
 * registrations do.
 *
 * @param serviceType
 *          The job type
 * @param baseUrl
 *          the host URL
 * @param path
 *          the service path; required when a new registration is created, otherwise only applied if not blank
 * @param online
 *          whether the service is online or off
 * @param jobProducer
 *          whether this service produces jobs for long running operations; if {@code null}, new registrations
 *          use {@code false} and existing ones keep their persisted value
 * @return the service registration
 * @throws IllegalArgumentException
 *           if the service type or the base URL is blank
 * @throws ServiceRegistryException
 *           if the host is not registered, a new registration lacks a path or persisting fails
 */
protected ServiceRegistration setOnlineStatus(String serviceType, String baseUrl, String path, boolean online,
        Boolean jobProducer) throws ServiceRegistryException {
  if (isBlank(serviceType) || isBlank(baseUrl)) {
    logger.info("Uninformed baseUrl '{}' or service '{}' (path '{}')", baseUrl, serviceType, path);
    throw new IllegalArgumentException("serviceType and baseUrl must not be blank");
  }

  try {
    // Holders carry the entities out of the transaction lambda
    final AtomicReference<HostRegistrationJpaImpl> hostHolder = new AtomicReference<>();
    final AtomicReference<ServiceRegistrationJpaImpl> serviceHolder = new AtomicReference<>();

    db.execTxChecked(em -> {
      final HostRegistrationJpaImpl host = fetchHostRegistrationQuery(baseUrl).apply(em).orElseThrow(() -> {
        logger.info("No associated host registration for '{}' or service '{}' (path '{}')", baseUrl, serviceType,path);
        return new IllegalStateException(
            "A service registration can not be updated when it has no associated host registration");
      });
      hostHolder.set(host);

      final Optional<ServiceRegistrationJpaImpl> existing
          = getServiceRegistrationQuery(serviceType, baseUrl).apply(em);
      final ServiceRegistrationJpaImpl service;
      if (existing.isPresent()) {
        service = existing.get();
        if (StringUtils.isNotBlank(path)) {
          service.setPath(path);
        }
        service.setOnline(online);
        if (jobProducer != null) { // if we are not provided a value, don't update the persistent value
          service.setJobProducer(jobProducer);
        }
        em.merge(service);
      } else {
        if (isBlank(path)) {
          // we can not create a new registration without a path
          throw new IllegalArgumentException("path must not be blank when registering new services");
        }
        // if we are not provided a value, consider it to be false
        service = new ServiceRegistrationJpaImpl(host, serviceType, path,
            Objects.requireNonNullElse(jobProducer, false));
        em.persist(service);
      }
      serviceHolder.set(service);
    });

    hostsStatistics.updateHost(hostHolder.get());
    servicesStatistics.updateService(serviceHolder.get());
    return serviceHolder.get();
  } catch (Exception e) {
    throw new ServiceRegistryException(e);
  }
}
/**
* {@inheritDoc}
*
* The service is set offline first (path and job producer flag are passed as {@code null}, so existing values are
* kept), then the jobs attributed to this service and host are cleaned.
*
* @see org.opencastproject.serviceregistry.api.ServiceRegistry#unRegisterService(java.lang.String, java.lang.String)
*/
@Override
public void unRegisterService(String serviceType, String baseUrl) throws ServiceRegistryException {
logger.info("Unregistering Service {}@{} and cleaning its running jobs", serviceType, baseUrl);
// TODO: create methods that accept an entity manager, so we can execute multiple queries using the same em and tx
// (em and tx are reused if using nested db.execTx)
setOnlineStatus(serviceType, baseUrl, null, false, null);
cleanRunningJobs(serviceType, baseUrl);
}
/**
 * Find all undispatchable jobs that were orphaned when this host was last deactivated and set them to CANCELLED.
 * <p>
 * Only jobs in status INSTANTIATED or RUNNING whose processor service registration belongs to the given host are
 * cancelled. Failures are logged and not propagated.
 *
 * @param hostName
 *          the host whose orphaned jobs are cancelled
 */
private void cleanUndispatchableJobs(String hostName) {
  logger.debug("Starting check for undispatchable jobs for host {}", hostName);
  try {
    db.execTxChecked(em -> {
      final List<JpaJob> candidates = namedQuery.findAll(
          "Job.undispatchable.status",
          JpaJob.class,
          Pair.of("statuses", List.of(
              Status.INSTANTIATED.ordinal(),
              Status.RUNNING.ordinal()
          ))
      ).apply(em);

      if (!candidates.isEmpty()) {
        logger.info("Found {} undispatchable jobs on host {}", candidates.size(), hostName);
      }

      for (JpaJob candidate : candidates) {
        // Make sure the job was processed on this host
        final String processingHost = candidate.getProcessorServiceRegistration() != null
            ? candidate.getProcessorServiceRegistration().getHost()
            : "";
        if (!processingHost.equals(hostName)) {
          logger.debug("Will not cancel undispatchable job {} for host {}, it is running on a different host ({})",
              candidate, hostName, processingHost);
          continue;
        }
        logger.info("Cancelling the running undispatchable job {}, it was orphaned on this host ({})", candidate, hostName);
        candidate.setStatus(Status.CANCELLED);
        em.merge(candidate);
      }
    });
  } catch (Exception e) {
    logger.error("Unable to clean undispatchable jobs for host {}! {}", hostName, e.getMessage());
  }
}
/**
* Find all jobs in status RUNNING, DISPATCHING or WAITING on this service and reschedule or fail them.
*
* Dispatchable jobs are set to RESTART (their children are cancelled first), or, if their root job is PAUSED, the
* root job is set to RESTART instead. Jobs that are not dispatchable are set to FAILED.
*
* @param serviceType
* the service type
* @param baseUrl
* the base url
* @throws ServiceRegistryException
* if there is a problem communicating with the jobs database
*/
private void cleanRunningJobs(String serviceType, String baseUrl) throws ServiceRegistryException {
try {
db.execTxChecked(em -> {
// The query is run with a pessimistic write lock
TypedQuery<JpaJob> query = em.createNamedQuery("Job.processinghost.status", JpaJob.class)
.setLockMode(LockModeType.PESSIMISTIC_WRITE)
.setParameter("statuses", List.of(
Status.RUNNING.ordinal(),
Status.DISPATCHING.ordinal(),
Status.WAITING.ordinal()
))
.setParameter("host", baseUrl)
.setParameter("serviceType", serviceType);
List<JpaJob> unregisteredJobs = query.getResultList();
if (unregisteredJobs.size() > 0) {
logger.info("Found {} jobs to clean for {}@{}", unregisteredJobs.size(), serviceType, baseUrl);
}
for (JpaJob job : unregisteredJobs) {
if (job.isDispatchable()) {
// Re-read the job, since handling an earlier job in this loop may have changed it
em.refresh(job);
// If this job has already been treated
if (Status.CANCELLED.equals(job.getStatus()) || Status.RESTART.equals(job.getStatus())) {
continue;
}
// A paused root job is restarted as a whole: its children are cancelled and the root job is rescheduled
if (job.getRootJob() != null && Status.PAUSED.equals(job.getRootJob().getStatus())) {
JpaJob rootJob = job.getRootJob();
cancelAllChildrenQuery(rootJob).accept(em);
rootJob.setStatus(Status.RESTART);
rootJob.setOperation(START_OPERATION);
em.merge(rootJob);
continue;
}
logger.info("Marking child jobs from {} as canceled", job);
cancelAllChildrenQuery(job).accept(em);
logger.info("Rescheduling lost {}", job);
job.setStatus(Status.RESTART);
// Detach the job from its former processor
job.setProcessorServiceRegistration(null);
} else {
// Jobs that are not dispatchable are not rescheduled here
logger.info("Marking lost {} as failed", job);
job.setStatus(Status.FAILED);
}
em.merge(job);
}
});
} catch (Exception e) {
throw new ServiceRegistryException(e);
}
}
/**
 * Go through all the children recursively to set them in {@link Status#CANCELLED} status.
 * <p>
 * Each child is refreshed from the database first. Children that are already cancelled are skipped together with
 * their descendants, since they have been treated before. For all others, the descendants are cancelled before the
 * child itself.
 *
 * @param job
 *          the parent job
 * @return a consumer that cancels all descendants of the job using the given entity manager
 */
private Consumer<EntityManager> cancelAllChildrenQuery(JpaJob job) {
  return em -> {
    for (JpaJob child : job.getChildJobs()) {
      em.refresh(child);
      if (Status.CANCELLED.equals(child.getStatus())) {
        // Already cancelled, nothing left to do for this branch
        continue;
      }
      cancelAllChildrenQuery(child).accept(em);
      child.setStatus(Status.CANCELLED);
      em.merge(child);
    }
  };
}
/**
 * {@inheritDoc}
 *
 * The maintenance flag is stored on the host registration within a transaction and the host statistics are updated
 * afterwards.
 *
 * @see org.opencastproject.serviceregistry.api.ServiceRegistry#setMaintenanceStatus(java.lang.String, boolean)
 */
@Override
public void setMaintenanceStatus(String baseUrl, boolean maintenance) throws NotFoundException {
  logger.info("Setting maintenance mode on host '{}'", baseUrl);

  final HostRegistrationJpaImpl updatedHost = db.execTxChecked(em -> {
    final HostRegistrationJpaImpl registration = fetchHostRegistrationQuery(baseUrl).apply(em).orElseThrow(() -> {
      logger.warn("Can not set maintenance mode because host '{}' was not registered", baseUrl);
      return new NotFoundException("Can not set maintenance mode on a host that has not been registered");
    });
    registration.setMaintenanceMode(maintenance);
    em.merge(registration);
    return registration;
  });

  hostsStatistics.updateHost(updatedHost);
  logger.info("Finished setting maintenance mode on host '{}'", baseUrl);
}
/**
* {@inheritDoc}
*
* Executes the query built by {@code getServiceRegistrationsQuery()}.
*
* @see org.opencastproject.serviceregistry.api.ServiceRegistry#getServiceRegistrations()
*/
@Override
public List<ServiceRegistration> getServiceRegistrations() {
return db.exec(getServiceRegistrationsQuery());
}
/**
* Returns the {@code incidents} field of this registry.
*
* @return the incidents facility held by this instance
*/
@Override
public Incidents incident() {
return incidents;
}
/**
* Executes the {@code ServiceRegistration.getAllOnline} named query.
*
* @return the service registrations returned by that query
*/
private List<ServiceRegistration> getOnlineServiceRegistrations() {
return db.exec(namedQuery.findAll("ServiceRegistration.getAllOnline", ServiceRegistration.class));
}
/**
* Builds the query that gets all service registrations using the {@code ServiceRegistration.getAll} named query.
*
* @return a function yielding the list of service registrations
*/
protected Function<EntityManager, List<ServiceRegistration>> getServiceRegistrationsQuery() {
return namedQuery.findAll("ServiceRegistration.getAll", ServiceRegistration.class);
}
/**
* Gets all host registrations by executing the query built by {@code getHostRegistrationsQuery()}.
*
* @return the list of host registrations
*/
@Override
public List<HostRegistration> getHostRegistrations() {
return db.exec(getHostRegistrationsQuery());
}
/**
 * Collects the number of queued and running jobs per host from the {@code HostRegistration.jobStatistics} named
 * query.
 * <p>
 * Each row is read as (host identifier, status ordinal, job count). Rows with the RUNNING ordinal count as running
 * jobs, all others as queued jobs.
 *
 * @return the host statistics
 */
@Override
public HostStatistics getHostStatistics() {
  final HostStatistics statistics = new HostStatistics();
  final List<Object[]> rows = db.exec(namedQuery.findAll(
      "HostRegistration.jobStatistics",
      Object[].class,
      Pair.of("status", List.of(Status.QUEUED.ordinal(), Status.RUNNING.ordinal()))
  ));

  for (Object[] row : rows) {
    final long host = ((Number) row[0]).longValue();
    final int status = ((Number) row[1]).intValue();
    final long count = ((Number) row[2]).longValue();

    if (status != Status.RUNNING.ordinal()) {
      statistics.addQueued(host, count);
    } else {
      statistics.addRunning(host, count);
    }
  }
  return statistics;
}
/**
* Builds the query that gets all host registrations using the {@code HostRegistration.getAll} named query.
*
* @return a function yielding the list of host registrations
*/
protected Function<EntityManager, List<HostRegistration>> getHostRegistrationsQuery() {
return namedQuery.findAll("HostRegistration.getAll", HostRegistration.class);
}
/**
* Gets the registration of a single host by executing the query built by {@code getHostRegistrationQuery(String)}.
*
* @param hostname
* the host name
* @return the host registration
* @throws ServiceRegistryException
* declared by the interface; exceptions of the query are not wrapped here
*/
@Override
public HostRegistration getHostRegistration(String hostname) throws ServiceRegistryException {
return db.exec(getHostRegistrationQuery(hostname));
}
/**
* Builds the query that fetches a single host registration using the {@code HostRegistration.byHostName} named
* query.
*
* @param hostname
* the host name, bound to the {@code host} query parameter
* @return a function yielding the host registration
*/
protected Function<EntityManager, HostRegistration> getHostRegistrationQuery(String hostname) {
// NOTE(review): unlike fetchHostRegistrationQuery this uses find instead of findOpt - behavior for an unknown host
// depends on namedQuery.find; confirm
return namedQuery.find(
"HostRegistration.byHostName",
HostRegistration.class,
Pair.of("host", hostname)
);
}
/**
 * {@inheritDoc}
 *
 * The {@code Job.root.children} named query is tried first. If it yields nothing, the children are collected
 * recursively starting from the given job instead.
 *
 * @see org.opencastproject.serviceregistry.api.ServiceRegistry#getChildJobs(long)
 */
@Override
public List<Job> getChildJobs(long id) throws ServiceRegistryException {
  try {
    List<JpaJob> children = db.exec(namedQuery.findAll(
        "Job.root.children",
        JpaJob.class,
        Pair.of("id", id)
    ));

    if (children.isEmpty()) {
      children = db.exec(getChildrenQuery(id));
    }

    return children.stream()
        .map(child -> setJobUri(child).toJob())
        .collect(Collectors.toList());
  } catch (Exception e) {
    throw new ServiceRegistryException(e);
  }
}
/**
 * Builds a query that collects the children of a job recursively using the {@code Job.children} named query.
 * <p>
 * The direct children come first in the result, followed by the descendants of each child in turn.
 *
 * @param id
 *          the identifier of the parent job
 * @return a function yielding all descendants of the job
 */
private Function<EntityManager, List<JpaJob>> getChildrenQuery(long id) {
  return em -> {
    final List<JpaJob> directChildren = em
        .createNamedQuery("Job.children", JpaJob.class)
        .setParameter("id", id)
        .getResultList();

    final List<JpaJob> descendants = new ArrayList<>(directChildren);
    for (JpaJob child : directChildren) {
      descendants.addAll(getChildrenQuery(child.getId()).apply(em));
    }
    return descendants;
  };
}
/**
 * {@inheritDoc}
 *
 * Both filters are optional: a {@code null} type or status is left out of the query.
 *
 * @see org.opencastproject.serviceregistry.api.ServiceRegistry#getJobs(java.lang.String, Status)
 */
@Override
public List<Job> getJobs(String type, Status status) throws ServiceRegistryException {
  logger.trace("Getting jobs '{}' and '{}'", type, status);

  final Function<EntityManager, List<JpaJob>> jobsQuery;
  if (type != null && status != null) {
    jobsQuery = namedQuery.findAll(
        "Job",
        JpaJob.class,
        Pair.of("status", status.ordinal()),
        Pair.of("serviceType", type)
    );
  } else if (type != null) {
    jobsQuery = namedQuery.findAll(
        "Job.type",
        JpaJob.class,
        Pair.of("serviceType", type)
    );
  } else if (status != null) {
    jobsQuery = namedQuery.findAll(
        "Job.status",
        JpaJob.class,
        Pair.of("status", status.ordinal())
    );
  } else {
    jobsQuery = namedQuery.findAll("Job.all", JpaJob.class);
  }

  try {
    final List<JpaJob> jpaJobs = db.exec(jobsQuery);
    return jpaJobs.stream()
        .peek(this::setJobUri)
        .map(JpaJob::toJob)
        .collect(Collectors.toList());
  } catch (Exception e) {
    throw new ServiceRegistryException(e);
  }
}
/**
 * Gets the payloads returned by the {@code Job.payload} named query for the given operation.
 *
 * @param operation
 *          the operation, bound to the {@code operation} query parameter
 * @return the list of payloads
 * @throws ServiceRegistryException
 *           if the query fails
 */
@Override
public List<String> getJobPayloads(String operation) throws ServiceRegistryException {
  try {
    final Function<EntityManager, List<String>> payloadQuery = namedQuery.findAll(
        "Job.payload",
        String.class,
        Pair.of("operation", operation)
    );
    return db.exec(payloadQuery);
  } catch (Exception e) {
    throw new ServiceRegistryException(e);
  }
}
/**
 * Gets a page of the payloads returned by the {@code Job.payload} named query for the given operation.
 *
 * @param operation
 *          the operation, bound to the {@code operation} query parameter
 * @param limit
 *          the maximum number of results
 * @param offset
 *          the position of the first result
 * @return the list of payloads
 * @throws ServiceRegistryException
 *           if the query fails
 */
@Override
public List<String> getJobPayloads(String operation, int limit, int offset) throws ServiceRegistryException {
  try {
    return db.exec(em -> {
      final TypedQuery<String> query = em.createNamedQuery("Job.payload", String.class);
      query.setParameter("operation", operation);
      query.setMaxResults(limit);
      query.setFirstResult(offset);
      return query.getResultList();
    });
  } catch (Exception e) {
    throw new ServiceRegistryException(e);
  }
}
/**
 * Counts jobs using the {@code Job.countByOperationOnly} named query.
 *
 * @param operation
 *          the operation, bound to the {@code operation} query parameter
 * @return the count, narrowed to an {@code int}
 * @throws ServiceRegistryException
 *           if the query fails
 */
@Override
public int getJobCount(final String operation) throws ServiceRegistryException {
  try {
    final Number jobCount = db.exec(namedQuery.find(
        "Job.countByOperationOnly",
        Number.class,
        Pair.of("operation", operation)
    ));
    return jobCount.intValue();
  } catch (Exception e) {
    throw new ServiceRegistryException(e);
  }
}
/**
 * {@inheritDoc}
 *
 * Active jobs are those whose status is one of {@code activeJobStatus}.
 *
 * @see org.opencastproject.serviceregistry.api.ServiceRegistry#getActiveJobs()
 */
@Override
public List<Job> getActiveJobs() throws ServiceRegistryException {
  try {
    final List<JpaJob> activeJobs = db.exec(getJobsByStatusQuery(activeJobStatus));
    return activeJobs.stream()
        .map(JpaJob::toJob)
        .collect(Collectors.toList());
  } catch (Exception e) {
    throw new ServiceRegistryException(e);
  }
}
/**
 * Builds the query for the list of jobs with status from the given statuses. The job URI is set on every job found.
 *
 * @param statuses
 *          variable sized array of status values to test on jobs
 * @return a function yielding the list of jobs with status from statuses
 * @throws IllegalArgumentException
 *           if no status is given
 */
public Function<EntityManager, List<JpaJob>> getJobsByStatusQuery(Status... statuses) {
  if (statuses == null || statuses.length == 0) {
    throw new IllegalArgumentException("At least one job status must be given.");
  }

  final Function<EntityManager, List<JpaJob>> jobsQuery = namedQuery.findAll(
      "Job.statuses",
      JpaJob.class,
      Pair.of("statuses", Arrays.stream(statuses).map(Enum::ordinal).collect(Collectors.toList()))
  );

  return jobsQuery.andThen(jobs -> jobs.stream()
      .peek(this::setJobUri)
      .collect(Collectors.toList()));
}
/**
 * Counts the active jobs per organization and host using the {@code Job.countByOrganizationAndHost} named query.
 * <p>
 * Each row is read as (organization, host, count).
 *
 * @return a map from organization to a map from host to the number of active jobs
 */
@Override
public Map<String, Map<String, Long>> countActiveByOrganizationAndHost() {
  final List<Object[]> rows = db.exec(namedQuery.findAll(
      "Job.countByOrganizationAndHost",
      Object[].class,
      Pair.of("statuses", Arrays.stream(activeJobStatus).map(Enum::ordinal).collect(Collectors.toList()))
  ));

  final HashMap<String, Map<String, Long>> countsByOrganization = new HashMap<>();
  for (Object[] row : rows) {
    final String organization = (String) row[0];
    final String hostName = (String) row[1];
    final Long jobCount = (Long) row[2];
    countsByOrganization.computeIfAbsent(organization, key -> new HashMap<>()).put(hostName, jobCount);
  }
  return countsByOrganization;
}
/**
 * Counts the active jobs of one operation per organization using the {@code Job.countTypeByOrganization} named
 * query.
 * <p>
 * Each row is read as (organization, count).
 *
 * @param operation
 *          the operation, bound to the {@code operation} query parameter
 * @return a map from organization to the number of active jobs
 */
@Override
public Map<String, Long> countActiveTypeByOrganization(final String operation) {
  final List<Object[]> rows = db.exec(namedQuery.findAll(
      "Job.countTypeByOrganization",
      Object[].class,
      Pair.of("statuses", Arrays.stream(activeJobStatus).map(Enum::ordinal).collect(Collectors.toList())),
      Pair.of("operation", operation)
  ));

  return rows.stream().collect(Collectors.toMap(
      row -> (String) row[0],
      row -> (Long) row[1]
  ));
}
/**
 * Builds the query for jobs of all types that are in one of the given states, using the
 * {@code Job.dispatchable.status} named query.
 *
 * @param offset apply offset to the db query if offset > 0
 * @param limit apply limit to the db query if limit > 0
 * @param statuses the job status should be one from the given statuses; {@code null} yields an empty list
 * @return a function yielding the list of jobs waiting for dispatch
 */
protected Function<EntityManager, List<JpaJob>> getDispatchableJobsWithStatusQuery(int offset, int limit,
        Status... statuses) {
  return em -> {
    if (statuses == null) {
      return Collections.emptyList();
    }

    final TypedQuery<JpaJob> query = em.createNamedQuery("Job.dispatchable.status", JpaJob.class);
    query.setParameter("statuses", Arrays.stream(statuses).map(Enum::ordinal).collect(Collectors.toList()));

    // Paging is only applied for positive values
    if (offset > 0) {
      query.setFirstResult(offset);
    }
    if (limit > 0) {
      query.setMaxResults(limit);
    }
    return query.getResultList();
  };
}
/**
* Builds the query for the {@code Job.avgOperation} named query.
*
* @return a function yielding the raw result rows
*/
Function<EntityManager, List<Object[]>> getAvgOperationsQuery() {
return namedQuery.findAll("Job.avgOperation", Object[].class);
}
/**
* Builds the query for the {@code Job.countPerHostService} named query.
*
* @return a function yielding the raw result rows
*/
Function<EntityManager, List<Object[]>> getCountPerHostServiceQuery() {
return namedQuery.findAll("Job.countPerHostService", Object[].class);
}
/**
 * {@inheritDoc}
 *
 * Both filters are optional: a {@code null} service type or status is left out of the query.
 *
 * @see org.opencastproject.serviceregistry.api.ServiceRegistry#count(java.lang.String, Status)
 */
@Override
public long count(String serviceType, Status status) throws ServiceRegistryException {
  final Function<EntityManager, Number> countQuery;
  if (serviceType != null && status != null) {
    countQuery = namedQuery.find(
        "Job.count",
        Number.class,
        Pair.of("status", status.ordinal()),
        Pair.of("serviceType", serviceType)
    );
  } else if (serviceType != null) {
    countQuery = namedQuery.find(
        "Job.count.nullStatus",
        Number.class,
        Pair.of("serviceType", serviceType)
    );
  } else if (status != null) {
    countQuery = namedQuery.find(
        "Job.count.nullType",
        Number.class,
        Pair.of("status", status.ordinal())
    );
  } else {
    countQuery = namedQuery.find(
        "Job.count.all",
        Number.class
    );
  }

  try {
    final Number jobCount = db.exec(countQuery);
    return jobCount.longValue();
  } catch (Exception e) {
    throw new ServiceRegistryException(e);
  }
}
/**
 * {@inheritDoc}
 *
 * A {@code null} or empty service type is left out of the query. The status must not be {@code null}.
 *
 * @see org.opencastproject.serviceregistry.api.ServiceRegistry#countByHost(java.lang.String, java.lang.String,
 *      Status)
 */
@Override
public long countByHost(String serviceType, String host, Status status) throws ServiceRegistryException {
  final boolean anyType = serviceType == null || serviceType.isEmpty();

  final Function<EntityManager, Number> countQuery;
  if (anyType) {
    countQuery = namedQuery.find(
        "Job.countByHost.nullType",
        Number.class,
        Pair.of("status", status.ordinal()),
        Pair.of("host", host)
    );
  } else {
    countQuery = namedQuery.find(
        "Job.countByHost",
        Number.class,
        Pair.of("serviceType", serviceType),
        Pair.of("status", status.ordinal()),
        Pair.of("host", host)
    );
  }

  try {
    final Number jobCount = db.exec(countQuery);
    return jobCount.longValue();
  } catch (Exception e) {
    throw new ServiceRegistryException(e);
  }
}
/**
 * {@inheritDoc}
 *
 * Uses the {@code Job.countByOperation} named query. Any exception, including one caused by a {@code null} status,
 * is wrapped in a {@link ServiceRegistryException}.
 *
 * @see org.opencastproject.serviceregistry.api.ServiceRegistry#countByOperation(java.lang.String, java.lang.String,
 *      Status)
 */
@Override
public long countByOperation(String serviceType, String operation, Status status) throws ServiceRegistryException {
  try {
    final Function<EntityManager, Number> countQuery = namedQuery.find(
        "Job.countByOperation",
        Number.class,
        Pair.of("status", status.ordinal()),
        Pair.of("serviceType", serviceType),
        Pair.of("operation", operation)
    );
    final Number jobCount = db.exec(countQuery);
    return jobCount.longValue();
  } catch (Exception e) {
    throw new ServiceRegistryException(e);
  }
}
/**
* {@inheritDoc}
*
* All four arguments are mandatory; a blank service type, host or operation or a {@code null} status is rejected
* with an {@link IllegalArgumentException}. The count is taken from the {@code Job.fullMonty} named query.
*
* @see org.opencastproject.serviceregistry.api.ServiceRegistry#count(java.lang.String, java.lang.String,
* java.lang.String, Status)
*/
@Override
public long count(String serviceType, String host, String operation, Status status) throws ServiceRegistryException {
if (StringUtils.isBlank(serviceType) || StringUtils.isBlank(host) || StringUtils.isBlank(operation)
|| status == null) {
throw new IllegalArgumentException("service type, host, operation, and status must be provided");
}
try {
// NOTE(review): host is validated above but not bound as a query parameter here, so the result does not depend on
// it in this method - verify against the definition of the Job.fullMonty named query whether a host parameter
// is expected
return db.exec(namedQuery.find(
"Job.fullMonty",
Number.class,
Pair.of("status", status.ordinal()),
Pair.of("serviceType", serviceType),
Pair.of("operation", operation)
)).longValue();
} catch (Exception e) {
throw new ServiceRegistryException(e);
}
}
/**
 * {@inheritDoc}
 *
 * The statistics cover jobs created between {@code maxJobAge} days ago and tomorrow.
 *
 * @see org.opencastproject.serviceregistry.api.ServiceRegistry#getServiceStatistics()
 */
@Override
public List<ServiceStatistics> getServiceStatistics() throws ServiceRegistryException {
  final Date now = new Date();
  try {
    final Date startDate = DateUtils.addDays(now, -maxJobAge);
    // Avoid glitches around 'now' by setting the endDate to 'tomorrow'
    final Date endDate = DateUtils.addDays(now, 1);
    return db.exec(getServiceStatisticsQuery(startDate, endDate));
  } catch (Exception e) {
    throw new ServiceRegistryException(e);
  }
}
/**
 * Gets performance and runtime statistics for each known service registration.
 * For the statistics, only jobs created within the time interval [startDate, endDate] are being considered.
 * <p>
 * Every service registration gets an entry, even without any processing history. Job numbers are only filled in if
 * {@code collectJobstats} is set. The result is sorted by service type first and by host second.
 *
 * @param startDate
 *          Only jobs created after this date are considered for statistics
 * @param endDate
 *          Only jobs created before this date are considered for statistics
 * @return a function yielding the service statistics
 */
private Function<EntityManager, List<ServiceStatistics>> getServiceStatisticsQuery(Date startDate, Date endDate) {
  return em -> {
    Map<Long, JaxbServiceStatistics> statsMap = new HashMap<>();

    // Make sure we also include the services that have no processing history so far
    namedQuery.findAll("ServiceRegistration.getAll", ServiceRegistrationJpaImpl.class).apply(em).forEach(s ->
        statsMap.put(s.getId(), new JaxbServiceStatistics(s))
    );

    if (collectJobstats) {
      // Build stats map
      namedQuery.findAll(
          "ServiceRegistration.statistics",
          Object[].class,
          Pair.of("minDateCreated", startDate),
          Pair.of("maxDateCreated", endDate)
      ).apply(em).forEach(row -> {
        Number serviceRegistrationId = (Number) row[0];
        if (serviceRegistrationId == null || serviceRegistrationId.longValue() == 0) {
          return;
        }

        // the status will be null if there are no jobs at all associated with this service registration; this has
        // to be checked before resolving the ordinal, otherwise the null value fails with a NullPointerException
        Number statusOrdinal = (Number) row[1];
        if (statusOrdinal == null) {
          return;
        }
        Status status = Status.values()[statusOrdinal.intValue()];
        Number count = (Number) row[2];
        Number meanQueueTime = (Number) row[3];
        Number meanRunTime = (Number) row[4];

        // The statistics query returns a cartesian product, so we need to iterate over them to build up the objects
        JaxbServiceStatistics stats = statsMap.get(serviceRegistrationId.longValue());
        if (stats == null) {
          return;
        }

        switch (status) {
          case RUNNING:
            stats.setRunningJobs(count.intValue());
            break;
          case QUEUED:
          case DISPATCHING:
            stats.setQueuedJobs(count.intValue());
            break;
          case FINISHED:
            stats.setMeanRunTime(meanRunTime.longValue());
            stats.setMeanQueueTime(meanQueueTime.longValue());
            stats.setFinishedJobs(count.intValue());
            break;
          default:
            break;
        }
      });
    }

    List<ServiceStatistics> stats = new ArrayList<>(statsMap.values());
    stats.sort((o1, o2) -> {
      ServiceRegistration reg1 = o1.getServiceRegistration();
      ServiceRegistration reg2 = o2.getServiceRegistration();
      int typeComparison = reg1.getServiceType().compareTo(reg2.getServiceType());
      return typeComparison == 0
          ? reg1.getHost().compareTo(reg2.getHost())
          : typeComparison;
    });

    return stats;
  };
}
/**
 * {@inheritDoc}
 * <p>
 * Reads the current host loads, the host registrations and the service registrations of the given type, then
 * delegates the filtering and ordering to
 * {@link #getServiceRegistrationsByLoad(String, List, List, SystemLoad)}.
 *
 * @see org.opencastproject.serviceregistry.api.ServiceRegistry#getServiceRegistrationsByLoad(java.lang.String)
 */
@Override
public List<ServiceRegistration> getServiceRegistrationsByLoad(String serviceType) throws ServiceRegistryException {
  final SystemLoad currentLoads = getCurrentHostLoads();
  final List<HostRegistration> hosts = getHostRegistrations();
  final List<ServiceRegistration> servicesOfType = getServiceRegistrationsByType(serviceType);
  return getServiceRegistrationsByLoad(serviceType, servicesOfType, hosts, currentLoads);
}
/**
 * {@inheritDoc}
 * <p>
 * Runs the query built by {@link #getHostLoadsQuery()} against the database.
 *
 * @see org.opencastproject.serviceregistry.api.ServiceRegistry#getCurrentHostLoads()
 */
@Override
public SystemLoad getCurrentHostLoads() {
  final Function<EntityManager, SystemLoad> hostLoadsQuery = getHostLoadsQuery();
  return db.exec(hostLoadsQuery);
}
/**
 * Builds the query that computes the current load of the hosts. Rows returned by the
 * {@code ServiceRegistration.hostloads} query are turned into node loads; hosts returned by the host registrations
 * query that did not show up in those rows are added with a current load of zero.
 *
 * @return a function that computes the system load with the entity manager it is given
 */
Function<EntityManager, SystemLoad> getHostLoadsQuery() {
  return em -> {
    final SystemLoad systemLoad = new SystemLoad();

    // Find all jobs that are currently running on any given host, or get all of them
    List<Integer> statuses = JOB_STATUSES_INFLUENCING_LOAD_BALANCING.stream()
        .map(Enum::ordinal)
        .collect(Collectors.toList());
    List<Object[]> rows = namedQuery.findAll(
        "ServiceRegistration.hostloads",
        Object[].class,
        Pair.of("statuses", statuses),
        // Note: This is used in the query to filter out workflow jobs.
        // These jobs are load balanced by the workflow service directly.
        Pair.of("workflow_type", TYPE_WORKFLOW)
    ).apply(em);

    // Accumulate the numbers for relevant job statuses per host
    for (Object[] row : rows) {
      String host = String.valueOf(row[0]);
      // Go through Number rather than casting to int: the concrete numeric type of the column depends on the
      // persistence provider, and a direct (int) cast fails for anything but Integer (and for null).
      Number statusOrdinal = (Number) row[1];
      Status status = statusOrdinal == null ? null : Status.values()[statusOrdinal.intValue()];
      float currentLoad = ((Number) row[2]).floatValue();
      float maxLoad = ((Number) row[3]).floatValue();

      // Only queued, and running jobs are adding to the load, so every other status is discarded
      if (status == null || !JOB_STATUSES_INFLUENCING_LOAD_BALANCING.contains(status)) {
        currentLoad = 0.0f;
      }
      // Add the service registration
      NodeLoad serviceLoad = new NodeLoad(host, currentLoad, maxLoad);
      systemLoad.addNodeLoad(serviceLoad);
    }

    // This is important, otherwise services which have no current load are not listed in the output!
    getHostRegistrationsQuery().apply(em).stream()
        .filter(h -> !systemLoad.containsHost(h.getBaseUrl()))
        .forEach(h -> systemLoad.addNodeLoad(new NodeLoad(h.getBaseUrl(), 0.0f, h.getMaxLoad())));
    return systemLoad;
  };
}
/**
 * {@inheritDoc}
 * <p>
 * Executes the {@code ServiceRegistration.getByType} named query for the given service type.
 *
 * @see org.opencastproject.serviceregistry.api.ServiceRegistry#getServiceRegistrationsByType(java.lang.String)
 */
@Override
public List<ServiceRegistration> getServiceRegistrationsByType(String serviceType) throws ServiceRegistryException {
  final Function<EntityManager, List<ServiceRegistration>> byTypeQuery = namedQuery.findAll(
      "ServiceRegistration.getByType",
      ServiceRegistration.class,
      Pair.of("serviceType", serviceType)
  );
  return db.exec(byTypeQuery);
}
/**
 * {@inheritDoc}
 * <p>
 * Runs the query built by {@code getServiceRegistrationsByHostQuery} against the database.
 *
 * @see org.opencastproject.serviceregistry.api.ServiceRegistry#getServiceRegistrationsByHost(java.lang.String)
 */
@Override
public List<ServiceRegistration> getServiceRegistrationsByHost(String host) throws ServiceRegistryException {
  final Function<EntityManager, List<ServiceRegistration>> byHostQuery = getServiceRegistrationsByHostQuery(host);
  return db.exec(byHostQuery);
}
/**
 * Builds the {@code ServiceRegistration.getByHost} named query for the given host.
 *
 * @param host
 *          the value bound to the query's {@code host} parameter
 * @return a function that lists the matching service registrations with the entity manager it is given
 */
private Function<EntityManager, List<ServiceRegistration>> getServiceRegistrationsByHostQuery(String host) {
  final String queryName = "ServiceRegistration.getByHost";
  return namedQuery.findAll(queryName, ServiceRegistration.class, Pair.of("host", host));
}
/**
 * {@inheritDoc}
 * <p>
 * Returns {@code null} if the lookup yields no registration for the given service type and host.
 *
 * @see org.opencastproject.serviceregistry.api.ServiceRegistry#getServiceRegistration(java.lang.String,
 *      java.lang.String)
 */
@Override
public ServiceRegistration getServiceRegistration(String serviceType, String host) {
  final ServiceRegistrationJpaImpl registration = db.exec(getServiceRegistrationQuery(serviceType, host))
      .orElse(null);
  return registration;
}
/**
* Sets the trusted http client. Annotated as an OSGi reference; the stored client is the one the
* {@code JobProducerHeartbeat} uses to probe service endpoints.
*
* @param client
* the trusted http client
*/
@Reference
void setTrustedHttpClient(TrustedHttpClient client) {
this.client = client;
}
/**
* Callback for setting the security service. Annotated as an OSGi reference; the service is only stored here.
*
* @param securityService
* the securityService to set
*/
@Reference
public void setSecurityService(SecurityService securityService) {
this.securityService = securityService;
}
/**
* OSGi DI: binds the incident service. The reference is optional and dynamic, and is released again through
* {@link #unsetIncidentService(IncidentService)}.
*
* @param incidentService
* the incident service to bind; it is cast to {@link OsgiIncidentService}, so any other implementation fails
* with a ClassCastException
*/
@Reference(cardinality = ReferenceCardinality.OPTIONAL, policy = ReferencePolicy.DYNAMIC, unbind = "unsetIncidentService")
public void setIncidentService(IncidentService incidentService) {
this.incidentService = incidentService;
// Manually resolve the cyclic dependency between the incident service and the service registry
((OsgiIncidentService) incidentService).setServiceRegistry(this);
// Recreate the incidents facade so that it is backed by the newly bound service
this.incidents = new Incidents(this, incidentService);
}
/**
 * OSGi DI: releases the incident service. The call is ignored unless the given instance is the one currently bound.
 *
 * @param incidentService
 *          the incident service that is going away
 */
public void unsetIncidentService(IncidentService incidentService) {
  final boolean isBoundInstance = this.incidentService == incidentService;
  if (!isBoundInstance) {
    return;
  }
  this.incidentService = null;
  this.incidents = null;
}
/**
* Update the jobs failure history and the service status with the given information. All these data are then used
* for the jobs failover strategy. Only terminated jobs (with FAILED or FINISHED status) are taken into account, and
* jobs without a processor service registration are ignored.
* <ul>
* <li>FAILED for a reason other than DATA, while services are already in WARNING or ERROR state because of a job
* with the same signature: each of those related services except the current one is de-escalated by one level
* (WARNING to NORMAL, ERROR to WARNING).</li>
* <li>FAILED for a reason other than DATA, without such related services: the current service goes from NORMAL to
* WARNING; otherwise it goes to ERROR if error states are enabled, its service type is not exempted and its failure
* history has reached {@code maxAttemptsBeforeErrorState}.</li>
* <li>FINISHED: a current service in WARNING state is reset to NORMAL.</li>
* </ul>
*
* @param job
* the current job that failed/succeeded
* @throws ServiceRegistryException
* declared by the lookups of related services and of the failure history used here
* @throws NotFoundException
* presumably raised while persisting a service state for a registration that no longer exists -- TODO
* confirm against updateServiceState
*/
private void updateServiceForFailover(JpaJob job) throws ServiceRegistryException, NotFoundException {
// Jobs that have not terminated yet do not tell us anything about the health of the service
if (job.getStatus() != Status.FAILED && job.getStatus() != Status.FINISHED) {
return;
}
// NOTE(review): re-applies the job's own status and failure reason; presumably relied upon for a side effect of
// JpaJob#setStatus -- confirm before removing
job.setStatus(job.getStatus(), job.getFailureReason());
// At this point, the only possible states for the current service are NORMAL and WARNING,
// the services in ERROR state will not be chosen by the dispatcher
ServiceRegistrationJpaImpl currentService = job.getProcessorServiceRegistration();
if (currentService == null) {
return;
}
// Job is finished with a failure (failures caused by the job's DATA are not held against the service)
if (job.getStatus() == FAILED && !DATA.equals(job.getFailureReason())) {
// Services in WARNING or ERROR state triggered by current job
List<ServiceRegistrationJpaImpl> relatedWarningOrErrorServices = getRelatedWarningErrorServices(job);
// Before this job failed there was at least one job failed with this job signature on any service
if (relatedWarningOrErrorServices.size() > 0) {
for (ServiceRegistrationJpaImpl relatedService : relatedWarningOrErrorServices) {
// Skip current service from the list
if (currentService.equals(relatedService)) {
continue;
}
// De-escalate the state of related services as the issue is most likely with the job not the service
// Reset the WARNING job to NORMAL
if (relatedService.getServiceState() == WARNING) {
logger.info("State reset to NORMAL for related service {} on host {}", relatedService.getServiceType(),
relatedService.getHost());
relatedService.setServiceState(NORMAL, job.toJob().getSignature());
}
// Reset the ERROR job to WARNING, keeping the signature that originally triggered the WARNING state
else if (relatedService.getServiceState() == ERROR) {
logger.info("State reset to WARNING for related service {} on host {}", relatedService.getServiceType(),
relatedService.getHost());
relatedService.setServiceState(WARNING, relatedService.getWarningStateTrigger());
}
// Pass the (possibly changed) related service on to updateServiceState
updateServiceState(relatedService);
}
}
// This is the first job with this signature failing on any service
else {
// Set the current service to WARNING state
if (currentService.getServiceState() == NORMAL) {
logger.info("State set to WARNING for current service {} on host {}", currentService.getServiceType(),
currentService.getHost());
currentService.setServiceState(WARNING, job.toJob().getSignature());
updateServiceState(currentService);
}
// The current service is not in NORMAL state (expected: WARNING) and max attempts is reached; service types
// listed in noErrorStateServiceTypes are never put into ERROR state
else if (errorStatesEnabled && !noErrorStateServiceTypes.contains(currentService.getServiceType())
&& getHistorySize(currentService) >= maxAttemptsBeforeErrorState) {
logger.info("State set to ERROR for current service {} on host {}", currentService.getServiceType(),
currentService.getHost());
currentService.setServiceState(ERROR, job.toJob().getSignature());
updateServiceState(currentService);
}
}
}
// Job is finished without failure
else if (job.getStatus() == Status.FINISHED) {
// If the service was in warning state reset to normal state
if (currentService.getServiceState() == WARNING) {
logger.info("State reset to NORMAL for current service {} on host {}", currentService.getServiceType(),
currentService.getHost());
currentService.setServiceState(NORMAL);
updateServiceState(currentService);
}
}
}
/**
 * {@inheritDoc}
 * <p>
 * Looks up the service registration for the given type and host and resets its state to NORMAL.
 *
 * @throws NotFoundException
 *           if no service registration exists for the given service type and host
 * @see org.opencastproject.serviceregistry.api.ServiceRegistry#sanitize(java.lang.String, java.lang.String)
 */
@Override
public void sanitize(String serviceType, String host) throws NotFoundException {
  db.execChecked(em -> {
    final ServiceRegistrationJpaImpl registration = getServiceRegistrationQuery(serviceType, host)
        .apply(em)
        .orElseThrow(() -> new NotFoundException());
    logger.info("State reset to NORMAL for service {} on host {} through sanitize method",
        registration.getServiceType(), registration.getHost());
    registration.setServiceState(NORMAL);
    updateServiceState(registration);
  });
}
/**
 * Counts the entries of the failed jobs history for the given service registration, using the
 * {@code Job.count.history.failed} named query with the registration's service type and host.
 *
 * @param serviceRegistration
 *          the service registration whose failure history is counted
 * @return the failed jobs history size
 * @throws IllegalArgumentException
 *           if parameter is null
 * @throws ServiceRegistryException
 *           if the count could not be determined
 */
private int getHistorySize(ServiceRegistration serviceRegistration) throws ServiceRegistryException {
  if (serviceRegistration == null) {
    throw new IllegalArgumentException("serviceRegistration must not be null!");
  }

  logger.debug("Calculating count of jobs who failed due to service {}", serviceRegistration);

  try {
    final Number failedJobs = db.exec(namedQuery.find(
        "Job.count.history.failed",
        Number.class,
        Pair.of("serviceType", serviceRegistration.getServiceType()),
        Pair.of("host", serviceRegistration.getHost())
    ));
    return failedJobs.intValue();
  } catch (Exception e) {
    throw new ServiceRegistryException(e);
  }
}
/**
 * Gets the services in WARNING or ERROR state triggered by this job. Candidates are loaded by the job's type; a
 * candidate is kept if the trigger recorded for its current state (WARNING or ERROR) equals the signature of the
 * given job.
 *
 * @param job
 *          the given job to get the related services
 * @return a list of services triggered by the job
 * @throws IllegalArgumentException
 *           if the given job was null
 * @throws ServiceRegistryException
 *           if there was a problem with the query
 */
private List<ServiceRegistrationJpaImpl> getRelatedWarningErrorServices(JpaJob job) throws ServiceRegistryException {
  if (job == null) {
    throw new IllegalArgumentException("job must not be null!");
  }

  logger.debug("Try to get the services in WARNING or ERROR state triggered by {} failed", job);

  try {
    // The signature does not change while filtering: compute it once instead of converting the job to its API
    // representation up to twice per candidate
    final int signature = job.toJob().getSignature();
    return db.exec(namedQuery.findAll(
        "ServiceRegistration.relatedservices.warning_error",
        ServiceRegistrationJpaImpl.class,
        Pair.of("serviceType", job.getJobType())
    )).stream()
        // TODO: modify the query to avoid to go through the list here
        .filter(rs ->
            (rs.getServiceState() == WARNING && rs.getWarningStateTrigger() == signature)
                || (rs.getServiceState() == ERROR && rs.getErrorStateTrigger() == signature)
        ).collect(Collectors.toList());
  } catch (Exception e) {
    throw new ServiceRegistryException(e);
  }
}
/**
 * Returns a filtered list of service registrations, containing only those that are online, not in maintenance mode,
 * not in error state, and with a specific service type that are running on a host which is not already maxed out.
 * Services whose host load or maximum load cannot be determined are kept as candidates. The result is ordered by
 * capacity; services whose host has no entry in the given system load are listed last, in their original order.
 *
 * @param jobType
 *          the job type for which the services registrations are filtered
 * @param serviceRegistrations
 *          the complete list of service registrations
 * @param hostRegistrations
 *          the complete list of available host registrations
 * @param systemLoad
 *          the current load per host
 * @return the matching service registrations, those with the most capacity first
 */
protected List<ServiceRegistration> getServiceRegistrationsWithCapacity(String jobType,
    List<ServiceRegistration> serviceRegistrations, List<HostRegistration> hostRegistrations,
    final SystemLoad systemLoad) {
  final List<String> hostBaseUrls = hostRegistrations.stream()
      .map(HostRegistration::getBaseUrl)
      .collect(Collectors.toUnmodifiableList());
  final List<ServiceRegistration> filteredList = new ArrayList<>();
  // Candidates on hosts without a known load: they cannot be compared by load, so they are kept apart
  final List<ServiceRegistration> unknownLoadList = new ArrayList<>();

  for (ServiceRegistration service : serviceRegistrations) {
    // Skip service if host not available
    if (!hostBaseUrls.contains(service.getHost())) {
      logger.trace("Not considering {} because it's host {} is not available for dispatching", service,
          service.getHost());
      continue;
    }

    // Skip services that are not of the requested type
    if (!jobType.equals(service.getServiceType())) {
      logger.trace("Not considering {} because it is of the wrong job type", service);
      continue;
    }

    // Skip services that are in error state
    if (service.getServiceState() == ERROR) {
      logger.trace("Not considering {} because it is in error state", service);
      continue;
    }

    // Skip services that are in maintenance mode
    if (service.isInMaintenanceMode()) {
      logger.trace("Not considering {} because it is in maintenance mode", service);
      continue;
    }

    // Skip services that are marked as offline
    if (!service.isOnline()) {
      logger.trace("Not considering {} because it is currently offline", service);
      continue;
    }

    // Determine the maximum load for this host
    Float hostLoadMax = null;
    for (HostRegistration host : hostRegistrations) {
      if (host.getBaseUrl().equals(service.getHost())) {
        hostLoadMax = host.getMaxLoad();
        break;
      }
    }
    if (hostLoadMax == null) {
      logger.warn("Unable to determine max load for host {}", service.getHost());
    }

    // Determine the current load for this host. The system load may not know the host at all, in which case
    // dereferencing the node load directly would fail with a NullPointerException.
    final NodeLoad nodeLoad = systemLoad.get(service.getHost());
    Float hostLoad = null;
    if (nodeLoad != null) {
      hostLoad = nodeLoad.getLoadFactor();
    }
    if (hostLoad == null) {
      logger.warn("Unable to determine current load for host {}", service.getHost());
    }

    // Is this host suited for processing?
    // NOTE(review): the load factor looks like a ratio (see LoadComparator) while hostLoadMax is an absolute
    // value -- confirm that this comparison is intended
    if (hostLoad == null || hostLoadMax == null || hostLoad < hostLoadMax) {
      logger.debug("Adding candidate service {} for processing of jobs of type '{}' (host load is {} of max {})",
          service, jobType, hostLoad, hostLoadMax);
      if (nodeLoad == null) {
        unknownLoadList.add(service);
      } else {
        filteredList.add(service);
      }
    }
  }

  // Sort the list by capacity. The comparator needs a node load for every host, so candidates without one are
  // appended afterwards instead of being sorted.
  filteredList.sort(new LoadComparator(systemLoad));
  filteredList.addAll(unknownLoadList);

  return filteredList;
}
/**
 * Filters the given service registrations down to those that can take a job of the given type and orders them by
 * load. A registration is kept if its host is among the given host registrations, its service type equals the job
 * type, and it is neither in error state, nor in maintenance mode, nor offline.
 *
 * @param jobType
 *          the job type for which the services registrations are filtered
 * @param serviceRegistrations
 *          the complete list of service registrations
 * @param hostRegistrations
 *          the complete list of available host registrations
 * @param systemLoad
 *          the current load per host, used for ordering the result
 * @return the matching service registrations, ordered by load; jobs of type
 *         {@code org.opencastproject.composer} are ordered with the encoding-aware comparator
 */
protected List<ServiceRegistration> getServiceRegistrationsByLoad(String jobType,
    List<ServiceRegistration> serviceRegistrations, List<HostRegistration> hostRegistrations,
    final SystemLoad systemLoad) {
  final List<String> availableHosts = hostRegistrations.stream()
      .map(host -> host.getBaseUrl())
      .collect(Collectors.toUnmodifiableList());
  final List<ServiceRegistration> candidates = new ArrayList<>();

  logger.debug("Finding services to dispatch job of type {}", jobType);

  for (ServiceRegistration candidate : serviceRegistrations) {
    if (!availableHosts.contains(candidate.getHost())) {
      // The host is not available
      logger.trace("Not considering {} because it's host {} is not available for dispatching", candidate,
          candidate.getHost());
    } else if (!jobType.equals(candidate.getServiceType())) {
      // Not of the requested type
      logger.trace("Not considering {} because it is of the wrong job type", candidate);
    } else if (candidate.getServiceState() == ERROR) {
      // In error state
      logger.trace("Not considering {} because it is in error state", candidate);
    } else if (candidate.isInMaintenanceMode()) {
      // In maintenance mode
      logger.trace("Not considering {} because it is in maintenance mode", candidate);
    } else if (!candidate.isOnline()) {
      // Marked as offline
      logger.trace("Not considering {} because it is currently offline", candidate);
    } else {
      // Passed every check
      logger.debug("Adding candidate service {} for processing of job of type '{}'", candidate, jobType);
      candidates.add(candidate);
    }
  }

  // Order by capacity, distinguishing between composer jobs and other jobs
  final Comparator<ServiceRegistration> byLoad = "org.opencastproject.composer".equals(jobType)
      ? new LoadComparatorEncoding(systemLoad)
      : new LoadComparator(systemLoad);
  candidates.sort(byLoad);

  return candidates;
}
/**
 * {@inheritDoc}
 * <p>
 * Every host registration contributes one node load carrying the host's maximum load and a current load of zero.
 *
 * @see org.opencastproject.serviceregistry.api.ServiceRegistry#getMaxLoads()
 */
@Override
public SystemLoad getMaxLoads() throws ServiceRegistryException {
  final SystemLoad maxLoads = new SystemLoad();
  for (HostRegistration host : getHostRegistrations()) {
    maxLoads.addNodeLoad(new NodeLoad(host.getBaseUrl(), 0.0f, host.getMaxLoad()));
  }
  return maxLoads;
}
/**
 * {@inheritDoc}
 * <p>
 * The maximum load is read with the {@code HostRegistration.getMaxLoadByHostName} named query; the returned node
 * load has a current load of zero.
 *
 * @throws NotFoundException
 *           if the query yields no result for the given host
 * @throws ServiceRegistryException
 *           if the lookup fails for any other reason
 * @see org.opencastproject.serviceregistry.api.ServiceRegistry#getMaxLoadOnNode(java.lang.String)
 */
@Override
public NodeLoad getMaxLoadOnNode(String host) throws ServiceRegistryException, NotFoundException {
  try {
    final Number configuredMaxLoad = db.exec(namedQuery.find(
        "HostRegistration.getMaxLoadByHostName",
        Number.class,
        Pair.of("host", host)
    ));
    return new NodeLoad(host, 0.0f, configuredMaxLoad.floatValue());
  } catch (NoResultException e) {
    throw new NotFoundException(e);
  } catch (Exception e) {
    throw new ServiceRegistryException(e);
  }
}
/** A periodic check on each service registration to ensure that it is still alive. */
class JobProducerHeartbeat implements Runnable {
/** List of service registrations that have been found unresponsive last time we checked */
private final List<ServiceRegistration> unresponsive = new ArrayList<>();
/**
* {@inheritDoc}
*
* @see java.lang.Runnable#run()
*/
@Override
public void run() {
logger.debug("Checking for unresponsive services");
try {
List<ServiceRegistration> serviceRegistrations = getOnlineServiceRegistrations();
for (ServiceRegistration service : serviceRegistrations) {
hostsStatistics.updateHost(((ServiceRegistrationJpaImpl) service).getHostRegistration());
servicesStatistics.updateService(service);
if (!service.isJobProducer()) {
continue;
}
if (service.isInMaintenanceMode()) {
continue;
}
// We think this service is online and available. Prove it.
String serviceUrl = UrlSupport.concat(service.getHost(), service.getPath(), "dispatch");
HttpHead options = new HttpHead(serviceUrl);
HttpResponse response = null;
try {
try {
response = client.execute(options);
if (response != null) {
switch (response.getStatusLine().getStatusCode()) {
case HttpStatus.SC_OK:
// this service is reachable, continue checking other services
logger.trace("Service " + service + " is responsive: " + response.getStatusLine());
if (unresponsive.remove(service)) {
logger.info("Service {} is still online", service);
} else if (!service.isOnline()) {
try {
setOnlineStatus(service.getServiceType(), service.getHost(), service.getPath(), true, true);
logger.info("Service {} is back online", service);
} catch (ServiceRegistryException e) {
logger.warn("Error setting online status for {}", service);
}
}
continue;
default:
if (!service.isOnline()) {
continue;
}
logger.warn("Service {} is not working as expected: {}", service, response.getStatusLine());
}
} else {
logger.warn("Service {} does not respond", service);
}
} catch (TrustedHttpClientException e) {
if (!service.isOnline()) {
continue;
}
logger.warn("Unable to reach {}", service, e);
}
// If we get here, the service did not respond as expected
try {
if (unresponsive.contains(service)) {
unRegisterService(service.getServiceType(), service.getHost());
unresponsive.remove(service);
logger.warn("Marking {} as offline", service);
} else {
unresponsive.add(service);
logger.warn("Added {} to the watch list", service);
}
} catch (ServiceRegistryException e) {
logger.warn("Unable to unregister unreachable service: {}", service, e);
}
} finally {
client.close(response);
}
}
} catch (Throwable t) {
logger.warn("Error while checking for unresponsive services", t);
}
logger.debug("Finished checking for unresponsive services");
}
}
/**
 * Comparator that will sort service registrations depending on their capacity, which is defined by the number of
 * jobs the service's host is already running divided by the MaxLoad of the Server. The lower that number, the bigger
 * the capacity. Hosts whose load factors are about the same are ordered by their maximum load instead.
 */
private class LoadComparator implements Comparator<ServiceRegistration> {

  /** The current work load by host */
  protected SystemLoad loadByHost;

  /**
   * Creates a new comparator which is using the given loads by host.
   *
   * @param loadByHost
   *          the current work load by host
   */
  LoadComparator(SystemLoad loadByHost) {
    this.loadByHost = loadByHost;
  }

  @Override
  public int compare(ServiceRegistration serviceA, ServiceRegistration serviceB) {
    final NodeLoad loadA = loadByHost.get(serviceA.getHost());
    final NodeLoad loadB = loadByHost.get(serviceB.getHost());
    final float factorA = loadA.getLoadFactor();
    final float factorB = loadB.getLoadFactor();

    // Load factors within 0.01 of each other count as equal
    final boolean aboutTheSame = Math.abs(factorA - factorB) <= 0.01;

    // NOTE: The order of the arguments is *reversed* between the two comparisons.
    // For the maximum load the node with the highest value goes first,
    // for the load factor the node with the lowest value goes first.
    return aboutTheSame
        ? Float.compare(loadB.getMaxLoad(), loadA.getMaxLoad())
        : Float.compare(factorA, factorB);
  }
}
/**
 * Comparator that will sort service registrations depending on their capacity, which is defined by the number of
 * jobs the service's host is already running divided by the MaxLoad of the Server. The lower that number, the bigger
 * the capacity. This comparator prefers encoding workers as long as their load factor does not exceed the encoding
 * threshold; if no encoding workers are configured it acts like the LoadComparator.
 */
private class LoadComparatorEncoding extends LoadComparator implements Comparator<ServiceRegistration> {

  /**
   * Creates a new comparator which is using the given loads by host.
   *
   * @param loadByHost
   *          the current work load by host
   */
  LoadComparatorEncoding(SystemLoad loadByHost) {
    super(loadByHost);
  }

  @Override
  public int compare(ServiceRegistration serviceA, ServiceRegistration serviceB) {
    final String hostA = serviceA.getHost();
    final String hostB = serviceB.getHost();
    final NodeLoad loadA = loadByHost.get(hostA);
    final NodeLoad loadB = loadByHost.get(hostB);

    if (encodingWorkers != null) {
      final boolean encoderA = encodingWorkers.contains(hostA);
      final boolean encoderB = encodingWorkers.contains(hostB);

      // Special handling only applies when exactly one of the two hosts is an encoding worker
      if (encoderA != encoderB) {
        final NodeLoad encoderLoad = encoderA ? loadA : loadB;
        if (encoderLoad.getLoadFactor() <= encodingThreshold) {
          // The encoding worker still has room: put it first
          return encoderA ? -1 : 1;
        }
        // The encoding worker is above the threshold: fall back to the plain load factors
        return Float.compare(loadA.getLoadFactor(), loadB.getLoadFactor());
      }
    }

    return super.compare(serviceA, serviceB);
  }
}
}