EventCommentDatabaseServiceImpl.java
/*
* Licensed to The Apereo Foundation under one or more contributor license
* agreements. See the NOTICE file distributed with this work for additional
* information regarding copyright ownership.
*
*
* The Apereo Foundation licenses this file to you under the Educational
* Community License, Version 2.0 (the "License"); you may not use this file
* except in compliance with the License. You may obtain a copy of the License
* at:
*
* http://opensource.org/licenses/ecl2.txt
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*
*/
package org.opencastproject.event.comment.persistence;
import static org.opencastproject.db.Queries.namedQuery;
import org.opencastproject.db.DBSession;
import org.opencastproject.db.DBSessionFactory;
import org.opencastproject.elasticsearch.api.SearchIndexException;
import org.opencastproject.elasticsearch.index.ElasticsearchIndex;
import org.opencastproject.elasticsearch.index.objects.event.Comment;
import org.opencastproject.elasticsearch.index.objects.event.Event;
import org.opencastproject.elasticsearch.index.rebuild.AbstractIndexProducer;
import org.opencastproject.elasticsearch.index.rebuild.IndexProducer;
import org.opencastproject.elasticsearch.index.rebuild.IndexRebuildException;
import org.opencastproject.elasticsearch.index.rebuild.IndexRebuildService;
import org.opencastproject.event.comment.EventComment;
import org.opencastproject.security.api.Organization;
import org.opencastproject.security.api.OrganizationDirectoryService;
import org.opencastproject.security.api.SecurityService;
import org.opencastproject.security.api.User;
import org.opencastproject.security.api.UserDirectoryService;
import org.opencastproject.security.util.SecurityUtil;
import org.opencastproject.util.NotFoundException;
import org.apache.commons.lang3.tuple.Pair;
import org.osgi.service.component.ComponentContext;
import org.osgi.service.component.annotations.Activate;
import org.osgi.service.component.annotations.Component;
import org.osgi.service.component.annotations.Reference;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.Hashtable;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.function.Function;
import java.util.stream.Collectors;
import javax.persistence.EntityManager;
import javax.persistence.EntityManagerFactory;
/**
* Implements permanent storage for event comments.
*/
@Component(
immediate = true,
service = { EventCommentDatabaseService.class, IndexProducer.class },
property = {
"service.description=Event Comment Database Service"
}
)
public class EventCommentDatabaseServiceImpl extends AbstractIndexProducer implements EventCommentDatabaseService {
/** Logging utilities */
private static final Logger logger = LoggerFactory.getLogger(EventCommentDatabaseServiceImpl.class);
public static final String PERSISTENCE_UNIT = "org.opencastproject.event.comment";
/** Factory used to create {@link EntityManager}s for transactions */
private EntityManagerFactory emf;
private DBSessionFactory dbSessionFactory;
private DBSession db;
/** The security service used to retrieve organizations. */
private OrganizationDirectoryService organizationDirectoryService;
/** The security service used to run the security context with. */
private SecurityService securityService;
/** The user directory service */
private UserDirectoryService userDirectoryService;
/** The component context this bundle is running in. */
private ComponentContext cc;
/** The elasticsearch indices */
private ElasticsearchIndex index;
/** OSGi component activation callback */
@Activate
public void activate(ComponentContext cc) {
logger.info("Activating persistence manager for event comments");
this.cc = cc;
db = dbSessionFactory.createSession(emf);
}
/** OSGi DI */
@Reference(target = "(osgi.unit.name=org.opencastproject.event.comment)")
public void setEntityManagerFactory(EntityManagerFactory emf) {
this.emf = emf;
}
@Reference
public void setDBSessionFactory(DBSessionFactory dbSessionFactory) {
this.dbSessionFactory = dbSessionFactory;
}
/**
* OSGi callback to set the security context to run with.
*
* @param securityService
* The security service
*/
@Reference
public void setSecurityService(SecurityService securityService) {
this.securityService = securityService;
}
/**
* OSGi callback to set the user directory service.
*
* @param userDirectoryService
* the user directory service
*/
@Reference
public void setUserDirectoryService(UserDirectoryService userDirectoryService) {
this.userDirectoryService = userDirectoryService;
}
/**
* OSGi callback to set the organization directory service.
*
* @param organizationDirectoryService
* the organization directory service
*/
@Reference
public void setOrganizationDirectoryService(OrganizationDirectoryService organizationDirectoryService) {
this.organizationDirectoryService = organizationDirectoryService;
}
/**
* OSgi callback for the Elasticsearch index.
*
* @param index
* the Elasticsearch index.
*/
@Reference
public void setIndex(ElasticsearchIndex index) {
this.index = index;
}
@Override
public List<String> getReasons() throws EventCommentDatabaseException {
try {
return db.exec(namedQuery.findAll(
"EventComment.findReasons",
String.class,
Pair.of("org", securityService.getOrganization().getId())
));
} catch (Exception e) {
logger.error("Could not get reasons", e);
throw new EventCommentDatabaseException(e);
}
}
@Override
public EventComment getComment(long commentId) throws NotFoundException, EventCommentDatabaseException {
try {
Optional<EventCommentDto> event = db.exec(getEventCommentQuery(commentId));
if (event.isEmpty()) {
throw new NotFoundException("Event comment with ID " + commentId + " does not exist");
}
return event.get().toComment(userDirectoryService, organizationDirectoryService);
} catch (NotFoundException e) {
throw e;
} catch (Exception e) {
logger.error("Could not get event comment {}", commentId, e);
throw new EventCommentDatabaseException(e);
}
}
@Override
public void deleteComment(long commentId) throws NotFoundException, EventCommentDatabaseException {
try {
EventCommentDto event = db.execTxChecked(em -> {
Optional<EventCommentDto> eventOpt = getEventCommentQuery(commentId).apply(em);
if (eventOpt.isEmpty()) {
throw new NotFoundException("Event comment with ID " + commentId + " does not exist");
}
em.remove(eventOpt.get());
return eventOpt.get();
});
updateCommentsInIndex(event.getEventId());
} catch (NotFoundException e) {
throw e;
} catch (Exception e) {
logger.error("Could not delete event comment", e);
throw new EventCommentDatabaseException(e);
}
}
@Override
public void deleteComments(String eventId) throws NotFoundException, EventCommentDatabaseException {
// Similar to deleteComment but we want to avoid sending a message for each deletion
int count = 0;
try {
count = db.execTxChecked(em -> {
List<EventComment> comments = getComments(eventId);
for (EventComment comment : comments) {
long commentId = comment.getId().get().intValue();
Optional<EventCommentDto> event = getEventCommentQuery(commentId).apply(em);
if (event.isEmpty()) {
throw new NotFoundException("Event comment with ID " + commentId + " does not exist");
}
em.remove(event.get());
}
return comments.size();
});
} catch (NotFoundException e) {
throw e;
} catch (Exception e) {
logger.error("Could not delete event comments", e);
throw new EventCommentDatabaseException(e);
}
// send updates only if we actually modified anything
if (count > 0) {
updateCommentsInIndex(eventId);
}
}
@Override
public EventComment updateComment(EventComment comment) throws EventCommentDatabaseException {
try {
final EventCommentDto commentDto = EventCommentDto.from(comment);
final EventComment updatedComment = db.execTx(namedQuery.persistOrUpdate(commentDto))
.toComment(userDirectoryService, organizationDirectoryService);
updateCommentsInIndex(updatedComment.getEventId());
return updatedComment;
} catch (Exception e) {
throw new EventCommentDatabaseException(e);
}
}
/**
* Gets an event comment, using the current organizational context.
*
* @param commentId
* the comment identifier
*
* @return the event comment entity, or null if not found
*/
private Function<EntityManager, Optional<EventCommentDto>> getEventCommentQuery(long commentId) {
return namedQuery.findOpt(
"EventComment.findByCommentId",
EventCommentDto.class,
Pair.of("commentId", commentId)
);
}
@Override
public List<EventComment> getComments(String eventId) throws EventCommentDatabaseException {
try {
return db.exec(namedQuery.findAll(
"EventComment.findByEvent",
EventCommentDto.class,
Pair.of("eventId", eventId),
Pair.of("org", securityService.getOrganization().getId())
)).stream()
.map(c -> c.toComment(userDirectoryService, organizationDirectoryService))
.sorted((c1, c2) -> {
boolean v1 = c1.isResolvedStatus();
boolean v2 = c2.isResolvedStatus();
return (v1 ^ v2) ? ((v1 ^ false) ? 1 : -1) : 0;
})
.collect(Collectors.toList());
} catch (Exception e) {
logger.error("Could not retreive comments for event {}", eventId, e);
throw new EventCommentDatabaseException(e);
}
}
public Iterator<EventCommentDto> getComments() throws EventCommentDatabaseException {
try {
return db.exec(namedQuery.findAll("EventComment.findAll", EventCommentDto.class)).iterator();
} catch (Exception e) {
logger.error("Could not retreive event comments", e);
throw new EventCommentDatabaseException(e);
}
}
public int countComments() throws EventCommentDatabaseException {
try {
return db.exec(namedQuery.find("EventComment.countAll", Number.class)).intValue();
} catch (Exception e) {
logger.error("Could not find the number of comments.", e);
throw new EventCommentDatabaseException(e);
}
}
/**
* Return all known event ID's with existing comments, grouped by organization ID
*
* @return a list of all event ID's grouped by organization ID
*/
public Map<String, List<String>> getEventsWithComments() {
List<Object[]> orgIDsEventIDs = db.exec(namedQuery.findAll("EventComment.findAllWIthOrg", Object[].class));
Map<String, List<String>> orgEventsMap = new Hashtable<>();
for (Object[] orgEventResult : orgIDsEventIDs) {
String orgId = (String) orgEventResult[0];
String eventId = (String) orgEventResult[1];
if (!orgEventsMap.containsKey(orgId)) {
List<String> eventIds = new ArrayList<>();
eventIds.add(eventId);
orgEventsMap.put(orgId, eventIds);
} else if (!orgEventsMap.get(orgId).contains(eventId)) {
orgEventsMap.get(orgId).add(eventId);
}
}
return orgEventsMap;
}
private void updateCommentsInIndex(String eventId) throws EventCommentDatabaseException {
String organization = securityService.getOrganization().getId();
User user = securityService.getUser();
Function<Optional<Event>, Optional<Event>> updateFunction = getEventUpdateFunction(eventId);
try {
index.addOrUpdateEvent(eventId, updateFunction, organization, user);
} catch (SearchIndexException e) {
logger.error("Error updating comment status of event {} in the {} index:", eventId, index.getIndexName(), e);
}
}
private static final Function<EventComment, Boolean> filterOpenComments = new Function<EventComment, Boolean>() {
@Override
public Boolean apply(EventComment comment) {
return !comment.isResolvedStatus();
}
};
private static final Function<EventComment, Boolean> filterNeedsCuttingComment =
new Function<EventComment, Boolean>() {
@Override
public Boolean apply(EventComment comment) {
return EventComment.REASON_NEEDS_CUTTING.equals(comment.getReason()) && !comment.isResolvedStatus();
}
};
@Override
public void repopulate(IndexRebuildService.DataType type) throws IndexRebuildException {
try {
final int total = countComments();
logIndexRebuildBegin(logger, total, "events with comment");
final int[] current = new int[1];
current[0] = 0;
int n = 20;
var updatedEventRange = new ArrayList<Event>();
final Map<String, List<String>> eventsWithComments = getEventsWithComments();
for (String orgId : eventsWithComments.keySet()) {
Organization organization = organizationDirectoryService.getOrganization(orgId);
User systemUser = SecurityUtil.createSystemUser(cc, organization);
SecurityUtil.runAs(securityService, organization, systemUser,
() -> {
int i = 0;
for (String eventId : eventsWithComments.get(orgId)) {
try {
current[0] += getComments(eventId).size();
i++;
var updatedEventData = index.getEvent(eventId, orgId, securityService.getUser());
updatedEventData = getEventUpdateFunction(eventId).apply(updatedEventData);
updatedEventData.ifPresent(updatedEventRange::add);
if (updatedEventRange.size() >= n || i >= eventsWithComments.get(orgId).size()) {
index.bulkEventUpdate(updatedEventRange);
logIndexRebuildProgress(logger, total, current[0], n);
updatedEventRange.clear();
}
} catch (Throwable t) {
logSkippingElement(logger, "comment of event", eventId, organization, t);
}
}
});
}
} catch (Exception e) {
logIndexRebuildError(logger, e);
throw new IndexRebuildException(getService(), e);
}
}
@Override
public IndexRebuildService.Service getService() {
return IndexRebuildService.Service.Comments;
}
/**
* Get the function to update a commented event in the Elasticsearch index.
*
* @param eventId
* The id of the current event
* @return the function to do the update
*/
private Function<Optional<Event>, Optional<Event>> getEventUpdateFunction(String eventId) {
return (Optional<Event> eventOpt) -> {
if (eventOpt.isEmpty()) {
logger.debug("Event {} not found for comment status updating", eventId);
return Optional.empty();
}
Event event = eventOpt.get();
List<EventComment> comments;
try {
comments = getComments(eventId);
} catch (EventCommentDatabaseException e) {
logger.error("Unable to get comments from event {}", eventId, e);
return Optional.empty();
}
boolean hasComments = !comments.isEmpty();
boolean hasOpenComments = comments.stream().anyMatch(filterOpenComments::apply);
boolean needsCutting = comments.stream().anyMatch(filterNeedsCuttingComment::apply);
logger.debug("Updating comment status of event {} in the {} index.", eventId, index.getIndexName());
if (!hasOpenComments && needsCutting) {
throw new IllegalStateException(
"Invalid comment update request: You can't have an needs cutting comment without having any open "
+ "comments!");
}
event.setHasComments(hasComments);
event.setHasOpenComments(hasOpenComments);
List<Comment> indexComments = new ArrayList<Comment>();
for (EventComment comment : comments) {
indexComments.add(new Comment(comment.getId().get().toString(), comment.getReason(), comment.getText(),
comment.isResolvedStatus()));
// Do we want to include replies? Maybe not, no good reason to filter for them?
}
event.setComments(indexComments);
event.setNeedsCutting(needsCutting);
return Optional.of(event);
};
}
}