Harvest.java
/*
* Licensed to The Apereo Foundation under one or more contributor license
* agreements. See the NOTICE file distributed with this work for additional
* information regarding copyright ownership.
*
*
* The Apereo Foundation licenses this file to you under the Educational
* Community License, Version 2.0 (the "License"); you may not use this file
* except in compliance with the License. You may obtain a copy of the License
* at:
*
* http://opensource.org/licenses/ecl2.txt
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*
*/
package org.opencastproject.tobira.impl;
import org.opencastproject.playlists.PlaylistService;
import org.opencastproject.search.api.SearchResult;
import org.opencastproject.search.api.SearchResultList;
import org.opencastproject.search.api.SearchService;
import org.opencastproject.security.api.AuthorizationService;
import org.opencastproject.security.api.SecurityConstants;
import org.opencastproject.security.api.SecurityService;
import org.opencastproject.security.api.UnauthorizedException;
import org.opencastproject.series.api.SeriesException;
import org.opencastproject.series.api.SeriesService;
import org.opencastproject.util.Jsons;
import org.opencastproject.workspace.api.Workspace;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.search.sort.SortOrder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.Date;
import java.util.Optional;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
/** Contains the actual harvesting logic. */
final class Harvest {
/** Private constructor for utility class with static methods */
private Harvest() { }
/**
* A duration to allow some leeway for certain database operations to be slow.
*
* The correct usage of the harvesting API depends on the correct adjustment of the {@code since}
* parameter between different requests. If `since` is increased too much, some event/series
* modifications could be missed. One fact which makes this harder is that the modified date of
* events/series can be significantly lower/before the time those changes are written to the
* database or index. That's because the "now" timestamp is created in Java and written to the
* database afterwards. Unfortunately, we are not just talking about milliseconds, but up to many
* seconds. This can have different reasons, but since Opencast can be distributed, network plays
* a big role here.
*
* But to not completely give up on an incremental harvesting API, we define an arbitrary "buffer
* period". Writes that take longer (i.e. the time between `new Date()` and the serialization in
* the DB/index) than this buffer could lead to missed updates with this harvesting API.
*/
private static final long TIME_BUFFER_SIZE = 3 * 60 * 1000;
private static final Logger logger = LoggerFactory.getLogger(Harvest.class);
static Jsons.Obj harvest(
int preferredAmount,
Date since,
SearchService searchService,
SeriesService seriesService,
AuthorizationService authorizationService,
SecurityService securityService,
PlaylistService playlistService,
Workspace workspace
) throws UnauthorizedException, SeriesException {
final var org = securityService.getOrganization().getId();
var user = securityService.getUser();
var orgAdminRole = securityService.getOrganization().getAdminRole();
var isAdmin = user.hasRole(SecurityConstants.GLOBAL_ADMIN_ROLE) || user.hasRole(orgAdminRole);
if (!isAdmin) {
throw new UnauthorizedException(user, "Only (org-) admins can access the Tobira harvest API");
}
// ===== Retrieve information about events, series, and playlists =============================
//
// In this step, we always request `preferredAmount + 1` in order to figure out the values for
// `hasMore` and `includesItemsUntil` more precisely.
// Retrieve episodes from index.
final SearchSourceBuilder q = new SearchSourceBuilder().query(
QueryBuilders.boolQuery()
.must(QueryBuilders.termQuery(SearchResult.ORG, org))
.must(QueryBuilders.rangeQuery(SearchResult.MODIFIED_DATE).gte(since.getTime()))
.must(QueryBuilders.termQuery(SearchResult.TYPE, SearchService.IndexEntryType.Episode)))
.sort(SearchResult.MODIFIED_DATE, SortOrder.ASC)
.size(preferredAmount + 1);
final SearchResultList results = searchService.search(q);
final var hasMoreEvents = results.getHits().size() >= preferredAmount + 1;
logger.debug("Retrieved {} events from the index during harvest", results.getHits().size());
// Start tracking the timestamp upper limit. It starts with "now" but gets decreased whenever we
// query items that are limited by `preferredAmount`. We can use this timestamp to restrict
// queries below, to avoid loading useless items: any item modified after this timestamp will be
// requested in the next harvest request anyway.
var includesItemsUntilRaw = new Date();
if (hasMoreEvents) {
includesItemsUntilRaw = results.getHits().get(results.getHits().size() - 1).getModifiedDate();
}
// Retrieve series from DB.
final var rawSeries = seriesService.getAllForAdministrativeRead(
since,
Optional.of(includesItemsUntilRaw),
preferredAmount + 1
);
final var hasMoreSeriesInRange = rawSeries.size() >= preferredAmount + 1;
logger.debug("Retrieved {} series from the database during harvest", rawSeries.size());
if (hasMoreSeriesInRange) {
final var lastSeriesUpdated = rawSeries.get(rawSeries.size() - 1).getModifiedDate();
if (lastSeriesUpdated.before(includesItemsUntilRaw)) {
includesItemsUntilRaw = lastSeriesUpdated;
}
}
// Retrieve playlists from DB.
final var rawPlaylists = playlistService.getAllForAdministrativeRead(
since,
includesItemsUntilRaw,
preferredAmount + 1
);
final var hasMorePlaylistsInRange = rawPlaylists.size() >= preferredAmount + 1;
logger.debug("Retrieved {} playlists from the database during harvest", rawPlaylists.size());
if (hasMorePlaylistsInRange) {
final var lastPlaylistUpdated = rawPlaylists.get(rawPlaylists.size() - 1).getUpdated();
if (lastPlaylistUpdated.before(includesItemsUntilRaw)) {
includesItemsUntilRaw = lastPlaylistUpdated;
}
}
// ===== Convert all items into the JSON output representation ================================
// We limit to `preferredAmount` here again, because we fetched `preferredAmount + 1` above.
final var eventItems = results.getHits().stream()
.limit(preferredAmount)
.map(event -> {
try {
return new Item(event, authorizationService, workspace);
} catch (Exception e) {
var id = event == null ? null : event.getId();
logger.error("Error reading event '{}' (skipping...)", id, e);
return null;
}
})
.filter(item -> item != null);
final var seriesItems = rawSeries.stream()
.limit(preferredAmount)
.map(series -> {
try {
return new Item(series);
} catch (Exception e) {
var id = series == null ? null : series.getId();
logger.error("Error reading series '{}' (skipping...)", id, e);
return null;
}
})
.filter(item -> item != null);
final var playlistItems = rawPlaylists.stream()
.limit(preferredAmount)
.map(playlist -> {
try {
return new Item(playlist);
} catch (Exception e) {
var id = playlist == null ? null : playlist.getId();
logger.error("Error reading playlist '{}' (skipping...)", id, e);
return null;
}
})
.filter(item -> item != null);
// Combine events, series, and playlists into one combined list and sort it. We filter out all
// items that were modified after `includesItemsUntilRaw` as those are transferred in the next
// request anyway. So this is just about response size savings.
//
// The sorting is, again, not for correctness, because consumers of this API need to be
// able to deal with that. However, sorting this here will result in fewer temporary objects
// or invalid states in the consumer.
Date finalIncludesItemsUntilRaw = includesItemsUntilRaw; // copy for lambda
final var items = Stream.of(eventItems, seriesItems, playlistItems)
.flatMap(Function.identity())
.filter(item -> !item.getModifiedDate().after(finalIncludesItemsUntilRaw))
.collect(Collectors.toCollection(ArrayList::new));
items.sort(Comparator.comparing(item -> item.getModifiedDate()));
// Obtain information to allow Tobira to plan the next harvesting request.
final var hasMore = hasMoreEvents || hasMoreSeriesInRange || hasMorePlaylistsInRange;
// The `includesItemsUntil` we return has to be at least `TIME_BUFFER_SIZE` in the past. See
// the constant's documentation for more information on that.
final var includesItemsUntil = Math.min(
includesItemsUntilRaw.getTime(),
new Date().getTime() - TIME_BUFFER_SIZE
);
// Assembly full response.
final var outItems = items.stream()
.map(item -> item.getJson())
.collect(Collectors.toCollection(ArrayList::new));
final var json = Jsons.obj(
Jsons.p("includesItemsUntil", includesItemsUntil),
Jsons.p("hasMore", hasMore),
Jsons.p("items", Jsons.arr(outItems))
);
logger.debug(
"Returning {} items from harvesting (hasMore = {}, includesItemsUntil = {})",
items.size(),
hasMore,
new Date(includesItemsUntil)
);
return json;
}
}