1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 package org.opencastproject.search.impl;
23
import static org.opencastproject.security.util.SecurityUtil.getEpisodeRoleId;

import org.opencastproject.elasticsearch.index.ElasticsearchIndex;
import org.opencastproject.elasticsearch.index.rebuild.AbstractIndexProducer;
import org.opencastproject.elasticsearch.index.rebuild.IndexProducer;
import org.opencastproject.elasticsearch.index.rebuild.IndexRebuildException;
import org.opencastproject.elasticsearch.index.rebuild.IndexRebuildService;
import org.opencastproject.list.api.ListProviderException;
import org.opencastproject.list.api.ListProvidersService;
import org.opencastproject.list.api.ResourceListQuery;
import org.opencastproject.list.impl.ResourceListQueryImpl;
import org.opencastproject.mediapackage.MediaPackage;
import org.opencastproject.metadata.dublincore.DublinCore;
import org.opencastproject.metadata.dublincore.DublinCoreCatalog;
import org.opencastproject.metadata.dublincore.DublinCoreUtil;
import org.opencastproject.metadata.dublincore.DublinCoreValue;
import org.opencastproject.metadata.dublincore.DublinCores;
import org.opencastproject.search.api.SearchException;
import org.opencastproject.search.api.SearchResult;
import org.opencastproject.search.api.SearchService;
import org.opencastproject.search.impl.persistence.SearchServiceDatabase;
import org.opencastproject.search.impl.persistence.SearchServiceDatabaseException;
import org.opencastproject.security.api.AccessControlEntry;
import org.opencastproject.security.api.AccessControlList;
import org.opencastproject.security.api.AuthorizationService;
import org.opencastproject.security.api.Organization;
import org.opencastproject.security.api.OrganizationDirectoryService;
import org.opencastproject.security.api.Permissions;
import org.opencastproject.security.api.Role;
import org.opencastproject.security.api.SecurityConstants;
import org.opencastproject.security.api.SecurityService;
import org.opencastproject.security.api.UnauthorizedException;
import org.opencastproject.security.api.User;
import org.opencastproject.security.util.SecurityUtil;
import org.opencastproject.series.api.SeriesException;
import org.opencastproject.series.api.SeriesService;
import org.opencastproject.util.NotFoundException;
import org.opencastproject.util.data.Tuple;
import org.opencastproject.workspace.api.Workspace;

import com.google.gson.Gson;
import com.google.gson.JsonElement;

import org.apache.commons.io.IOUtils;
import org.elasticsearch.ElasticsearchStatusException;
import org.elasticsearch.action.DocWriteResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.action.update.UpdateResponse;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.indices.CreateIndexRequest;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.osgi.service.component.ComponentContext;
import org.osgi.service.component.annotations.Activate;
import org.osgi.service.component.annotations.Component;
import org.osgi.service.component.annotations.Reference;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.time.Instant;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
102
103
104
105
106 @Component(
107 immediate = true,
108 service = { SearchServiceIndex.class, IndexProducer.class },
109 property = {
110 "service.description=Search Service Index",
111 "service.pid=org.opencastproject.search.impl.SearchServiceIndex"
112 }
113 )
114 public final class SearchServiceIndex extends AbstractIndexProducer implements IndexProducer {
115
116 @Override
117 public IndexRebuildService.Service getService() {
118 return IndexRebuildService.Service.Search;
119 }
120
121
122 private static final Logger logger = LoggerFactory.getLogger(SearchServiceIndex.class);
123
124 public static final String INDEX_NAME = "opencast_search";
125
126 private final Gson gson = new Gson();
127
128 private ElasticsearchIndex esIndex;
129
130 private SeriesService seriesService;
131
132
133 private Workspace workspace;
134
135
136 private SecurityService securityService;
137
138
139 private AuthorizationService authorizationService;
140
141
142 private SearchServiceDatabase persistence;
143
144
145 private OrganizationDirectoryService organizationDirectory = null;
146
147 private ListProvidersService listProvidersService;
148
149 private String systemUserName = null;
150
151
152
153
154
155 public SearchServiceIndex() {
156 }
157
158
159
160
161
162
163
164 @Activate
165 public void activate(final ComponentContext cc) throws IllegalStateException {
166 createIndex();
167 systemUserName = SecurityUtil.getSystemUserName(cc);
168 }
169
170 private void createIndex() {
171 var mapping = "";
172 try (var in = this.getClass().getResourceAsStream("/search-mapping.json")) {
173 mapping = IOUtils.toString(in, StandardCharsets.UTF_8);
174 } catch (IOException e) {
175 throw new SearchException("Could not read mapping.", e);
176 }
177 try {
178 logger.debug("Trying to create index for '{}'", INDEX_NAME);
179 InputStream is = getClass().getResourceAsStream("/elasticsearch/indexSettings.json");
180 String indexSettings = IOUtils.toString(is, StandardCharsets.UTF_8);
181 final CreateIndexRequest request = new CreateIndexRequest(INDEX_NAME)
182 .settings(indexSettings, XContentType.JSON)
183 .mapping(mapping, XContentType.JSON);
184 var response = esIndex.getClient().indices().create(request, RequestOptions.DEFAULT);
185 if (!response.isAcknowledged()) {
186 throw new SearchException("Unable to create index for '" + INDEX_NAME + "'");
187 }
188 } catch (ElasticsearchStatusException e) {
189 if (e.getDetailedMessage().contains("already_exists_exception")) {
190 logger.info("Detected existing index '{}'", INDEX_NAME);
191 } else {
192 throw e;
193 }
194 } catch (IOException e) {
195 throw new SearchException(e);
196 }
197 }
198
199 @Reference
200 public void setEsIndex(ElasticsearchIndex esIndex) {
201 this.esIndex = esIndex;
202 }
203
204
205 public SearchResponse search(SearchSourceBuilder searchSource) throws SearchException {
206 SearchRequest searchRequest = new SearchRequest(INDEX_NAME);
207 logger.debug("Sending for query: {}", searchSource.query());
208 searchRequest.source(searchSource);
209 try {
210 return esIndex.getClient().search(searchRequest, RequestOptions.DEFAULT);
211 } catch (IOException e) {
212 throw new SearchException(e);
213 }
214 }
215
216
217
218
219
220
221
222
223
224
225
226
227
228 public void addSynchronously(MediaPackage mediaPackage)
229 throws SearchException, IllegalArgumentException, UnauthorizedException, SearchServiceDatabaseException {
230 if (mediaPackage == null) {
231 throw new IllegalArgumentException("Unable to add a null mediapackage");
232 }
233 var mediaPackageId = mediaPackage.getIdentifier().toString();
234
235 checkSearchEntityWritePermission(mediaPackageId);
236
237 logger.debug("Attempting to add media package {} to search index", mediaPackageId);
238 final var acls = new AccessControlList[1];
239 final var org = securityService.getOrganization();
240 final var systemUser = SecurityUtil.createSystemUser(systemUserName, org);
241
242 SecurityUtil.runAs(securityService, org, systemUser, () -> {
243 acls[0] = authorizationService.getActiveAcl(mediaPackage).getA();
244 });
245 var acl = acls[0] == null ? new AccessControlList() : acls[0];
246 var now = new Date();
247
248 try {
249 persistence.storeMediaPackage(mediaPackage, acl, now);
250 } catch (SearchServiceDatabaseException e) {
251 throw new SearchException(String.format("Could not store media package to search database %s", mediaPackageId),
252 e);
253 }
254
255 indexMediaPackage(mediaPackage, acl);
256 }
257
258 public void indexMediaPackage(String mediaPackageId)
259 throws SearchException, SearchServiceDatabaseException, UnauthorizedException, NotFoundException {
260 if (!securityService.getUser().hasRole("ROLE_ADMIN")) {
261 throw new UnauthorizedException("Only global administrators may trigger manual event updates.");
262 }
263 try {
264 MediaPackage mp = persistence.getMediaPackage(mediaPackageId);
265 AccessControlList acl = persistence.getAccessControlList(mediaPackageId);
266 Date modificationDate = persistence.getModificationDate(mediaPackageId);
267 Date deletionDate = persistence.getDeletionDate(mediaPackageId);
268 indexMediaPackage(mp, acl, modificationDate, deletionDate);
269 } catch (RuntimeException e) {
270 logSkippingElement(logger, "event", mediaPackageId, e);
271 }
272 }
273
274 private void indexMediaPackage(MediaPackage mediaPackage, AccessControlList acl)
275 throws SearchException, SearchServiceDatabaseException {
276 indexMediaPackage(mediaPackage, acl, null, null);
277 }
278
279 private void indexMediaPackage(MediaPackage mediaPackage, AccessControlList acl, Date modDate, Date delDate)
280 throws SearchException, SearchServiceDatabaseException {
281 String mediaPackageId = mediaPackage.getIdentifier().toString();
282 String orgId = securityService.getOrganization().getId();
283
284
285 DublinCoreCatalog dc = null == delDate
286 ? DublinCoreUtil.loadEpisodeDublinCore(workspace, mediaPackage).orElse(DublinCores.mkSimple())
287 : DublinCores.mkSimple();
288
289 List<DublinCoreCatalog> seriesList = Collections.emptyList();
290 if (dc.hasValue(DublinCore.PROPERTY_IS_PART_OF)) {
291
292 seriesList = dc.get(DublinCore.PROPERTY_IS_PART_OF).stream().map(DublinCoreValue::getValue).map(s -> {
293 try {
294 return seriesService.getSeries(s);
295 } catch (NotFoundException e) {
296 logger.warn("Series {} not found during index of event {}, omitting the link from the indexed data", s,
297 mediaPackageId);
298 } catch (UnauthorizedException e) {
299 logger.warn("Not authorized for series {} during index of event {}, omitting the link from the indexed data",
300 s, mediaPackageId);
301 } catch (SeriesException e) {
302 throw new SearchException(e);
303 }
304 return null;
305 }).filter(Objects::nonNull).collect(Collectors.toList());
306 }
307
308
309 acl = addCustomAclRoles(mediaPackageId, acl);
310
311 SearchResult item = new SearchResult(SearchService.IndexEntryType.Episode, dc, acl, orgId, mediaPackage,
312 null != modDate ? modDate.toInstant() : Instant.now(),
313 null != delDate ? delDate.toInstant() : null);
314 Map<String, Object> metadata = item.dehydrateForIndex();
315 try {
316 var request = new IndexRequest(INDEX_NAME);
317 request.id(mediaPackageId);
318 request.source(metadata);
319 esIndex.getClient().index(request, RequestOptions.DEFAULT);
320 logger.debug("Indexed episode {}", mediaPackageId);
321 } catch (IOException e) {
322 throw new SearchException(e);
323 }
324
325
326 for (DublinCoreCatalog seriesDc : seriesList) {
327 String seriesId = seriesDc.getFirst(DublinCore.PROPERTY_IDENTIFIER);
328 AccessControlList seriesAcl = persistence.getAccessControlLists(seriesId, mediaPackageId).stream()
329 .map(aclPair -> addCustomAclRoles(aclPair.getKey(), aclPair.getValue()))
330 .reduce(new AccessControlList(acl.getEntries()), AccessControlList::mergeActions);
331 item = new SearchResult(SearchService.IndexEntryType.Series, seriesDc, seriesAcl, orgId,
332 null, Instant.now(), null);
333
334 Map<String, Object> seriesData = item.dehydrateForIndex();
335 try {
336 var request = new IndexRequest(INDEX_NAME);
337 request.id(seriesId);
338 request.source(seriesData);
339 esIndex.getClient().index(request, RequestOptions.DEFAULT);
340 logger.debug("Indexed series {} related to episode {}", seriesId, mediaPackageId);
341 } catch (IOException e) {
342 throw new SearchException(e);
343 }
344 }
345 }
346
347
348
349
350
351
352
353
354
355
356
357 private AccessControlList addCustomAclRoles(String mediaPackageId, AccessControlList acl) {
358
359 Set<AccessControlEntry> customEntries = new HashSet<>();
360 customEntries.add(new AccessControlEntry(getEpisodeRoleId(mediaPackageId, "READ"), "read", true));
361 customEntries.add(new AccessControlEntry(getEpisodeRoleId(mediaPackageId, "WRITE"), "write", true));
362
363 ResourceListQuery query = new ResourceListQueryImpl();
364 if (listProvidersService.hasProvider("ACL.ACTIONS")) {
365 Map<String, String> actions = new HashMap<>();
366 try {
367 actions = listProvidersService.getList("ACL.ACTIONS", query, true);
368 } catch (ListProviderException e) {
369 throw new SearchException("Listproviders not loaded. " + e);
370 }
371 for (String action : actions.keySet()) {
372 customEntries.add(
373 new AccessControlEntry(getEpisodeRoleId(mediaPackageId, action), action, true));
374 }
375 }
376
377 return acl;
378 }
379
380 private void checkSearchEntityWritePermission(final String mediaPackageId) throws SearchException {
381 User user = securityService.getUser();
382 try {
383 AccessControlList acl = persistence.getAccessControlList(mediaPackageId);
384 if (!authorizationService.hasPermission(acl, Permissions.Action.WRITE.toString())) {
385 boolean isAdmin = user.getRoles().stream()
386 .map(Role::getName)
387 .anyMatch(r -> r.equals(SecurityConstants.GLOBAL_ADMIN_ROLE));
388 if (!isAdmin) {
389 throw new UnauthorizedException(user, "Write permission denied for " + mediaPackageId, acl);
390 } else {
391 logger.debug("Write for {} is not allowed by ACL, but user has {}",
392 mediaPackageId, SecurityConstants.GLOBAL_ADMIN_ROLE);
393 }
394 }
395 } catch (NotFoundException e) {
396 logger.debug("Mediapackage {} does not exist or was deleted, allowing writes for user {}", mediaPackageId, user);
397 } catch (SearchServiceDatabaseException | UnauthorizedException e) {
398 throw new SearchException(e);
399 }
400 }
401
402
403
404
405
406
407
408
409
410
411 public boolean deleteSynchronously(final String mediaPackageId) throws SearchException {
412
413 checkSearchEntityWritePermission(mediaPackageId);
414
415 String deletionString = DateTimeFormatter.ISO_INSTANT.format(Instant.now());
416
417 try {
418 logger.info("Marking media package {} as deleted in search index", mediaPackageId);
419 JsonElement json = gson.toJsonTree(Map.of(
420 SearchResult.DELETED_DATE, deletionString,
421 SearchResult.MODIFIED_DATE, deletionString));
422 var updateRequst = new UpdateRequest(INDEX_NAME, mediaPackageId)
423 .doc(gson.toJson(json), XContentType.JSON);
424 esIndex.getClient().update(updateRequst, RequestOptions.DEFAULT);
425 } catch (ElasticsearchStatusException e) {
426 if (e.status().getStatus() != RestStatus.NOT_FOUND.getStatus()) {
427 throw e;
428 }
429 logger.warn("Event {} is not in the search index. Skipping deletion", mediaPackageId);
430 } catch (IOException e) {
431 throw new SearchException("Could not delete episode " + mediaPackageId + " from index", e);
432 }
433
434 try {
435 logger.info("Marking media package {} as deleted in search database", mediaPackageId);
436
437 String seriesId = null;
438 Date now = new Date();
439 try {
440 seriesId = persistence.getMediaPackage(mediaPackageId).getSeries();
441 persistence.deleteMediaPackage(mediaPackageId, now);
442 logger.info("Removed media package {} from search persistence", mediaPackageId);
443 } catch (NotFoundException e) {
444
445 logger.info("Could not find media package with id {} in persistence, but will try remove it from index anyway.",
446 mediaPackageId);
447 } catch (SearchServiceDatabaseException | UnauthorizedException e) {
448 throw new SearchException(
449 String.format("Could not delete media package with id %s from persistence storage", mediaPackageId), e);
450 }
451
452
453 if (seriesId != null) {
454 try {
455 if (!persistence.getSeries(seriesId).isEmpty()) {
456
457 final AccessControlList seriesAcl = persistence.getAccessControlLists(seriesId).stream()
458 .map(aclPair -> addCustomAclRoles(aclPair.getKey(), aclPair.getValue()))
459 .reduce(new AccessControlList(), AccessControlList::mergeActions);
460 JsonElement json = gson.toJsonTree(Map.of(
461 SearchResult.INDEX_ACL, SearchResult.dehydrateAclForIndex(seriesAcl),
462 SearchResult.MODIFIED_DATE, deletionString));
463 var updateRequest = new UpdateRequest(INDEX_NAME, seriesId).doc(gson.toJson(json), XContentType.JSON);
464 try {
465 esIndex.getClient().update(updateRequest, RequestOptions.DEFAULT);
466 } catch (ElasticsearchStatusException e) {
467 if (RestStatus.NOT_FOUND == e.status()) {
468 logger.warn("Attempted to modify {}, but that series does not exist in the index.", seriesId);
469 }
470 }
471 } else {
472
473 deleteSeriesSynchronously(seriesId);
474 }
475 } catch (IOException e) {
476 throw new SearchException(e);
477 }
478 }
479
480 return true;
481 } catch (SearchServiceDatabaseException e) {
482 logger.info("Could not delete media package with id {} from search index", mediaPackageId);
483 throw new SearchException(e);
484 }
485 }
486
487
488
489
490
491
492
493
494 public boolean deleteSeriesSynchronously(String seriesId) throws SearchException {
495 try {
496 logger.info("Marking {} as deleted in the search index", seriesId);
497 JsonElement json = gson.toJsonTree(Map.of(
498 "deleted", Instant.now().getEpochSecond(),
499 "modified", Instant.now().toString()));
500 var updateRequest = new UpdateRequest(INDEX_NAME, seriesId).doc(gson.toJson(json), XContentType.JSON);
501 try {
502 UpdateResponse response = esIndex.getClient().update(updateRequest, RequestOptions.DEFAULT);
503
504 return DocWriteResponse.Result.UPDATED == response.getResult();
505 } catch (ElasticsearchStatusException e) {
506 if (RestStatus.NOT_FOUND == e.status()) {
507 logger.debug("Attempted to delete {}, but that series does not exist in the index.", seriesId);
508 return true;
509 }
510 throw new SearchException(e);
511 }
512 } catch (IOException e) {
513 throw new SearchException("Could not delete series " + seriesId + " from index", e);
514 }
515 }
516
517 @Override
518 public void repopulate(IndexRebuildService.DataType type) throws IndexRebuildException {
519 final Organization originalOrg = securityService.getOrganization();
520 final User originalUser = securityService.getUser();
521
522 try {
523 int total = persistence.countMediaPackages();
524 int pageSize = 50;
525 int pageOffset = 0;
526 AtomicInteger current = new AtomicInteger(1);
527 logIndexRebuildBegin(logger, total, "search");
528 List<Tuple<MediaPackage, String>> page = null;
529
530 do {
531 page = persistence.getAllMediaPackages(pageSize, pageOffset).collect(Collectors.toList());
532 page.forEach(tuple -> {
533 try {
534 MediaPackage mediaPackage = tuple.getA();
535 Organization organization = organizationDirectory.getOrganization(tuple.getB());
536 final var systemUser = SecurityUtil.createSystemUser(systemUserName, organization);
537 securityService.setUser(systemUser);
538 securityService.setOrganization(organization);
539
540 String mediaPackageId = mediaPackage.getIdentifier().toString();
541
542 AccessControlList acl = persistence.getAccessControlList(mediaPackageId);
543 Date modificationDate = persistence.getModificationDate(mediaPackageId);
544 Date deletionDate = persistence.getDeletionDate(mediaPackageId);
545
546 current.getAndIncrement();
547
548 indexMediaPackage(mediaPackage, acl, modificationDate, deletionDate);
549 } catch (SearchServiceDatabaseException e) {
550 logIndexRebuildError(logger, total, current.get(), e);
551
552 throw new RuntimeException("Internal Index Rebuild Failure", e);
553 } catch (RuntimeException | NotFoundException e) {
554 logSkippingElement(logger, "event", tuple.getA().getIdentifier().toString(), e);
555 }
556 });
557
558 logIndexRebuildProgress(logger, total, current.get() - 1, pageSize);
559 pageOffset += 1;
560 } while (pageOffset * pageSize <= total);
561
562 } catch (SearchServiceDatabaseException | RuntimeException e) {
563 logIndexRebuildError(logger, e);
564 throw new IndexRebuildException("Index Rebuild Failure", e);
565 } finally {
566 securityService.setUser(originalUser);
567 securityService.setOrganization(originalOrg);
568 }
569 }
570
571 @Reference
572 public void setPersistence(SearchServiceDatabase persistence) {
573 this.persistence = persistence;
574 }
575
576 @Reference
577 public void setSeriesService(SeriesService seriesService) {
578 this.seriesService = seriesService;
579 }
580
581 @Reference
582 public void setWorkspace(Workspace workspace) {
583 this.workspace = workspace;
584 }
585
586 @Reference
587 public void setAuthorizationService(AuthorizationService authorizationService) {
588 this.authorizationService = authorizationService;
589 }
590
591
592
593
594
595
596
597 @Reference
598 public void setSecurityService(SecurityService securityService) {
599 this.securityService = securityService;
600 }
601
602
603
604
605
606
607
608 @Reference
609 public void setOrganizationDirectoryService(OrganizationDirectoryService organizationDirectory) {
610 this.organizationDirectory = organizationDirectory;
611 }
612
613 @Reference
614 public void setListProvidersService(ListProvidersService listProvidersService) {
615 this.listProvidersService = listProvidersService;
616 }
617 }