1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 package org.opencastproject.search.impl;
23
24 import static org.opencastproject.security.api.Permissions.Action.WRITE;
25 import static org.opencastproject.security.util.SecurityUtil.getEpisodeRoleId;
26
27 import org.opencastproject.elasticsearch.index.ElasticsearchIndex;
28 import org.opencastproject.elasticsearch.index.rebuild.AbstractIndexProducer;
29 import org.opencastproject.elasticsearch.index.rebuild.IndexProducer;
30 import org.opencastproject.elasticsearch.index.rebuild.IndexRebuildException;
31 import org.opencastproject.elasticsearch.index.rebuild.IndexRebuildService;
32 import org.opencastproject.list.api.ListProviderException;
33 import org.opencastproject.list.api.ListProvidersService;
34 import org.opencastproject.list.api.ResourceListQuery;
35 import org.opencastproject.list.impl.ResourceListQueryImpl;
36 import org.opencastproject.mediapackage.MediaPackage;
37 import org.opencastproject.metadata.dublincore.DublinCore;
38 import org.opencastproject.metadata.dublincore.DublinCoreCatalog;
39 import org.opencastproject.metadata.dublincore.DublinCoreUtil;
40 import org.opencastproject.metadata.dublincore.DublinCoreValue;
41 import org.opencastproject.metadata.dublincore.DublinCores;
42 import org.opencastproject.search.api.SearchException;
43 import org.opencastproject.search.api.SearchResult;
44 import org.opencastproject.search.api.SearchService;
45 import org.opencastproject.search.impl.persistence.SearchServiceDatabase;
46 import org.opencastproject.search.impl.persistence.SearchServiceDatabaseException;
47 import org.opencastproject.security.api.AccessControlEntry;
48 import org.opencastproject.security.api.AccessControlList;
49 import org.opencastproject.security.api.AccessControlUtil;
50 import org.opencastproject.security.api.AuthorizationService;
51 import org.opencastproject.security.api.Organization;
52 import org.opencastproject.security.api.OrganizationDirectoryService;
53 import org.opencastproject.security.api.SecurityService;
54 import org.opencastproject.security.api.UnauthorizedException;
55 import org.opencastproject.security.api.User;
56 import org.opencastproject.security.util.SecurityUtil;
57 import org.opencastproject.series.api.SeriesException;
58 import org.opencastproject.series.api.SeriesService;
59 import org.opencastproject.util.NotFoundException;
60 import org.opencastproject.util.data.Tuple;
61 import org.opencastproject.workspace.api.Workspace;
62
63 import com.google.gson.Gson;
64 import com.google.gson.JsonElement;
65
66 import org.apache.commons.io.IOUtils;
67 import org.elasticsearch.ElasticsearchStatusException;
68 import org.elasticsearch.action.DocWriteResponse;
69 import org.elasticsearch.action.index.IndexRequest;
70 import org.elasticsearch.action.search.SearchRequest;
71 import org.elasticsearch.action.search.SearchResponse;
72 import org.elasticsearch.action.update.UpdateRequest;
73 import org.elasticsearch.action.update.UpdateResponse;
74 import org.elasticsearch.client.RequestOptions;
75 import org.elasticsearch.client.indices.CreateIndexRequest;
76 import org.elasticsearch.common.xcontent.XContentType;
77 import org.elasticsearch.rest.RestStatus;
78 import org.elasticsearch.search.builder.SearchSourceBuilder;
79 import org.osgi.service.component.ComponentContext;
80 import org.osgi.service.component.annotations.Activate;
81 import org.osgi.service.component.annotations.Component;
82 import org.osgi.service.component.annotations.Reference;
83 import org.slf4j.Logger;
84 import org.slf4j.LoggerFactory;
85
86 import java.io.IOException;
87 import java.io.InputStream;
88 import java.nio.charset.StandardCharsets;
89 import java.time.Instant;
90 import java.time.format.DateTimeFormatter;
91 import java.util.Collections;
92 import java.util.Date;
93 import java.util.HashMap;
94 import java.util.HashSet;
95 import java.util.List;
96 import java.util.Map;
97 import java.util.Objects;
98 import java.util.Set;
99 import java.util.concurrent.atomic.AtomicInteger;
100 import java.util.stream.Collectors;
101
102
103
104
105 @Component(
106 immediate = true,
107 service = { SearchServiceIndex.class, IndexProducer.class },
108 property = {
109 "service.description=Search Service Index",
110 "service.pid=org.opencastproject.search.impl.SearchServiceIndex"
111 }
112 )
113 public final class SearchServiceIndex extends AbstractIndexProducer implements IndexProducer {
114
115 @Override
116 public IndexRebuildService.Service getService() {
117 return IndexRebuildService.Service.Search;
118 }
119
120
121 private static final Logger logger = LoggerFactory.getLogger(SearchServiceIndex.class);
122
123 public static final String INDEX_NAME = "opencast_search";
124
125 private final Gson gson = new Gson();
126
127 private ElasticsearchIndex esIndex;
128
129 private SeriesService seriesService;
130
131
132 private Workspace workspace;
133
134
135 private SecurityService securityService;
136
137
138 private AuthorizationService authorizationService;
139
140
141 private SearchServiceDatabase persistence;
142
143
144 private OrganizationDirectoryService organizationDirectory = null;
145
146 private ListProvidersService listProvidersService;
147
148 private String systemUserName = null;
149
150
151
152
153
154 public SearchServiceIndex() {
155 }
156
157
158
159
160
161
162
163 @Activate
164 public void activate(final ComponentContext cc) throws IllegalStateException {
165 createIndex();
166 systemUserName = SecurityUtil.getSystemUserName(cc);
167 }
168
169 private void createIndex() {
170 var mapping = "";
171 try (var in = this.getClass().getResourceAsStream("/search-mapping.json")) {
172 mapping = IOUtils.toString(in, StandardCharsets.UTF_8);
173 } catch (IOException e) {
174 throw new SearchException("Could not read mapping.", e);
175 }
176 try {
177 logger.debug("Trying to create index for '{}'", INDEX_NAME);
178 InputStream is = getClass().getResourceAsStream("/elasticsearch/indexSettings.json");
179 String indexSettings = IOUtils.toString(is, StandardCharsets.UTF_8);
180 final CreateIndexRequest request = new CreateIndexRequest(INDEX_NAME)
181 .settings(indexSettings, XContentType.JSON)
182 .mapping(mapping, XContentType.JSON);
183 var response = esIndex.getClient().indices().create(request, RequestOptions.DEFAULT);
184 if (!response.isAcknowledged()) {
185 throw new SearchException("Unable to create index for '" + INDEX_NAME + "'");
186 }
187 } catch (ElasticsearchStatusException e) {
188 if (e.getDetailedMessage().contains("already_exists_exception")) {
189 logger.info("Detected existing index '{}'", INDEX_NAME);
190 } else {
191 throw e;
192 }
193 } catch (IOException e) {
194 throw new SearchException(e);
195 }
196 }
197
198 @Reference
199 public void setEsIndex(ElasticsearchIndex esIndex) {
200 this.esIndex = esIndex;
201 }
202
203
204 public SearchResponse search(SearchSourceBuilder searchSource) throws SearchException {
205 SearchRequest searchRequest = new SearchRequest(INDEX_NAME);
206 logger.debug("Sending for query: {}", searchSource.query());
207 searchRequest.source(searchSource);
208 try {
209 return esIndex.getClient().search(searchRequest, RequestOptions.DEFAULT);
210 } catch (IOException e) {
211 throw new SearchException(e);
212 }
213 }
214
215
216
217
218
219
220
221
222
223
224
225
226
227 public void addSynchronously(MediaPackage mediaPackage)
228 throws SearchException, IllegalArgumentException, UnauthorizedException, SearchServiceDatabaseException {
229 if (mediaPackage == null) {
230 throw new IllegalArgumentException("Unable to add a null mediapackage");
231 }
232 var mediaPackageId = mediaPackage.getIdentifier().toString();
233
234 checkSearchEntityWritePermission(mediaPackageId);
235
236 logger.debug("Attempting to add media package {} to search index", mediaPackageId);
237 final var acls = new AccessControlList[1];
238 final var org = securityService.getOrganization();
239 final var systemUser = SecurityUtil.createSystemUser(systemUserName, org);
240
241 SecurityUtil.runAs(securityService, org, systemUser, () -> {
242 acls[0] = authorizationService.getActiveAcl(mediaPackage).getA();
243 });
244 var acl = acls[0] == null ? new AccessControlList() : acls[0];
245 var now = new Date();
246
247 try {
248 persistence.storeMediaPackage(mediaPackage, acl, now);
249 } catch (SearchServiceDatabaseException e) {
250 throw new SearchException(String.format("Could not store media package to search database %s", mediaPackageId),
251 e);
252 }
253
254 indexMediaPackage(mediaPackage, acl);
255 }
256
257 public void indexMediaPackage(String mediaPackageId)
258 throws SearchException, SearchServiceDatabaseException, UnauthorizedException, NotFoundException {
259 if (!securityService.getUser().hasRole("ROLE_ADMIN")) {
260 throw new UnauthorizedException("Only global administrators may trigger manual event updates.");
261 }
262 try {
263 MediaPackage mp = persistence.getMediaPackage(mediaPackageId);
264 AccessControlList acl = persistence.getAccessControlList(mediaPackageId);
265 Date modificationDate = persistence.getModificationDate(mediaPackageId);
266 Date deletionDate = persistence.getDeletionDate(mediaPackageId);
267 indexMediaPackage(mp, acl, modificationDate, deletionDate);
268 } catch (RuntimeException e) {
269 logSkippingElement(logger, "event", mediaPackageId, e);
270 }
271 }
272
273 private void indexMediaPackage(MediaPackage mediaPackage, AccessControlList acl)
274 throws SearchException, SearchServiceDatabaseException {
275 indexMediaPackage(mediaPackage, acl, null, null);
276 }
277
278 private void indexMediaPackage(MediaPackage mediaPackage, AccessControlList acl, Date modDate, Date delDate)
279 throws SearchException, SearchServiceDatabaseException {
280 String mediaPackageId = mediaPackage.getIdentifier().toString();
281 String orgId = securityService.getOrganization().getId();
282
283
284 DublinCoreCatalog dc = null == delDate
285 ? DublinCoreUtil.loadEpisodeDublinCore(workspace, mediaPackage).orElse(DublinCores.mkSimple())
286 : DublinCores.mkSimple();
287
288 List<DublinCoreCatalog> seriesList = Collections.emptyList();
289 if (dc.hasValue(DublinCore.PROPERTY_IS_PART_OF)) {
290
291 seriesList = dc.get(DublinCore.PROPERTY_IS_PART_OF).stream().map(DublinCoreValue::getValue).map(s -> {
292 try {
293 return seriesService.getSeries(s);
294 } catch (NotFoundException e) {
295 logger.warn("Series {} not found during index of event {}, omitting the link from the indexed data", s,
296 mediaPackageId);
297 } catch (UnauthorizedException e) {
298 logger.warn("Not authorized for series {} during index of event {}, omitting the link from the indexed data",
299 s, mediaPackageId);
300 } catch (SeriesException e) {
301 throw new SearchException(e);
302 }
303 return null;
304 }).filter(Objects::nonNull).collect(Collectors.toList());
305 }
306
307
308 acl = addCustomAclRoles(mediaPackageId, acl);
309
310 SearchResult item = new SearchResult(SearchService.IndexEntryType.Episode, dc, acl, orgId, mediaPackage,
311 null != modDate ? modDate.toInstant() : Instant.now(),
312 null != delDate ? delDate.toInstant() : null);
313 Map<String, Object> metadata = item.dehydrateForIndex();
314 try {
315 var request = new IndexRequest(INDEX_NAME);
316 request.id(mediaPackageId);
317 request.source(metadata);
318 esIndex.getClient().index(request, RequestOptions.DEFAULT);
319 logger.debug("Indexed episode {}", mediaPackageId);
320 } catch (IOException e) {
321 throw new SearchException(e);
322 }
323
324
325 for (DublinCoreCatalog seriesDc : seriesList) {
326 String seriesId = seriesDc.getFirst(DublinCore.PROPERTY_IDENTIFIER);
327 AccessControlList seriesAcl = persistence.getAccessControlLists(seriesId, mediaPackageId).stream()
328 .map(aclPair -> addCustomAclRoles(aclPair.getKey(), aclPair.getValue()))
329 .reduce(new AccessControlList(acl.getEntries()), AccessControlList::mergeActions);
330 item = new SearchResult(SearchService.IndexEntryType.Series, seriesDc, seriesAcl, orgId,
331 null, Instant.now(), null);
332
333 Map<String, Object> seriesData = item.dehydrateForIndex();
334 try {
335 var request = new IndexRequest(INDEX_NAME);
336 request.id(seriesId);
337 request.source(seriesData);
338 esIndex.getClient().index(request, RequestOptions.DEFAULT);
339 logger.debug("Indexed series {} related to episode {}", seriesId, mediaPackageId);
340 } catch (IOException e) {
341 throw new SearchException(e);
342 }
343 }
344 }
345
346
347
348
349
350
351
352
353
354
355
356 private AccessControlList addCustomAclRoles(String mediaPackageId, AccessControlList acl) {
357
358 Set<AccessControlEntry> customEntries = new HashSet<>();
359 customEntries.add(new AccessControlEntry(getEpisodeRoleId(mediaPackageId, "READ"), "read", true));
360 customEntries.add(new AccessControlEntry(getEpisodeRoleId(mediaPackageId, "WRITE"), "write", true));
361
362 ResourceListQuery query = new ResourceListQueryImpl();
363 if (listProvidersService.hasProvider("ACL.ACTIONS")) {
364 Map<String, String> actions = new HashMap<>();
365 try {
366 actions = listProvidersService.getList("ACL.ACTIONS", query, true);
367 } catch (ListProviderException e) {
368 throw new SearchException("Listproviders not loaded. " + e);
369 }
370 for (String action : actions.keySet()) {
371 customEntries.add(
372 new AccessControlEntry(getEpisodeRoleId(mediaPackageId, action), action, true));
373 }
374 }
375
376 return acl;
377 }
378
379 private void checkSearchEntityWritePermission(final String mediaPackageId) throws SearchException {
380 User user = securityService.getUser();
381 try {
382 if (!persistence.isAvailable(mediaPackageId)) {
383 throw new NotFoundException();
384 }
385 AccessControlList acl = persistence.getAccessControlList(mediaPackageId);
386 if (!AccessControlUtil.isAuthorized(acl, user, securityService.getOrganization(), WRITE.toString(),
387 mediaPackageId)) {
388 throw new UnauthorizedException(user, "Write permission denied for " + mediaPackageId, acl);
389 }
390 } catch (NotFoundException e) {
391 logger.debug("Mediapackage {} does not exist or was deleted, allowing writes for user {}", mediaPackageId, user);
392 } catch (SearchServiceDatabaseException | UnauthorizedException e) {
393 throw new SearchException(e);
394 }
395 }
396
397
398
399
400
401
402
403
404
405
406 public boolean deleteSynchronously(final String mediaPackageId) throws SearchException {
407
408 checkSearchEntityWritePermission(mediaPackageId);
409
410 String deletionString = DateTimeFormatter.ISO_INSTANT.format(Instant.now());
411
412 try {
413 logger.info("Marking media package {} as deleted in search index", mediaPackageId);
414 JsonElement json = gson.toJsonTree(Map.of(
415 SearchResult.DELETED_DATE, deletionString,
416 SearchResult.MODIFIED_DATE, deletionString));
417 var updateRequst = new UpdateRequest(INDEX_NAME, mediaPackageId)
418 .doc(gson.toJson(json), XContentType.JSON);
419 esIndex.getClient().update(updateRequst, RequestOptions.DEFAULT);
420 } catch (ElasticsearchStatusException e) {
421 if (e.status().getStatus() != RestStatus.NOT_FOUND.getStatus()) {
422 throw e;
423 }
424 logger.warn("Event {} is not in the search index. Skipping deletion", mediaPackageId);
425 } catch (IOException e) {
426 throw new SearchException("Could not delete episode " + mediaPackageId + " from index", e);
427 }
428
429 try {
430 logger.info("Marking media package {} as deleted in search database", mediaPackageId);
431
432 String seriesId = null;
433 Date now = new Date();
434 try {
435 seriesId = persistence.getMediaPackage(mediaPackageId).getSeries();
436 persistence.deleteMediaPackage(mediaPackageId, now);
437 logger.info("Removed media package {} from search persistence", mediaPackageId);
438 } catch (NotFoundException e) {
439
440 logger.info("Could not find media package with id {} in persistence, but will try remove it from index anyway.",
441 mediaPackageId);
442 } catch (SearchServiceDatabaseException | UnauthorizedException e) {
443 throw new SearchException(
444 String.format("Could not delete media package with id %s from persistence storage", mediaPackageId), e);
445 }
446
447
448 if (seriesId != null) {
449 try {
450 if (!persistence.getSeries(seriesId).isEmpty()) {
451
452 final AccessControlList seriesAcl = persistence.getAccessControlLists(seriesId).stream()
453 .map(aclPair -> addCustomAclRoles(aclPair.getKey(), aclPair.getValue()))
454 .reduce(new AccessControlList(), AccessControlList::mergeActions);
455 JsonElement json = gson.toJsonTree(Map.of(
456 SearchResult.INDEX_ACL, SearchResult.dehydrateAclForIndex(seriesAcl),
457 SearchResult.MODIFIED_DATE, deletionString));
458 var updateRequest = new UpdateRequest(INDEX_NAME, seriesId).doc(gson.toJson(json), XContentType.JSON);
459 try {
460 esIndex.getClient().update(updateRequest, RequestOptions.DEFAULT);
461 } catch (ElasticsearchStatusException e) {
462 if (RestStatus.NOT_FOUND == e.status()) {
463 logger.warn("Attempted to modify {}, but that series does not exist in the index.", seriesId);
464 }
465 }
466 } else {
467
468 deleteSeriesSynchronously(seriesId);
469 }
470 } catch (IOException e) {
471 throw new SearchException(e);
472 }
473 }
474
475 return true;
476 } catch (SearchServiceDatabaseException e) {
477 logger.info("Could not delete media package with id {} from search index", mediaPackageId);
478 throw new SearchException(e);
479 }
480 }
481
482
483
484
485
486
487
488
489 public boolean deleteSeriesSynchronously(String seriesId) throws SearchException {
490 try {
491 logger.info("Marking {} as deleted in the search index", seriesId);
492 JsonElement json = gson.toJsonTree(Map.of(
493 "deleted", Instant.now().getEpochSecond(),
494 "modified", Instant.now().toString()));
495 var updateRequest = new UpdateRequest(INDEX_NAME, seriesId).doc(gson.toJson(json), XContentType.JSON);
496 try {
497 UpdateResponse response = esIndex.getClient().update(updateRequest, RequestOptions.DEFAULT);
498
499 return DocWriteResponse.Result.UPDATED == response.getResult();
500 } catch (ElasticsearchStatusException e) {
501 if (RestStatus.NOT_FOUND == e.status()) {
502 logger.debug("Attempted to delete {}, but that series does not exist in the index.", seriesId);
503 return true;
504 }
505 throw new SearchException(e);
506 }
507 } catch (IOException e) {
508 throw new SearchException("Could not delete series " + seriesId + " from index", e);
509 }
510 }
511
512 @Override
513 public void repopulate(IndexRebuildService.DataType type) throws IndexRebuildException {
514 final Organization originalOrg = securityService.getOrganization();
515 final User originalUser = securityService.getUser();
516
517 try {
518 int total = persistence.countMediaPackages();
519 int pageSize = 50;
520 int pageOffset = 0;
521 AtomicInteger current = new AtomicInteger(1);
522 logIndexRebuildBegin(logger, total, "search");
523 List<Tuple<MediaPackage, String>> page = null;
524
525 do {
526 page = persistence.getAllMediaPackages(pageSize, pageOffset).collect(Collectors.toList());
527 page.forEach(tuple -> {
528 try {
529 MediaPackage mediaPackage = tuple.getA();
530 Organization organization = organizationDirectory.getOrganization(tuple.getB());
531 final var systemUser = SecurityUtil.createSystemUser(systemUserName, organization);
532 securityService.setUser(systemUser);
533 securityService.setOrganization(organization);
534
535 String mediaPackageId = mediaPackage.getIdentifier().toString();
536
537 AccessControlList acl = persistence.getAccessControlList(mediaPackageId);
538 Date modificationDate = persistence.getModificationDate(mediaPackageId);
539 Date deletionDate = persistence.getDeletionDate(mediaPackageId);
540
541 current.getAndIncrement();
542
543 indexMediaPackage(mediaPackage, acl, modificationDate, deletionDate);
544 } catch (SearchServiceDatabaseException e) {
545 logIndexRebuildError(logger, total, current.get(), e);
546
547 throw new RuntimeException("Internal Index Rebuild Failure", e);
548 } catch (RuntimeException | NotFoundException e) {
549 logSkippingElement(logger, "event", tuple.getA().getIdentifier().toString(), e);
550 }
551 });
552
553 logIndexRebuildProgress(logger, total, current.get() - 1, pageSize);
554 pageOffset += 1;
555 } while (pageOffset * pageSize <= total);
556
557 } catch (SearchServiceDatabaseException | RuntimeException e) {
558 logIndexRebuildError(logger, e);
559 throw new IndexRebuildException("Index Rebuild Failure", e);
560 } finally {
561 securityService.setUser(originalUser);
562 securityService.setOrganization(originalOrg);
563 }
564 }
565
566 @Reference
567 public void setPersistence(SearchServiceDatabase persistence) {
568 this.persistence = persistence;
569 }
570
571 @Reference
572 public void setSeriesService(SeriesService seriesService) {
573 this.seriesService = seriesService;
574 }
575
576 @Reference
577 public void setWorkspace(Workspace workspace) {
578 this.workspace = workspace;
579 }
580
581 @Reference
582 public void setAuthorizationService(AuthorizationService authorizationService) {
583 this.authorizationService = authorizationService;
584 }
585
586
587
588
589
590
591
592 @Reference
593 public void setSecurityService(SecurityService securityService) {
594 this.securityService = securityService;
595 }
596
597
598
599
600
601
602
603 @Reference
604 public void setOrganizationDirectoryService(OrganizationDirectoryService organizationDirectory) {
605 this.organizationDirectory = organizationDirectory;
606 }
607
608 @Reference
609 public void setListProvidersService(ListProvidersService listProvidersService) {
610 this.listProvidersService = listProvidersService;
611 }
612 }