SchedulerServiceDatabaseImpl.java
/*
* Licensed to The Apereo Foundation under one or more contributor license
* agreements. See the NOTICE file distributed with this work for additional
* information regarding copyright ownership.
*
*
* The Apereo Foundation licenses this file to you under the Educational
* Community License, Version 2.0 (the "License"); you may not use this file
* except in compliance with the License. You may obtain a copy of the License
* at:
*
* http://opensource.org/licenses/ecl2.txt
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*
*/
package org.opencastproject.scheduler.impl.persistence;
import static org.opencastproject.db.Queries.namedQuery;
import org.opencastproject.db.DBSession;
import org.opencastproject.db.DBSessionFactory;
import org.opencastproject.scheduler.impl.SchedulerServiceDatabase;
import org.opencastproject.scheduler.impl.SchedulerServiceDatabaseException;
import org.opencastproject.security.api.SecurityService;
import org.opencastproject.util.NotFoundException;
import com.google.gson.Gson;
import org.apache.commons.lang3.tuple.Pair;
import org.joda.time.DateTime;
import org.osgi.service.component.ComponentContext;
import org.osgi.service.component.annotations.Activate;
import org.osgi.service.component.annotations.Component;
import org.osgi.service.component.annotations.Reference;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.function.Function;
import java.util.stream.Collectors;
import javax.persistence.EntityManager;
import javax.persistence.EntityManagerFactory;
import javax.persistence.TypedQuery;
/**
* Implements {@link SchedulerServiceDatabase}.
*/
@Component(
immediate = true,
service = SchedulerServiceDatabase.class
)
public class SchedulerServiceDatabaseImpl implements SchedulerServiceDatabase {
/** JPA persistence unit name */
public static final String PERSISTENCE_UNIT = "org.opencastproject.scheduler.impl.persistence";
/** Logging utilities */
private static final Logger logger = LoggerFactory.getLogger(SchedulerServiceDatabaseImpl.class);
/** Factory used to create {@link EntityManager}s for transactions */
private EntityManagerFactory emf;
private DBSessionFactory dbSessionFactory;
private DBSession db;
/** The security service */
private SecurityService securityService;
private static final Gson gson = new Gson();
/** OSGi DI */
@Reference(target = "(osgi.unit.name=org.opencastproject.scheduler.impl.persistence)")
public void setEntityManagerFactory(EntityManagerFactory emf) {
this.emf = emf;
}
/** OSGi DI */
@Reference
public void setSecurityService(SecurityService securityService) {
this.securityService = securityService;
}
@Reference
public void setDBSessionFactory(DBSessionFactory dbSessionFactory) {
this.dbSessionFactory = dbSessionFactory;
}
/**
* Creates {@link EntityManagerFactory} using persistence provider and properties passed via OSGi.
*
* @param cc
*/
@Activate
public void activate(ComponentContext cc) {
logger.info("Activating persistence manager for scheduler");
db = dbSessionFactory.createSession(emf);
}
/*
* We need to synchronize this method because JPA doesn't support thread-safe atomic upserts.
*/
@Override
public synchronized void touchLastEntry(String agentId) throws SchedulerServiceDatabaseException {
try {
db.execTx(em -> {
LastModifiedDto entity = em.find(LastModifiedDto.class, agentId);
if (entity == null) {
entity = new LastModifiedDto();
entity.setCaptureAgentId(agentId);
entity.setLastModifiedDate(new Date());
em.persist(entity);
} else {
entity.setLastModifiedDate(new Date());
em.merge(entity);
}
});
} catch (Exception e) {
throw new SchedulerServiceDatabaseException(e);
}
}
@Override
public Date getLastModified(String agentId) throws NotFoundException, SchedulerServiceDatabaseException {
try {
return db.exec(namedQuery.findByIdOpt(LastModifiedDto.class, agentId))
.map(LastModifiedDto::getLastModifiedDate)
.orElseThrow(() -> new NotFoundException("Agent with ID " + agentId + " does not exist"));
} catch (NotFoundException e) {
throw e;
} catch (Exception e) {
throw new SchedulerServiceDatabaseException(e);
}
}
@Override
public Map<String, Date> getLastModifiedDates() throws SchedulerServiceDatabaseException {
try {
return db.exec(namedQuery.findAll("LastModified.findAll", LastModifiedDto.class)).stream()
.collect(Collectors.toMap(
LastModifiedDto::getCaptureAgentId,
LastModifiedDto::getLastModifiedDate
));
} catch (Exception e) {
throw new SchedulerServiceDatabaseException(e);
}
}
@Override
public void storeEvent(String mediapackageId, String organizationId, Optional<String> captureAgentId,
Optional<Date> start, Optional<Date> end, Optional<String> source, Optional<String> recordingState,
Optional<Long> recordingLastHeard, Optional<String> presenters, Optional<Date> lastModifiedDate,
Optional<String> checksum, Optional<Map<String, String>> workflowProperties,
Optional<Map<String, String>> caProperties
) throws SchedulerServiceDatabaseException {
try {
db.execTxChecked(em -> {
Optional<ExtendedEventDto> entityOpt = getExtendedEventDtoQuery(mediapackageId, organizationId).apply(em);
ExtendedEventDto entity = entityOpt.orElse(new ExtendedEventDto());
entity.setMediaPackageId(mediapackageId);
entity.setOrganization(organizationId);
if (captureAgentId.isPresent()) {
entity.setCaptureAgentId(captureAgentId.get());
}
if (start.isPresent()) {
entity.setStartDate(start.get());
}
if (end.isPresent()) {
entity.setEndDate(end.get());
}
if (source.isPresent()) {
entity.setSource(source.get());
}
if (recordingState.isPresent()) {
entity.setRecordingState(recordingState.get());
}
if (recordingLastHeard.isPresent()) {
entity.setRecordingLastHeard(recordingLastHeard.get());
}
if (presenters.isPresent()) {
entity.setPresenters(presenters.get());
}
if (lastModifiedDate.isPresent()) {
entity.setLastModifiedDate(lastModifiedDate.get());
}
if (checksum.isPresent()) {
entity.setChecksum(checksum.get());
}
if (workflowProperties.isPresent()) {
entity.setWorkflowProperties(gson.toJson(workflowProperties.get()));
}
if (caProperties.isPresent()) {
entity.setCaptureAgentProperties(gson.toJson(caProperties.get()));
}
if (entityOpt.isEmpty()) {
em.persist(entity);
} else {
em.merge(entity);
}
});
} catch (Exception e) {
throw new SchedulerServiceDatabaseException(e);
}
}
@Override
public List<String> getEvents(String captureAgentId, Date start, Date end, int separationMillis)
throws SchedulerServiceDatabaseException {
final Date extendedStart = Date.from(start.toInstant().minusMillis(separationMillis));
final Date extendedEnd = Date.from(end.toInstant().plusMillis(separationMillis));
try {
return db.exec(namedQuery.findAll(
"ExtendedEvent.findEvents",
String.class,
Pair.of("org", securityService.getOrganization().getId()),
Pair.of("ca", captureAgentId),
Pair.of("start", extendedStart),
Pair.of("end", extendedEnd)
));
} catch (Exception e) {
throw new SchedulerServiceDatabaseException(e);
}
}
@Override
public List<ExtendedEventDto> search(
Optional<String> captureAgentId,
Optional<Date> optStartsFrom,
Optional<Date> optStartsTo,
Optional<Date> optEndFrom,
Optional<Date> optEndTo,
Optional<Integer> limit) throws SchedulerServiceDatabaseException {
final Date startsFrom = optStartsFrom.orElse(new Date(0));
// A better value would be a Date initialized with Long.MAX_VALUE, but that leads to the DB (at least MySQL)
// returning zero results.
final Date farIntoTheFuture = DateTime.now().plusYears(30).toDate();
final Date startsTo = optStartsTo.orElse(farIntoTheFuture);
final Date endFrom = optEndFrom.orElse(new Date(0));
final Date endTo = optEndTo.orElse(farIntoTheFuture);
try {
return db.exec(em -> {
final TypedQuery<ExtendedEventDto> query;
if (captureAgentId.isPresent()) {
query = em.createNamedQuery("ExtendedEvent.searchEventsCA", ExtendedEventDto.class)
.setParameter("org", securityService.getOrganization().getId())
.setParameter("ca", captureAgentId.get())
.setParameter("startFrom", startsFrom)
.setParameter("startTo", startsTo)
.setParameter("endFrom", endFrom)
.setParameter("endTo", endTo);
} else {
query = em.createNamedQuery("ExtendedEvent.searchEvents", ExtendedEventDto.class)
.setParameter("org", securityService.getOrganization().getId())
.setParameter("startFrom", startsFrom)
.setParameter("startTo", startsTo)
.setParameter("endFrom", endFrom)
.setParameter("endTo", endTo);
}
if (limit.isPresent()) {
return query.setMaxResults(limit.get()).getResultList();
}
return query.getResultList();
});
} catch (Exception e) {
throw new SchedulerServiceDatabaseException(e);
}
}
@Override
public List<ExtendedEventDto> getKnownRecordings() throws SchedulerServiceDatabaseException {
try {
return db.exec(namedQuery.findAll(
"ExtendedEvent.knownRecordings",
ExtendedEventDto.class,
Pair.of("org", securityService.getOrganization().getId())
));
} catch (Exception e) {
throw new SchedulerServiceDatabaseException(e);
}
}
@Override
public void deleteEvent(String mediapackageId) throws NotFoundException, SchedulerServiceDatabaseException {
try {
db.execTxChecked(em -> {
final String orgId = securityService.getOrganization().getId();
Optional<ExtendedEventDto> entity = getExtendedEventDtoQuery(mediapackageId, orgId).apply(em);
if (entity.isEmpty()) {
throw new NotFoundException("Event with ID " + mediapackageId + " does not exist");
}
em.remove(entity.get());
});
} catch (NotFoundException e) {
throw e;
} catch (Exception e) {
throw new SchedulerServiceDatabaseException(e);
}
}
@Override
public List<ExtendedEventDto> getEvents() throws SchedulerServiceDatabaseException {
final String organization = securityService.getOrganization().getId();
try {
return db.exec(namedQuery.findAll(
"ExtendedEvent.findAll",
ExtendedEventDto.class,
Pair.of("org", organization)
));
} catch (Exception e) {
throw new SchedulerServiceDatabaseException(e);
}
}
@Override
public Optional<ExtendedEventDto> getEvent(String mediapackageId, String orgId)
throws SchedulerServiceDatabaseException {
try {
return db.exec(getExtendedEventDtoQuery(mediapackageId, orgId));
} catch (Exception e) {
throw new SchedulerServiceDatabaseException(e);
}
}
@Override
public Optional<ExtendedEventDto> getEvent(String mediapackageId) throws SchedulerServiceDatabaseException {
try {
final String orgId = securityService.getOrganization().getId();
return getEvent(mediapackageId, orgId);
} catch (SchedulerServiceDatabaseException e) {
throw e;
} catch (Exception e) {
throw new SchedulerServiceDatabaseException(e);
}
}
@Override
public void resetRecordingState(String mediapackageId) throws NotFoundException, SchedulerServiceDatabaseException {
try {
db.execTxChecked(em -> {
final String orgId = securityService.getOrganization().getId();
Optional<ExtendedEventDto> entity = getExtendedEventDtoQuery(mediapackageId, orgId).apply(em);
if (entity.isEmpty()) {
throw new NotFoundException("Event with ID " + mediapackageId + " does not exist");
}
entity.get().setRecordingState(null);
entity.get().setRecordingLastHeard(null);
em.merge(entity.get());
});
} catch (NotFoundException e) {
throw e;
} catch (Exception e) {
throw new SchedulerServiceDatabaseException(e);
}
}
@Override
public int countEvents() throws SchedulerServiceDatabaseException {
try {
return db.exec(namedQuery.find("ExtendedEvent.countAll", Number.class)).intValue();
} catch (Exception e) {
throw new SchedulerServiceDatabaseException(e);
}
}
private Function<EntityManager, Optional<ExtendedEventDto>> getExtendedEventDtoQuery(String id, String orgId) {
return namedQuery.findByIdOpt(ExtendedEventDto.class, new EventIdPK(id, orgId));
}
}