View Javadoc
1   /*
2    * Licensed to The Apereo Foundation under one or more contributor license
3    * agreements. See the NOTICE file distributed with this work for additional
4    * information regarding copyright ownership.
5    *
6    *
7    * The Apereo Foundation licenses this file to you under the Educational
8    * Community License, Version 2.0 (the "License"); you may not use this file
9    * except in compliance with the License. You may obtain a copy of the License
10   * at:
11   *
12   *   http://opensource.org/licenses/ecl2.txt
13   *
14   * Unless required by applicable law or agreed to in writing, software
15   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
16   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the
17   * License for the specific language governing permissions and limitations under
18   * the License.
19   *
20   */
21  package org.opencastproject.event.comment.persistence;
22  
23  import static org.opencastproject.db.Queries.namedQuery;
24  
25  import org.opencastproject.db.DBSession;
26  import org.opencastproject.db.DBSessionFactory;
27  import org.opencastproject.elasticsearch.api.SearchIndexException;
28  import org.opencastproject.elasticsearch.index.ElasticsearchIndex;
29  import org.opencastproject.elasticsearch.index.objects.event.Comment;
30  import org.opencastproject.elasticsearch.index.objects.event.Event;
31  import org.opencastproject.elasticsearch.index.rebuild.AbstractIndexProducer;
32  import org.opencastproject.elasticsearch.index.rebuild.IndexProducer;
33  import org.opencastproject.elasticsearch.index.rebuild.IndexRebuildException;
34  import org.opencastproject.elasticsearch.index.rebuild.IndexRebuildService;
35  import org.opencastproject.event.comment.EventComment;
36  import org.opencastproject.security.api.Organization;
37  import org.opencastproject.security.api.OrganizationDirectoryService;
38  import org.opencastproject.security.api.SecurityService;
39  import org.opencastproject.security.api.User;
40  import org.opencastproject.security.api.UserDirectoryService;
41  import org.opencastproject.security.util.SecurityUtil;
42  import org.opencastproject.util.NotFoundException;
43  
44  import com.entwinemedia.fn.Fn;
45  import com.entwinemedia.fn.Stream;
46  
47  import org.apache.commons.lang3.tuple.Pair;
48  import org.osgi.service.component.ComponentContext;
49  import org.osgi.service.component.annotations.Activate;
50  import org.osgi.service.component.annotations.Component;
51  import org.osgi.service.component.annotations.Reference;
52  import org.slf4j.Logger;
53  import org.slf4j.LoggerFactory;
54  
55  import java.util.ArrayList;
56  import java.util.Hashtable;
57  import java.util.Iterator;
58  import java.util.List;
59  import java.util.Map;
60  import java.util.Optional;
61  import java.util.function.Function;
62  import java.util.stream.Collectors;
63  
64  import javax.persistence.EntityManager;
65  import javax.persistence.EntityManagerFactory;
66  
67  /**
68   * Implements permanent storage for event comments.
69   */
70  @Component(
71      immediate = true,
72      service = { EventCommentDatabaseService.class, IndexProducer.class },
73      property = {
74          "service.description=Event Comment Database Service"
75      }
76  )
77  public class EventCommentDatabaseServiceImpl extends AbstractIndexProducer implements EventCommentDatabaseService {
78    /** Logging utilities */
79    private static final Logger logger = LoggerFactory.getLogger(EventCommentDatabaseServiceImpl.class);
80  
81    public static final String PERSISTENCE_UNIT = "org.opencastproject.event.comment";
82  
83    /** Factory used to create {@link EntityManager}s for transactions */
84    private EntityManagerFactory emf;
85  
86    private DBSessionFactory dbSessionFactory;
87    private DBSession db;
88  
89    /** The organization directory service used to retrieve organizations. */
90    private OrganizationDirectoryService organizationDirectoryService;
91  
92    /** The security service used to run the security context with. */
93    private SecurityService securityService;
94  
95    /** The user directory service */
96    private UserDirectoryService userDirectoryService;
97  
98    /** The component context this bundle is running in. */
99    private ComponentContext cc;
100 
101   /** The Elasticsearch index */
102   private ElasticsearchIndex index;
103 
104   /** OSGi component activation callback */
105   @Activate
106   public void activate(ComponentContext cc) {
107     logger.info("Activating persistence manager for event comments");
108     this.cc = cc;
109     db = dbSessionFactory.createSession(emf);
110   }
111 
112   /** OSGi DI */
113   @Reference(target = "(osgi.unit.name=org.opencastproject.event.comment)")
114   public void setEntityManagerFactory(EntityManagerFactory emf) {
115     this.emf = emf;
116   }
117 
118   @Reference
119   public void setDBSessionFactory(DBSessionFactory dbSessionFactory) {
120     this.dbSessionFactory = dbSessionFactory;
121   }
122 
123   /**
124    * OSGi callback to set the security context to run with.
125    *
126    * @param securityService
127    *          The security service
128    */
129   @Reference
130   public void setSecurityService(SecurityService securityService) {
131     this.securityService = securityService;
132   }
133 
134   /**
135    * OSGi callback to set the user directory service.
136    *
137    * @param userDirectoryService
138    *          the user directory service
139    */
140   @Reference
141   public void setUserDirectoryService(UserDirectoryService userDirectoryService) {
142     this.userDirectoryService = userDirectoryService;
143   }
144 
145   /**
146    * OSGi callback to set the organization directory service.
147    *
148    * @param organizationDirectoryService
149    *          the organization directory service
150    */
151   @Reference
152   public void setOrganizationDirectoryService(OrganizationDirectoryService organizationDirectoryService) {
153     this.organizationDirectoryService = organizationDirectoryService;
154   }
155 
156   /**
157    * OSgi callback for the Elasticsearch index.
158    *
159    * @param index
160    *          the Elasticsearch index.
161    */
162   @Reference
163   public void setIndex(ElasticsearchIndex index) {
164     this.index = index;
165   }
166 
167   @Override
168   public List<String> getReasons() throws EventCommentDatabaseException {
169     try {
170       return db.exec(namedQuery.findAll(
171           "EventComment.findReasons",
172           String.class,
173           Pair.of("org", securityService.getOrganization().getId())
174       ));
175     } catch (Exception e) {
176       logger.error("Could not get reasons", e);
177       throw new EventCommentDatabaseException(e);
178     }
179   }
180 
181   @Override
182   public EventComment getComment(long commentId) throws NotFoundException, EventCommentDatabaseException {
183     try {
184       Optional<EventCommentDto> event = db.exec(getEventCommentQuery(commentId));
185       if (event.isEmpty()) {
186         throw new NotFoundException("Event comment with ID " + commentId + " does not exist");
187       }
188       return event.get().toComment(userDirectoryService, organizationDirectoryService);
189     } catch (NotFoundException e) {
190       throw e;
191     } catch (Exception e) {
192       logger.error("Could not get event comment {}", commentId, e);
193       throw new EventCommentDatabaseException(e);
194     }
195   }
196 
197   @Override
198   public void deleteComment(long commentId) throws NotFoundException, EventCommentDatabaseException {
199     try {
200       EventCommentDto event = db.execTxChecked(em -> {
201         Optional<EventCommentDto> eventOpt = getEventCommentQuery(commentId).apply(em);
202         if (eventOpt.isEmpty()) {
203           throw new NotFoundException("Event comment with ID " + commentId + " does not exist");
204         }
205         em.remove(eventOpt.get());
206         return eventOpt.get();
207       });
208       updateIndices(event.getEventId());
209     } catch (NotFoundException e) {
210       throw e;
211     } catch (Exception e) {
212       logger.error("Could not delete event comment", e);
213       throw new EventCommentDatabaseException(e);
214     }
215   }
216 
217   @Override
218   public void deleteComments(String eventId) throws NotFoundException, EventCommentDatabaseException {
219     // Similar to deleteComment but we want to avoid sending a message for each deletion
220 
221     int count = 0;
222     try {
223       count = db.execTxChecked(em -> {
224         List<EventComment> comments = getComments(eventId);
225 
226         for (EventComment comment : comments) {
227           long commentId = comment.getId().get().intValue();
228           Optional<EventCommentDto> event = getEventCommentQuery(commentId).apply(em);
229           if (event.isEmpty()) {
230             throw new NotFoundException("Event comment with ID " + commentId + " does not exist");
231           }
232           em.remove(event.get());
233         }
234 
235         return comments.size();
236       });
237     } catch (NotFoundException e) {
238       throw e;
239     } catch (Exception e) {
240       logger.error("Could not delete event comments", e);
241       throw new EventCommentDatabaseException(e);
242     }
243 
244     // send updates only if we actually modified anything
245     if (count > 0) {
246       updateIndices(eventId);
247     }
248   }
249 
250   @Override
251   public EventComment updateComment(EventComment comment) throws EventCommentDatabaseException {
252     try {
253       final EventCommentDto commentDto = EventCommentDto.from(comment);
254       final EventComment updatedComment = db.execTx(namedQuery.persistOrUpdate(commentDto))
255           .toComment(userDirectoryService, organizationDirectoryService);
256       updateIndices(updatedComment.getEventId());
257       return updatedComment;
258     } catch (Exception e) {
259       throw new EventCommentDatabaseException(e);
260     }
261   }
262 
263   /**
264    * Gets an event comment, using the current organizational context.
265    *
266    * @param commentId
267    *          the comment identifier
268    *
269    * @return the event comment entity, or null if not found
270    */
271   private Function<EntityManager, Optional<EventCommentDto>> getEventCommentQuery(long commentId) {
272     return namedQuery.findOpt(
273         "EventComment.findByCommentId",
274         EventCommentDto.class,
275         Pair.of("commentId", commentId)
276     );
277   }
278 
279   @Override
280   public List<EventComment> getComments(String eventId) throws EventCommentDatabaseException {
281     try {
282       return db.exec(namedQuery.findAll(
283           "EventComment.findByEvent",
284               EventCommentDto.class,
285               Pair.of("eventId", eventId),
286               Pair.of("org", securityService.getOrganization().getId())
287           )).stream()
288           .map(c -> c.toComment(userDirectoryService, organizationDirectoryService))
289           .sorted((c1, c2) -> {
290             boolean v1 = c1.isResolvedStatus();
291             boolean v2 = c2.isResolvedStatus();
292             return (v1 ^ v2) ? ((v1 ^ false) ? 1 : -1) : 0;
293           })
294           .collect(Collectors.toList());
295     } catch (Exception e) {
296       logger.error("Could not retreive comments for event {}", eventId, e);
297       throw new EventCommentDatabaseException(e);
298     }
299   }
300 
301   public Iterator<EventCommentDto> getComments() throws EventCommentDatabaseException {
302     try {
303       return db.exec(namedQuery.findAll("EventComment.findAll", EventCommentDto.class)).iterator();
304     } catch (Exception e) {
305       logger.error("Could not retreive event comments", e);
306       throw new EventCommentDatabaseException(e);
307     }
308   }
309 
310   public int countComments() throws EventCommentDatabaseException {
311     try {
312       return db.exec(namedQuery.find("EventComment.countAll", Number.class)).intValue();
313     } catch (Exception e) {
314       logger.error("Could not find the number of comments.", e);
315       throw new EventCommentDatabaseException(e);
316     }
317   }
318 
319   /**
320    * Return all known event ID's with existing comments, grouped by organization ID
321    *
322    * @return a list of all event ID's grouped by organization ID
323    */
324   public Map<String, List<String>> getEventsWithComments() {
325     List<Object[]> orgIDsEventIDs = db.exec(namedQuery.findAll("EventComment.findAllWIthOrg", Object[].class));
326     Map<String, List<String>> orgEventsMap = new Hashtable<>();
327     for (Object[] orgEventResult : orgIDsEventIDs) {
328       String orgId = (String) orgEventResult[0];
329       String eventId = (String) orgEventResult[1];
330       if (!orgEventsMap.containsKey(orgId)) {
331         List<String> eventIds = new ArrayList<>();
332         eventIds.add(eventId);
333         orgEventsMap.put(orgId, eventIds);
334       } else if (!orgEventsMap.get(orgId).contains(eventId)) {
335         orgEventsMap.get(orgId).add(eventId);
336       }
337     }
338     return orgEventsMap;
339   }
340 
341   private void updateIndices(String eventId) throws EventCommentDatabaseException {
342     List<EventComment> comments = getComments(eventId);
343     boolean hasOpenComments = !Stream.$(comments).filter(filterOpenComments).toList().isEmpty();
344     boolean needsCutting = !Stream.$(comments).filter(filterNeedsCuttingComment).toList().isEmpty();
345 
346     String organization = securityService.getOrganization().getId();
347     User user = securityService.getUser();
348 
349     updateIndex(eventId, !comments.isEmpty(), hasOpenComments, comments, needsCutting, organization, user);
350   }
351 
352   private void updateIndex(String eventId, boolean hasComments, boolean hasOpenComments, List<EventComment> comments,
353           boolean needsCutting, String organization, User user) {
354     logger.debug("Updating comment status of event {} in the {} index.", eventId, index.getIndexName());
355     if (!hasComments && hasOpenComments) {
356       throw new IllegalStateException(
357               "Invalid comment update request: You can't have open comments without having any comments!");
358     }
359     if (!hasOpenComments && needsCutting) {
360       throw new IllegalStateException(
361               "Invalid comment update request: You can't have an needs cutting comment without having any open "
362                       + "comments!");
363     }
364 
365     Function<Optional<Event>, Optional<Event>> updateFunction = (Optional<Event> eventOpt) -> {
366       if (eventOpt.isEmpty()) {
367         logger.debug("Event {} not found for comment status updating", eventId);
368         return Optional.empty();
369       }
370       Event event = eventOpt.get();
371       event.setHasComments(hasComments);
372       event.setHasOpenComments(hasOpenComments);
373       List<Comment> indexComments = new ArrayList<Comment>();
374       for (EventComment comment : comments) {
375         indexComments.add(new Comment(
376                 comment.getId().get().toString(), comment.getReason(), comment.getText(), comment.isResolvedStatus()
377         ));
378         // Do we want to include replies? Maybe not, no good reason to filter for them?
379       }
380       event.setComments(indexComments);
381       event.setNeedsCutting(needsCutting);
382       return Optional.of(event);
383     };
384 
385     try {
386       index.addOrUpdateEvent(eventId, updateFunction, organization, user);
387     } catch (SearchIndexException e) {
388       logger.error("Error updating comment status of event {} in the {} index:", eventId, index.getIndexName(), e);
389     }
390   }
391 
392   private static final Fn<EventComment, Boolean> filterOpenComments = new Fn<EventComment, Boolean>() {
393     @Override
394     public Boolean apply(EventComment comment) {
395       return !comment.isResolvedStatus();
396     }
397   };
398 
399   private static final Fn<EventComment, Boolean> filterNeedsCuttingComment = new Fn<EventComment, Boolean>() {
400     @Override
401     public Boolean apply(EventComment comment) {
402       return EventComment.REASON_NEEDS_CUTTING.equals(comment.getReason()) && !comment.isResolvedStatus();
403     }
404   };
405 
406   @Override
407   public void repopulate(IndexRebuildService.DataType type) throws IndexRebuildException {
408     try {
409       final int total = countComments();
410       logIndexRebuildBegin(logger, total, "events with comment");
411       final int[] current = new int[1];
412       current[0] = 0;
413       int n = 20;
414       var updatedEventRange = new ArrayList<Event>();
415 
416       final Map<String, List<String>> eventsWithComments = getEventsWithComments();
417       for (String orgId : eventsWithComments.keySet()) {
418         Organization organization = organizationDirectoryService.getOrganization(orgId);
419         User systemUser = SecurityUtil.createSystemUser(cc, organization);
420         SecurityUtil.runAs(securityService, organization, systemUser,
421                 () -> {
422                   int i = 0;
423                   for (String eventId : eventsWithComments.get(orgId)) {
424                     try {
425                       current[0] += getComments(eventId).size();
426                       i++;
427 
428                       var updatedEventData = index.getEvent(eventId, orgId, securityService.getUser());
429                       updatedEventData = getEventUpdateFunction(eventId).apply(updatedEventData);
430                       updatedEventRange.add(updatedEventData.get());
431 
432                       if (updatedEventRange.size() >= n || i >= eventsWithComments.get(orgId).size()) {
433                         index.bulkEventUpdate(updatedEventRange);
434                         logIndexRebuildProgress(logger, total, current[0], n);
435                         updatedEventRange.clear();
436                       }
437                     } catch (Throwable t) {
438                       logSkippingElement(logger, "comment of event", eventId, organization, t);
439                     }
440                   }
441                 });
442       }
443     } catch (Exception e) {
444       logIndexRebuildError(logger, e);
445       throw new IndexRebuildException(getService(), e);
446     }
447   }
448 
449   @Override
450   public IndexRebuildService.Service getService() {
451     return IndexRebuildService.Service.Comments;
452   }
453   /**
454    * Get the function to update a commented event in the Elasticsearch index.
455    *
456    * @param eventId
457    *          The id of the current event
458    * @return the function to do the update
459    */
460   private Function<Optional<Event>, Optional<Event>> getEventUpdateFunction(String eventId) {
461     return (Optional<Event> eventOpt) -> {
462       List<EventComment> comments;
463       try {
464         if (eventOpt.isEmpty()) {
465           logger.debug("Event {} not found for comment status updating", eventId);
466           return Optional.empty();
467         }
468         comments = getComments(eventId);
469         Boolean hasComments = !comments.isEmpty();
470         Boolean hasOpenComments = !Stream.$(comments).filter(filterOpenComments).toList().isEmpty();
471         Boolean needsCutting = !Stream.$(comments).filter(filterNeedsCuttingComment).toList().isEmpty();
472 
473         logger.debug("Updating comment status of event {} in the {} index.", eventId, index.getIndexName());
474         if (!hasComments && hasOpenComments) {
475           throw new IllegalStateException(
476                   "Invalid comment update request: You can't have open comments without having any comments!");
477         }
478         if (!hasOpenComments && needsCutting) {
479           throw new IllegalStateException(
480                   "Invalid comment update request: You can't have an needs cutting comment without having any open "
481                           + "comments!");
482         }
483         Event event = eventOpt.get();
484         event.setHasComments(hasComments);
485         event.setHasOpenComments(hasOpenComments);
486         List<Comment> indexComments = new ArrayList<Comment>();
487         for (EventComment comment : comments) {
488           indexComments.add(new Comment(
489                   comment.getId().get().toString(), comment.getReason(), comment.getText(), comment.isResolvedStatus()
490           ));
491           // Do we want to include replies? Maybe not, no good reason to filter for them?
492         }
493         event.setComments(indexComments);
494         event.setNeedsCutting(needsCutting);
495         return Optional.of(event);
496       } catch (EventCommentDatabaseException e) {
497         logger.error("Unable to get comments from event {}", eventId, e);
498         return Optional.empty();
499       }
500     };
501   }
502 }