1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21 package org.opencastproject.publication.oaipmh;
22
23 import static java.lang.String.format;
24 import static org.opencastproject.util.JobUtil.waitForJobs;
25
26 import org.opencastproject.distribution.api.DistributionException;
27 import org.opencastproject.distribution.api.DownloadDistributionService;
28 import org.opencastproject.distribution.api.StreamingDistributionService;
29 import org.opencastproject.job.api.AbstractJobProducer;
30 import org.opencastproject.job.api.Job;
31 import org.opencastproject.mediapackage.MediaPackage;
32 import org.opencastproject.mediapackage.MediaPackageElement;
33 import org.opencastproject.mediapackage.MediaPackageElementFlavor;
34 import org.opencastproject.mediapackage.MediaPackageElementParser;
35 import org.opencastproject.mediapackage.MediaPackageException;
36 import org.opencastproject.mediapackage.MediaPackageParser;
37 import org.opencastproject.mediapackage.MediaPackageReference;
38 import org.opencastproject.mediapackage.MediaPackageRuntimeException;
39 import org.opencastproject.mediapackage.MediaPackageSupport;
40 import org.opencastproject.mediapackage.Publication;
41 import org.opencastproject.mediapackage.PublicationImpl;
42 import org.opencastproject.mediapackage.selector.SimpleElementSelector;
43 import org.opencastproject.oaipmh.persistence.OaiPmhDatabase;
44 import org.opencastproject.oaipmh.persistence.OaiPmhDatabaseException;
45 import org.opencastproject.oaipmh.persistence.QueryBuilder;
46 import org.opencastproject.oaipmh.persistence.SearchResult;
47 import org.opencastproject.oaipmh.persistence.SearchResultItem;
48 import org.opencastproject.oaipmh.server.OaiPmhServerInfo;
49 import org.opencastproject.oaipmh.server.OaiPmhServerInfoUtil;
50 import org.opencastproject.publication.api.OaiPmhPublicationService;
51 import org.opencastproject.publication.api.PublicationException;
52 import org.opencastproject.security.api.OrganizationDirectoryService;
53 import org.opencastproject.security.api.SecurityService;
54 import org.opencastproject.security.api.UserDirectoryService;
55 import org.opencastproject.serviceregistry.api.ServiceRegistry;
56 import org.opencastproject.serviceregistry.api.ServiceRegistryException;
57 import org.opencastproject.util.MimeTypes;
58 import org.opencastproject.util.NotFoundException;
59 import org.opencastproject.util.UrlSupport;
60 import org.opencastproject.util.data.Collections;
61
62 import org.apache.commons.lang3.BooleanUtils;
63 import org.apache.commons.lang3.StringUtils;
64 import org.apache.http.client.utils.URIUtils;
65 import org.osgi.service.component.annotations.Component;
66 import org.osgi.service.component.annotations.Reference;
67 import org.slf4j.Logger;
68 import org.slf4j.LoggerFactory;
69
70 import java.net.URI;
71 import java.util.ArrayList;
72 import java.util.Arrays;
73 import java.util.HashSet;
74 import java.util.Hashtable;
75 import java.util.List;
76 import java.util.Map;
77 import java.util.Optional;
78 import java.util.Set;
79 import java.util.UUID;
80 import java.util.stream.Collectors;
81 import java.util.stream.Stream;
82
83
84
85
86 @Component(
87 immediate = true,
88 service = OaiPmhPublicationService.class,
89 property = {
90 "service.description=Publication Service (OAI-PMH)"
91 }
92 )
93 public class OaiPmhPublicationServiceImpl extends AbstractJobProducer implements OaiPmhPublicationService {
94
95
96 private static final Logger logger = LoggerFactory.getLogger(OaiPmhPublicationServiceImpl.class);
97
/**
 * Operations this service can execute as a job. The constant name is used as the job's operation string and is
 * parsed back with {@code Operation.valueOf} when the job is processed.
 */
public enum Operation {
  Publish,
  Retract,
  UpdateMetadata,
  Replace
}
101
102 private DownloadDistributionService downloadDistributionService;
103 private OaiPmhServerInfo oaiPmhServerInfo;
104 private OaiPmhDatabase oaiPmhDatabase;
105 private OrganizationDirectoryService organizationDirectoryService;
106 private SecurityService securityService;
107 private ServiceRegistry serviceRegistry;
108 private StreamingDistributionService streamingDistributionService;
109 private UserDirectoryService userDirectoryService;
110
/** Creates the service, passing {@code JOB_TYPE} to the job producer base class. */
public OaiPmhPublicationServiceImpl() {
  super(JOB_TYPE);
}
114
115 @Override
116 protected String process(Job job) throws Exception {
117 if (!StringUtils.equalsIgnoreCase(JOB_TYPE, job.getJobType())) {
118 throw new IllegalArgumentException("Can not handle job type " + job.getJobType());
119 }
120
121 Publication publication = null;
122 MediaPackage mediaPackage = MediaPackageParser.getFromXml(job.getArguments().get(0));
123 String repository = job.getArguments().get(1);
124 boolean checkAvailability = false;
125 switch (Operation.valueOf(job.getOperation())) {
126 case Publish:
127 String[] downloadElementIds = StringUtils.split(job.getArguments().get(2), SEPARATOR);
128 String[] streamingElementIds = StringUtils.split(job.getArguments().get(3), SEPARATOR);
129 checkAvailability = BooleanUtils.toBoolean(job.getArguments().get(4));
130 publication = publish(job, mediaPackage, repository,
131 Collections.set(downloadElementIds), Collections.set(streamingElementIds), checkAvailability);
132 break;
133 case Replace:
134 final Set<? extends MediaPackageElement> downloadElements =
135 Collections.toSet(MediaPackageElementParser.getArrayFromXml(job.getArguments().get(2)));
136 final Set<? extends MediaPackageElement> streamingElements =
137 Collections.toSet(MediaPackageElementParser.getArrayFromXml(job.getArguments().get(3)));
138 final Set<MediaPackageElementFlavor> retractDownloadFlavors = Arrays.stream(
139 StringUtils.split(job.getArguments().get(4), SEPARATOR))
140 .filter(s -> !s.isEmpty())
141 .map(MediaPackageElementFlavor::parseFlavor)
142 .collect(Collectors.toSet());
143 final Set<MediaPackageElementFlavor> retractStreamingFlavors = Arrays.stream(
144 StringUtils.split(job.getArguments().get(5), SEPARATOR))
145 .filter(s -> !s.isEmpty())
146 .map(MediaPackageElementFlavor::parseFlavor)
147 .collect(Collectors.toSet());
148 final Set<? extends MediaPackageElement> publications =
149 Collections.toSet(MediaPackageElementParser.getArrayFromXml(job.getArguments().get(6)));
150 checkAvailability = BooleanUtils.toBoolean(job.getArguments().get(7));
151 publication = replace(job, mediaPackage, repository, downloadElements, streamingElements,
152 retractDownloadFlavors, retractStreamingFlavors, publications, checkAvailability);
153 break;
154 case Retract:
155 publication = retract(job, mediaPackage, repository);
156 break;
157 case UpdateMetadata:
158 checkAvailability = BooleanUtils.toBoolean(job.getArguments().get(4));
159 String[] flavors = StringUtils.split(job.getArguments().get(2), SEPARATOR);
160 String[] tags = StringUtils.split(job.getArguments().get(3), SEPARATOR);
161 publication = updateMetadata(job, mediaPackage, repository,
162 Collections.set(flavors), Collections.set(tags), checkAvailability);
163 break;
164 default:
165 throw new IllegalArgumentException("Can not handle this type of operation: " + job.getOperation());
166 }
167 return publication != null ? MediaPackageElementParser.getAsXml(publication) : null;
168 }
169
170 @Override
171 public Job replace(
172 MediaPackage mediaPackage,
173 String repository,
174 Set<? extends MediaPackageElement> downloadElements,
175 Set<? extends MediaPackageElement> streamingElements,
176 Set<MediaPackageElementFlavor> retractDownloadFlavors,
177 Set<MediaPackageElementFlavor> retractStreamingFlavors,
178 Set<? extends Publication> publications,
179 boolean checkAvailability
180 ) throws PublicationException {
181 checkInputArguments(mediaPackage, repository);
182 try {
183 return serviceRegistry.createJob(JOB_TYPE, Operation.Replace.name(),
184 Arrays.asList(MediaPackageParser.getAsXml(mediaPackage),
185 repository,
186 MediaPackageElementParser.getArrayAsXml(Collections.toList(downloadElements)),
187 MediaPackageElementParser.getArrayAsXml(Collections.toList(streamingElements)),
188 StringUtils.join(retractDownloadFlavors, SEPARATOR),
189 StringUtils.join(retractStreamingFlavors, SEPARATOR),
190 MediaPackageElementParser.getArrayAsXml(Collections.toList(publications)),
191 Boolean.toString(checkAvailability)));
192 } catch (ServiceRegistryException e) {
193 throw new PublicationException("Unable to create job", e);
194 } catch (MediaPackageException e) {
195 throw new PublicationException("Unable to serialize media package elements", e);
196 }
197 }
198
199 @Override
200 public Publication replaceSync(
201 MediaPackage mediaPackage, String repository, Set<? extends MediaPackageElement> downloadElements,
202 Set<? extends MediaPackageElement> streamingElements, Set<MediaPackageElementFlavor> retractDownloadFlavors,
203 Set<MediaPackageElementFlavor> retractStreamingFlavors, Set<? extends Publication> publications,
204 boolean checkAvailability) throws PublicationException, MediaPackageException {
205 return replace(null, mediaPackage, repository, downloadElements, streamingElements, retractDownloadFlavors,
206 retractStreamingFlavors, publications, checkAvailability);
207 }
208
209 @Override
210 public Job publish(MediaPackage mediaPackage, String repository, Set<String> downloadElementIds,
211 Set<String> streamingElementIds, boolean checkAvailability)
212 throws PublicationException, MediaPackageException {
213 checkInputArguments(mediaPackage, repository);
214
215 try {
216 return serviceRegistry.createJob(JOB_TYPE, Operation.Publish.toString(),
217 Arrays.asList(MediaPackageParser.getAsXml(mediaPackage),
218 repository,
219 StringUtils.join(downloadElementIds, SEPARATOR),
220 StringUtils.join(streamingElementIds, SEPARATOR),
221 Boolean.toString(checkAvailability)));
222 } catch (ServiceRegistryException e) {
223 throw new PublicationException("Unable to create job", e);
224 }
225 }
226
227 @Override
228 public Job retract(MediaPackage mediaPackage, String repository) throws PublicationException, NotFoundException {
229 checkInputArguments(mediaPackage, repository);
230
231 try {
232 return serviceRegistry.createJob(JOB_TYPE, Operation.Retract.toString(),
233 Arrays.asList(MediaPackageParser.getAsXml(mediaPackage), repository));
234 } catch (ServiceRegistryException e) {
235 throw new PublicationException("Unable to create job", e);
236 }
237 }
238
239 @Override
240 public Job updateMetadata(MediaPackage mediaPackage, String repository, Set<String> flavors, Set<String> tags,
241 boolean checkAvailability) throws PublicationException, MediaPackageException {
242 checkInputArguments(mediaPackage, repository);
243 if ((flavors == null || flavors.isEmpty()) && (tags == null || tags.isEmpty())) {
244 throw new IllegalArgumentException("Flavors or tags must be set");
245 }
246
247 try {
248 return serviceRegistry.createJob(JOB_TYPE, Operation.UpdateMetadata.toString(),
249 Arrays.asList(MediaPackageParser.getAsXml(mediaPackage),
250 repository,
251 StringUtils.join(flavors, SEPARATOR),
252 StringUtils.join(tags, SEPARATOR),
253 Boolean.toString(checkAvailability)));
254 } catch (ServiceRegistryException e) {
255 throw new PublicationException("Unable to create job", e);
256 }
257 }
258
259 protected Publication publish(Job job, MediaPackage mediaPackage, String repository, Set<String> downloadElementIds,
260 Set<String> streamingElementIds, boolean checkAvailability)
261 throws PublicationException, MediaPackageException {
262 String mpId = mediaPackage.getIdentifier().toString();
263 SearchResult searchResult = oaiPmhDatabase.search(QueryBuilder.queryRepo(repository).mediaPackageId(mpId)
264 .isDeleted(false).build());
265
266 if (searchResult.size() > 0) {
267 try {
268 Publication p = retract(job, mediaPackage, repository);
269 if (p != null && mediaPackage.contains(p)) {
270 mediaPackage.remove(p);
271 }
272 } catch (NotFoundException e) {
273 logger.debug("No OAI-PMH publication found for media package {}.", mpId, e);
274
275 }
276 }
277 List<Job> distributionJobs = new ArrayList<>(2);
278 if (downloadElementIds != null && !downloadElementIds.isEmpty()) {
279
280 MediaPackage mpDownloadDist = (MediaPackage) mediaPackage.clone();
281 for (MediaPackageElement mpe : mpDownloadDist.getElements()) {
282 if (downloadElementIds.contains(mpe.getIdentifier())) {
283 continue;
284 }
285 mpDownloadDist.remove(mpe);
286 }
287
288 if (mpDownloadDist.getElements().length > 0) {
289 try {
290 Job downloadDistributionJob = downloadDistributionService
291 .distribute(getPublicationChannelName(repository), mpDownloadDist, downloadElementIds,
292 checkAvailability);
293 if (downloadDistributionJob != null) {
294 distributionJobs.add(downloadDistributionJob);
295 }
296 } catch (DistributionException e) {
297 throw new PublicationException(
298 format("Unable to distribute media package %s to download distribution.", mpId),
299 e);
300 }
301 }
302 }
303 if (streamingElementIds != null && !streamingElementIds.isEmpty()) {
304
305 MediaPackage mpStreamingDist = (MediaPackage) mediaPackage.clone();
306 for (MediaPackageElement mpe : mpStreamingDist.getElements()) {
307 if (streamingElementIds.contains(mpe.getIdentifier())) {
308 continue;
309 }
310 mpStreamingDist.remove(mpe);
311 }
312
313 if (mpStreamingDist.getElements().length > 0) {
314 try {
315 Job streamingDistributionJob = streamingDistributionService
316 .distribute(getPublicationChannelName(repository), mpStreamingDist, streamingElementIds);
317 if (streamingDistributionJob != null) {
318 distributionJobs.add(streamingDistributionJob);
319 }
320 } catch (DistributionException e) {
321 throw new PublicationException(
322 format("Unable to distribute media package %s to streaming distribution.", mpId),
323 e);
324 }
325 }
326 }
327 if (distributionJobs.isEmpty()) {
328 throw new IllegalStateException(format(
329 "The media package %s does not contain any elements for publishing to OAI-PMH", mpId));
330 }
331
332 if (!waitForJobs(job, serviceRegistry, distributionJobs).isSuccess()) {
333 throw new PublicationException(format(
334 "Unable to distribute elements of media package %s to distribution channels.", mpId));
335 }
336
337 List<MediaPackageElement> distributedElements = new ArrayList<>();
338 for (Job distributionJob : distributionJobs) {
339 String distributedElementsXml = distributionJob.getPayload();
340 if (StringUtils.isNotBlank(distributedElementsXml)) {
341 for (MediaPackageElement distributedElement
342 : MediaPackageElementParser.getArrayFromXml(distributedElementsXml)) {
343 distributedElements.add(distributedElement);
344 }
345 }
346 }
347 MediaPackage oaiPmhDistMp = (MediaPackage) mediaPackage.clone();
348
349 for (MediaPackageElement mpe : oaiPmhDistMp.getElements()) {
350
351 if (MediaPackageElement.Type.Publication == mpe.getElementType()) {
352 continue;
353 }
354 oaiPmhDistMp.remove(mpe);
355 }
356
357 for (MediaPackageElement mpe : distributedElements) {
358 oaiPmhDistMp.add(mpe);
359 }
360
361
362 try {
363 oaiPmhDatabase.store(oaiPmhDistMp, repository);
364 } catch (OaiPmhDatabaseException e) {
365
366 throw new PublicationException(
367 format("Unable to distribute media package %s to OAI-PMH repository %s", mpId, repository),
368 e);
369 }
370 return createPublicationElement(mpId, repository);
371 }
372
373 private Publication replace(
374 Job job,
375 MediaPackage mediaPackage,
376 String repository,
377 Set<? extends MediaPackageElement> downloadElements,
378 Set<? extends MediaPackageElement> streamingElements,
379 Set<MediaPackageElementFlavor> retractDownloadFlavors,
380 Set<MediaPackageElementFlavor> retractStreamingFlavors,
381 Set<? extends MediaPackageElement> publications,
382 boolean checkAvailable
383 ) throws MediaPackageException, PublicationException {
384 final String mpId = mediaPackage.getIdentifier().toString();
385 final String channel = getPublicationChannelName(repository);
386
387 try {
388 final SearchResult search = oaiPmhDatabase.search(QueryBuilder.queryRepo(repository).mediaPackageId(mpId)
389 .isDeleted(false).build());
390 if (search.size() > 1) {
391 throw new PublicationException("Found multiple OAI-PMH records for id " + mpId);
392 }
393 final Optional<MediaPackage> existingMp = search.getItems().stream().findFirst().map(
394 SearchResultItem::getMediaPackage);
395
396
397 final Set<String> addDownloadElementIds = downloadElements.stream()
398 .map(MediaPackageElement::getIdentifier)
399 .collect(Collectors.toSet());
400 final Set<String> addStreamingElementIds = streamingElements.stream()
401 .map(MediaPackageElement::getIdentifier)
402 .collect(Collectors.toSet());
403
404
405 final Set<MediaPackageElement> removeDownloadElements = existingMp.map(mp ->
406 Arrays.stream(mp.getElements())
407 .filter(e -> retractDownloadFlavors.stream().anyMatch(f -> f.matches(e.getFlavor())))
408 .collect(Collectors.toSet())
409 ).orElse(java.util.Collections.emptySet());
410 final Set<MediaPackageElement> removeStreamingElements = existingMp.map(mp ->
411 Arrays.stream(mp.getElements())
412 .filter(e -> retractStreamingFlavors.stream().anyMatch(f -> f.matches(e.getFlavor())))
413 .collect(Collectors.toSet())
414 ).orElse(java.util.Collections.emptySet());
415
416
417 final Set<String> removeDownloadElementIds = Stream
418 .concat(removeDownloadElements.stream(), downloadElements.stream())
419 .map(MediaPackageElement::getIdentifier)
420 .collect(Collectors.toSet());
421 final Set<String> removeStreamingElementIds = Stream
422 .concat(removeStreamingElements.stream(), streamingElements.stream())
423 .map(MediaPackageElement::getIdentifier)
424 .collect(Collectors.toSet());
425
426 if (removeDownloadElementIds.isEmpty() && removeStreamingElementIds.isEmpty()
427 && addDownloadElementIds.isEmpty() && addStreamingElementIds.isEmpty()) {
428
429 return Arrays.stream(mediaPackage.getPublications())
430 .filter(p -> channel.equals(p.getChannel()))
431 .findFirst()
432 .orElse(null);
433 }
434
435 final MediaPackage temporaryMediaPackage = (MediaPackage) mediaPackage.clone();
436 downloadElements.forEach(temporaryMediaPackage::add);
437 streamingElements.forEach(temporaryMediaPackage::add);
438 removeDownloadElements.forEach(temporaryMediaPackage::add);
439 removeStreamingElements.forEach(temporaryMediaPackage::add);
440
441 final List<MediaPackageElement> retractedElements = new ArrayList<>();
442 final List<MediaPackageElement> distributedElements = new ArrayList<>();
443 if (job != null) {
444 retractedElements
445 .addAll(retract(job, channel, temporaryMediaPackage, removeDownloadElementIds, removeStreamingElementIds));
446 distributedElements
447 .addAll(distribute(job, channel, temporaryMediaPackage, addDownloadElementIds, addStreamingElementIds,
448 checkAvailable));
449 } else {
450 retractedElements
451 .addAll(retractSync(channel, temporaryMediaPackage, removeDownloadElementIds, removeStreamingElementIds));
452 distributedElements
453 .addAll(distributeSync(channel, temporaryMediaPackage, addDownloadElementIds, addStreamingElementIds,
454 checkAvailable));
455 }
456
457 final MediaPackage oaiPmhDistMp = (MediaPackage) existingMp.orElse(mediaPackage).clone();
458
459
460 Arrays.stream(oaiPmhDistMp.getPublications())
461 .filter(p -> channel.equals(p.getChannel()))
462 .forEach(oaiPmhDistMp::remove);
463
464
465 retractedElements.stream()
466 .map(MediaPackageElement::getIdentifier)
467 .forEach(oaiPmhDistMp::removeElementById);
468
469 distributedElements.forEach(oaiPmhDistMp::add);
470
471
472 publications.stream()
473 .map(p -> ((Publication) p).getChannel())
474 .forEach(c -> Arrays.stream(oaiPmhDistMp.getPublications())
475 .filter(p -> c.equals(p.getChannel()))
476 .forEach(oaiPmhDistMp::remove));
477
478
479 publications.forEach(oaiPmhDistMp::add);
480
481
482 oaiPmhDatabase.store(oaiPmhDistMp, repository);
483
484 return Arrays.stream(mediaPackage.getPublications())
485 .filter(p -> channel.equals(p.getChannel()))
486 .findFirst()
487 .orElse(createPublicationElement(mpId, repository));
488 } catch (OaiPmhDatabaseException e) {
489 throw new PublicationException(format("Unable to update media package %s in OAI-PMH repository %s", mpId,
490 repository), e);
491 } catch (DistributionException e) {
492 throw new PublicationException(format("Unable to update OAI-PMH distributions of media package %s.", mpId), e);
493 } catch (MediaPackageRuntimeException e) {
494 throw e.getWrappedException();
495 }
496 }
497
498 private List<MediaPackageElement> retract(
499 Job job, String channel, MediaPackage mp, Set<String> removeDownloadElementIds,
500 Set<String> removeStreamingElementIds) throws PublicationException, DistributionException {
501 final List<Job> retractJobs = new ArrayList<>(2);
502 if (!removeDownloadElementIds.isEmpty()) {
503 retractJobs.add(downloadDistributionService.retract(channel, mp, removeDownloadElementIds));
504 }
505 if (!removeStreamingElementIds.isEmpty()) {
506 retractJobs.add(streamingDistributionService.retract(channel, mp, removeStreamingElementIds));
507 }
508
509
510 if (!waitForJobs(job, serviceRegistry, retractJobs).isSuccess()) {
511 throw new PublicationException(format("Unable to retract OAI-PMH distributions of media package %s",
512 mp.getIdentifier().toString()));
513 }
514
515 return retractJobs.stream()
516 .filter(j -> StringUtils.isNotBlank(j.getPayload()))
517 .map(Job::getPayload)
518 .flatMap(p -> MediaPackageElementParser.getArrayFromXmlUnchecked(p).stream())
519 .collect(Collectors.toList());
520 }
521
522 private List<MediaPackageElement> retractSync(String channel, MediaPackage mp, Set<String> removeDownloadElementIds,
523 Set<String> removeStreamingElementIds) throws DistributionException {
524 final List<MediaPackageElement> retracted = new ArrayList<>();
525 if (!removeDownloadElementIds.isEmpty()) {
526 retracted.addAll(downloadDistributionService.retractSync(channel, mp, removeDownloadElementIds));
527 }
528 if (!removeStreamingElementIds.isEmpty()) {
529 retracted.addAll(streamingDistributionService.retractSync(channel, mp, removeStreamingElementIds));
530 }
531 return retracted;
532 }
533
534 private List<MediaPackageElement> distribute(
535 Job job,
536 String channel,
537 MediaPackage mp,
538 Set<String> addDownloadElementIds,
539 Set<String> addStreamingElementIds,
540 boolean checkAvailable
541 ) throws PublicationException, MediaPackageException, DistributionException {
542 final List<Job> distributeJobs = new ArrayList<>(2);
543 if (!addDownloadElementIds.isEmpty()) {
544 distributeJobs.add(downloadDistributionService.distribute(channel, mp, addDownloadElementIds,
545 checkAvailable));
546 }
547 if (!addStreamingElementIds.isEmpty()) {
548 distributeJobs.add(streamingDistributionService.distribute(channel, mp, addStreamingElementIds));
549 }
550
551
552 if (!waitForJobs(job, serviceRegistry, distributeJobs).isSuccess()) {
553 throw new PublicationException(format("Unable to distribute OAI-PMH distributions of media package %s",
554 mp.getIdentifier().toString()));
555 }
556
557 return distributeJobs.stream()
558 .filter(j -> StringUtils.isNotBlank(j.getPayload()))
559 .map(Job::getPayload)
560 .flatMap(p -> MediaPackageElementParser.getArrayFromXmlUnchecked(p).stream())
561 .collect(Collectors.toList());
562 }
563
564 private List<MediaPackageElement> distributeSync(
565 String channel, MediaPackage mp, Set<String> addDownloadElementIds, Set<String> addStreamingElementIds,
566 boolean checkAvailable) throws DistributionException {
567 final List<MediaPackageElement> distributed = new ArrayList<>();
568 if (!addDownloadElementIds.isEmpty()) {
569 distributed.addAll(downloadDistributionService.distributeSync(channel, mp, addDownloadElementIds,
570 checkAvailable));
571 }
572 if (!addStreamingElementIds.isEmpty()) {
573 distributed.addAll(streamingDistributionService.distributeSync(channel, mp, addStreamingElementIds));
574 }
575 return distributed;
576 }
577
578
579 protected Publication retract(Job job, MediaPackage mediaPackage, String repository)
580 throws PublicationException, NotFoundException {
581 String mpId = mediaPackage.getIdentifier().toString();
582
583
584 MediaPackage oaiPmhMp = null;
585 SearchResult searchResult = oaiPmhDatabase.search(QueryBuilder.queryRepo(repository).mediaPackageId(mpId)
586 .isDeleted(false).build());
587 for (SearchResultItem searchResultItem : searchResult.getItems()) {
588 if (oaiPmhMp == null) {
589 oaiPmhMp = searchResultItem.getMediaPackage();
590 } else {
591 for (MediaPackageElement mpe : searchResultItem.getMediaPackage().getElements()) {
592 oaiPmhMp.add(mpe);
593 }
594 }
595 }
596
597
598 try {
599 oaiPmhDatabase.delete(mpId, repository);
600 } catch (OaiPmhDatabaseException e) {
601 throw new PublicationException(format("Unable to retract media package %s from OAI-PMH repository %s",
602 mpId, repository), e);
603 } catch (NotFoundException e) {
604 logger.debug("Skip retracting media package {} from OIA-PMH repository {} as it isn't published.",
605 mpId, repository, e);
606 }
607
608 if (oaiPmhMp != null && oaiPmhMp.getElements().length > 0) {
609
610 Set<String> mpeIds = new HashSet<>();
611 for (MediaPackageElement mpe : oaiPmhMp.elements()) {
612 if (MediaPackageElement.Type.Publication == mpe.getElementType()) {
613 continue;
614 }
615
616 mpeIds.add(mpe.getIdentifier());
617 }
618 if (!mpeIds.isEmpty()) {
619 List<Job> retractionJobs = new ArrayList<>();
620
621 try {
622 Job retractDownloadJob = downloadDistributionService
623 .retract(getPublicationChannelName(repository), oaiPmhMp, mpeIds);
624 if (retractDownloadJob != null) {
625 retractionJobs.add(retractDownloadJob);
626 }
627 } catch (DistributionException e) {
628 throw new PublicationException(
629 format(
630 "Unable to create retraction job from distribution channel download for the media package %s ",
631 mpId),
632 e);
633 }
634
635
636 try {
637 Job retractDownloadJob = streamingDistributionService
638 .retract(getPublicationChannelName(repository), oaiPmhMp, mpeIds);
639 if (retractDownloadJob != null) {
640 retractionJobs.add(retractDownloadJob);
641 }
642 } catch (DistributionException e) {
643 throw new PublicationException(
644 format("Unable to create retraction job from distribution channel streaming for the media package %s ",
645 mpId),
646 e);
647 }
648 if (retractionJobs.size() > 0) {
649
650 if (!waitForJobs(job, serviceRegistry, retractionJobs).isSuccess()) {
651 throw new PublicationException(
652 format("Unable to retract elements of media package %s from distribution channels.", mpId));
653 }
654 }
655 }
656 }
657
658 String publicationChannel = getPublicationChannelName(repository);
659 for (Publication p : mediaPackage.getPublications()) {
660 if (StringUtils.equals(publicationChannel, p.getChannel())) {
661 return p;
662 }
663 }
664 return null;
665 }
666
667 protected Publication updateMetadata(
668 Job job,
669 MediaPackage mediaPackage,
670 String repository,
671 Set<String> flavors,
672 Set<String> tags,
673 boolean checkAvailability
674 ) throws PublicationException {
675 final Set<MediaPackageElementFlavor> parsedFlavors = new HashSet<>();
676 for (String flavor : flavors) {
677 parsedFlavors.add(MediaPackageElementFlavor.parseFlavor(flavor));
678 }
679
680 final MediaPackage filteredMp;
681 final SearchResult result = oaiPmhDatabase.search(
682 QueryBuilder.queryRepo(repository)
683 .mediaPackageId(mediaPackage)
684 .isDeleted(false)
685 .build()
686 );
687 if (result.size() == 1) {
688
689 try {
690 logger.debug("filter elements with flavors {} and tags {} on media package {}",
691 StringUtils.join(flavors, ", "), StringUtils.join(tags, ", "),
692 MediaPackageParser.getAsXml(mediaPackage));
693
694 filteredMp = filterMediaPackage(mediaPackage, parsedFlavors, tags);
695 } catch (MediaPackageException e) {
696 throw new PublicationException("Error filtering media package", e);
697 }
698 } else if (result.size() == 0) {
699 logger.info("Skipping update of media package {} since it is not currently published to {}",
700 mediaPackage, repository);
701 return null;
702 } else {
703 final String msg = format(
704 "More than one media package with id %s found",
705 mediaPackage.getIdentifier().toString()
706 );
707 logger.warn(msg);
708 throw new PublicationException(msg);
709 }
710
711 Set<String> elementIdsToDistribute = new HashSet<>();
712 for (MediaPackageElement mpe : filteredMp.getElements()) {
713
714 if (MediaPackageElement.Type.Publication == mpe.getElementType()) {
715 continue;
716 }
717 elementIdsToDistribute.add(mpe.getIdentifier());
718 }
719 if (elementIdsToDistribute.isEmpty()) {
720 logger.debug("The media package {} does not contain any elements to update. "
721 + "Skip OAI-PMH metadata update operation for repository {}",
722 mediaPackage.getIdentifier().toString(), repository);
723 return null;
724 }
725 logger.debug("distribute elements {}", StringUtils.join(elementIdsToDistribute, ", "));
726 final List<MediaPackageElement> distributedElements = new ArrayList<>();
727 try {
728 Job distJob = downloadDistributionService
729 .distribute(getPublicationChannelName(repository), filteredMp, elementIdsToDistribute, checkAvailability);
730 if (job == null) {
731 throw new PublicationException("The distribution service can not handle this type of media package elements.");
732 }
733 if (!waitForJobs(job, serviceRegistry, distJob).isSuccess()) {
734 throw new PublicationException(format(
735 "Unable to distribute updated elements from media package %s to the download distribution service",
736 mediaPackage.getIdentifier().toString()));
737 }
738 if (distJob.getPayload() != null) {
739 for (MediaPackageElement mpe : MediaPackageElementParser.getArrayFromXml(distJob.getPayload())) {
740 distributedElements.add(mpe);
741 }
742 }
743 } catch (DistributionException | MediaPackageException e) {
744 throw new PublicationException(format(
745 "Unable to distribute updated elements from media package %s to the download distribution service",
746 mediaPackage.getIdentifier().toString()), e);
747 }
748
749
750 for (MediaPackageElement e : filteredMp.getElements()) {
751 if (MediaPackageElement.Type.Publication.equals(e.getElementType())) {
752 continue;
753 }
754 filteredMp.remove(e);
755 }
756 for (MediaPackageElement e : distributedElements) {
757 filteredMp.add(e);
758 }
759 MediaPackage publishedMp = merge(filteredMp, removeMatchingNonExistantElements(filteredMp,
760 (MediaPackage) result.getItems().get(0).getMediaPackage().clone(), parsedFlavors, tags));
761
762 try {
763 logger.debug("Updating metadata of media package {} in {}",
764 publishedMp.getIdentifier().toString(), repository);
765 oaiPmhDatabase.store(publishedMp, repository);
766 } catch (OaiPmhDatabaseException e) {
767 throw new PublicationException(format("Media package %s could not be updated",
768 publishedMp.getIdentifier().toString()));
769 }
770
771
772
773 Map<URI, MediaPackageElement> elementUriMap = new Hashtable<>();
774 for (SearchResultItem oaiPmhSearchResultItem : result.getItems()) {
775 for (MediaPackageElement mpe : oaiPmhSearchResultItem.getMediaPackage().getElements()) {
776 if (MediaPackageElement.Type.Publication == mpe.getElementType() || null == mpe.getURI()) {
777 continue;
778 }
779 elementUriMap.put(mpe.getURI(), mpe);
780 }
781 }
782 for (MediaPackageElement publishedMpe : publishedMp.getElements()) {
783 if (MediaPackageElement.Type.Publication == publishedMpe.getElementType()) {
784 continue;
785 }
786 if (elementUriMap.containsKey(publishedMpe.getURI())) {
787 elementUriMap.remove(publishedMpe.getURI());
788 }
789 }
790 Set<String> orphanedElementIds = new HashSet<>();
791 for (MediaPackageElement orphanedMpe : elementUriMap.values()) {
792 orphanedElementIds.add(orphanedMpe.getIdentifier());
793 }
794 if (!orphanedElementIds.isEmpty()) {
795 for (SearchResultItem oaiPmhSearchResultItem : result.getItems()) {
796 try {
797 Job retractJob = downloadDistributionService.retract(getPublicationChannelName(repository),
798 oaiPmhSearchResultItem.getMediaPackage(), orphanedElementIds);
799 if (retractJob != null) {
800 if (!waitForJobs(job, serviceRegistry, retractJob).isSuccess()) {
801 logger.warn(
802 "The download distribution retract job for the orphaned elements from media package {} "
803 + "does not end successfully",
804 oaiPmhSearchResultItem.getMediaPackage().getIdentifier().toString());
805 }
806 }
807 } catch (DistributionException e) {
808 logger.warn(
809 "Unable to retract orphaned elements from download distribution service for the "
810 + "media package {} channel {}",
811 oaiPmhSearchResultItem.getMediaPackage().getIdentifier().toString(),
812 getPublicationChannelName(repository),
813 e);
814 }
815 }
816 }
817
818
819 String publicationChannel = getPublicationChannelName(repository);
820 for (Publication p : mediaPackage.getPublications()) {
821 if (StringUtils.equals(publicationChannel, p.getChannel())) {
822 return p;
823 }
824 }
825 return null;
826 }
827
828 protected void checkInputArguments(MediaPackage mediaPackage, String repository) {
829 if (mediaPackage == null) {
830 throw new IllegalArgumentException("Media package must be specified");
831 }
832 if (StringUtils.isEmpty(repository)) {
833 throw new IllegalArgumentException("Repository must be specified");
834 }
835 if (!oaiPmhServerInfo.hasRepo(repository)) {
836 throw new IllegalArgumentException("OAI-PMH repository '" + repository + "' does not exist");
837 }
838 }
839
840 protected String getPublicationChannelName(String repository) {
841 return PUBLICATION_CHANNEL_PREFIX.concat(repository);
842 }
843
844
845 protected Publication createPublicationElement(String mpId, String repository) throws PublicationException {
846 for (String hostUrl : OaiPmhServerInfoUtil.oaiPmhServerUrlOfCurrentOrganization(securityService)) {
847 final URI engageUri = URIUtils.resolve(
848 URI.create(UrlSupport.concat(hostUrl, oaiPmhServerInfo.getMountPoint(), repository)),
849 "?verb=ListMetadataFormats&identifier=" + mpId);
850 return PublicationImpl.publication(UUID.randomUUID().toString(), getPublicationChannelName(repository), engageUri,
851 MimeTypes.parseMimeType(MimeTypes.XML.toString()));
852 }
853
854 final String msg = format("No host url for oai-pmh server configured for organization %s " + "("
855 + OaiPmhServerInfoUtil.ORG_CFG_OAIPMH_SERVER_HOSTURL + ")", securityService.getOrganization().getId());
856 throw new PublicationException(msg);
857 }
858
859 private static MediaPackage updateMediaPackageFields(MediaPackage oaiPmhMp, MediaPackage mediaPackage) {
860 oaiPmhMp.setTitle(mediaPackage.getTitle());
861 oaiPmhMp.setDate(mediaPackage.getDate());
862 oaiPmhMp.setLanguage(mediaPackage.getLanguage());
863 oaiPmhMp.setLicense(mediaPackage.getLicense());
864 oaiPmhMp.setSeries(mediaPackage.getSeries());
865 oaiPmhMp.setSeriesTitle(mediaPackage.getSeriesTitle());
866 for (String contributor : oaiPmhMp.getContributors()) {
867 oaiPmhMp.removeContributor(contributor);
868 }
869 for (String contributor : mediaPackage.getContributors()) {
870 oaiPmhMp.addContributor(contributor);
871 }
872 for (String creator : oaiPmhMp.getCreators()) {
873 oaiPmhMp.removeCreator(creator);
874 }
875 for (String creator : mediaPackage.getCreators()) {
876 oaiPmhMp.addCreator(creator);
877 }
878 for (String subject : oaiPmhMp.getSubjects()) {
879 oaiPmhMp.removeSubject(subject);
880 }
881 for (String subject : mediaPackage.getSubjects()) {
882 oaiPmhMp.addSubject(subject);
883 }
884 return oaiPmhMp;
885 }
886
887
888
889
890
891
892
893
894
895
896
897
898
899 private MediaPackage filterMediaPackage(
900 MediaPackage mediaPackage,
901 Set<MediaPackageElementFlavor> flavors,
902 Set<String> tags
903 ) throws MediaPackageException {
904 if (flavors.isEmpty() && tags.isEmpty()) {
905 throw new IllegalArgumentException("Flavors or tags parameter must be set");
906 }
907
908 MediaPackage filteredMediaPackage = (MediaPackage) mediaPackage.clone();
909
910
911 List<MediaPackageElement> keep = new ArrayList<>();
912
913 SimpleElementSelector selector = new SimpleElementSelector();
914
915 for (MediaPackageElementFlavor flavor : flavors) {
916 selector.addFlavor(flavor);
917 }
918 for (String tag : tags) {
919 selector.addTag(tag);
920 }
921 keep.addAll(selector.select(mediaPackage, true));
922
923
924 for (Publication p : filteredMediaPackage.getPublications()) {
925 keep.add(p);
926 }
927
928
929 for (MediaPackageElement element : filteredMediaPackage.getElements()) {
930
931 if (!keep.contains(element)) {
932 logger.debug("Removing {} '{}' from media package '{}'", element.getElementType().toString().toLowerCase(),
933 element.getIdentifier(), filteredMediaPackage.getIdentifier().toString());
934 filteredMediaPackage.remove(element);
935 continue;
936 }
937
938
939 MediaPackageReference reference = element.getReference();
940 if (reference != null) {
941 MediaPackageElement referencedElement = mediaPackage.getElementByReference(reference);
942
943
944 if (referencedElement != null && !keep.contains(referencedElement)) {
945
946
947 MediaPackageElement parent = null;
948 while ((parent = mediaPackage.getElementByReference(reference)) != null) {
949 if (parent.getFlavor() != null && element.getFlavor() == null) {
950 element.setFlavor(parent.getFlavor());
951 }
952 if (parent.getReference() == null) {
953 break;
954 }
955 reference = parent.getReference();
956 }
957
958
959 if (reference != null && reference.getType().equals(MediaPackageReference.TYPE_MEDIAPACKAGE)) {
960 element.setReference(reference);
961 } else {
962 element.clearReference();
963 }
964 }
965 }
966 }
967
968 return filteredMediaPackage;
969 }
970
971
972
973
974
975
976
977
978
979
980
981
982 public static MediaPackage removeMatchingNonExistantElements(MediaPackage updatedMp, MediaPackage publishedMp,
983 Set<MediaPackageElementFlavor> flavors, Set<String> tags) {
984 SimpleElementSelector selector = new SimpleElementSelector();
985
986 for (MediaPackageElementFlavor flavor : flavors) {
987 selector.addFlavor(flavor);
988 }
989 for (String tag : tags) {
990 selector.addTag(tag);
991 }
992 for (MediaPackageElement publishedMpe : selector.select(publishedMp, true)) {
993 boolean foundInUpdatedMp = false;
994 for (MediaPackageElement updatedMpe : updatedMp.getElementsByFlavor(publishedMpe.getFlavor())) {
995 if (!updatedMpe.containsTag(tags)) {
996
997 }
998 foundInUpdatedMp = true;
999 break;
1000 }
1001
1002 if (!foundInUpdatedMp) {
1003 publishedMp.remove(publishedMpe);
1004 }
1005 }
1006 return publishedMp;
1007 }
1008
1009
1010
1011
1012
1013
1014
1015
1016
1017
1018
1019
1020
1021
1022 public static MediaPackage merge(MediaPackage updatedMp, MediaPackage publishedMp) {
1023 if (publishedMp == null) {
1024 return updatedMp;
1025 }
1026
1027 final MediaPackage mergedMp = MediaPackageSupport.copy(publishedMp);
1028
1029
1030 for (final MediaPackageElement updatedElement : updatedMp.elements()) {
1031 MediaPackageElementFlavor flavor = updatedElement.getFlavor();
1032 if (flavor != null) {
1033 for (final MediaPackageElement outdated : mergedMp.getElementsByFlavor(flavor)) {
1034 mergedMp.remove(outdated);
1035 }
1036 logger.debug("Update {} {} of type {}", updatedElement.getElementType().toString().toLowerCase(),
1037 updatedElement.getIdentifier(), updatedElement.getElementType());
1038 mergedMp.add(updatedElement);
1039 }
1040 }
1041
1042
1043 for (final Publication p : mergedMp.getPublications()) {
1044 mergedMp.remove(p);
1045 }
1046
1047
1048 for (final Publication updatedPublication : updatedMp.getPublications()) {
1049 mergedMp.add(updatedPublication);
1050 }
1051
1052
1053 updateMediaPackageFields(mergedMp, updatedMp);
1054 return mergedMp;
1055 }
1056
1057 @Override
1058 protected ServiceRegistry getServiceRegistry() {
1059 return serviceRegistry;
1060 }
1061
1062
1063
1064
1065
1066
1067 @Override
1068 protected SecurityService getSecurityService() {
1069 return securityService;
1070 }
1071
1072
1073
1074
1075
1076
1077 @Override
1078 protected UserDirectoryService getUserDirectoryService() {
1079 return userDirectoryService;
1080 }
1081
1082
1083
1084
1085
1086
1087 @Override
1088 protected OrganizationDirectoryService getOrganizationDirectoryService() {
1089 return organizationDirectoryService;
1090 }
1091
1092
1093 @Reference
1094 public void setServiceRegistry(ServiceRegistry serviceRegistry) {
1095 this.serviceRegistry = serviceRegistry;
1096 }
1097
1098
1099 @Reference
1100 public void setSecurityService(SecurityService securityService) {
1101 this.securityService = securityService;
1102 }
1103
1104
1105 @Reference
1106 public void setUserDirectoryService(UserDirectoryService userDirectoryService) {
1107 this.userDirectoryService = userDirectoryService;
1108 }
1109
1110
1111 @Reference
1112 public void setOrganizationDirectoryService(OrganizationDirectoryService organizationDirectoryService) {
1113 this.organizationDirectoryService = organizationDirectoryService;
1114 }
1115
1116
1117 @Reference(target = "(distribution.channel=download)")
1118 public void setDownloadDistributionService(DownloadDistributionService downloadDistributionService) {
1119 this.downloadDistributionService = downloadDistributionService;
1120 }
1121
1122
1123 @Reference
1124 public void setStreamingDistributionService(StreamingDistributionService streamingDistributionService) {
1125 this.streamingDistributionService = streamingDistributionService;
1126 }
1127
1128
1129 @Reference
1130 public void setOaiPmhServerInfo(OaiPmhServerInfo oaiPmhServerInfo) {
1131 this.oaiPmhServerInfo = oaiPmhServerInfo;
1132 }
1133
1134
1135 @Reference
1136 public void setOaiPmhDatabase(OaiPmhDatabase oaiPmhDatabase) {
1137 this.oaiPmhDatabase = oaiPmhDatabase;
1138 }
1139 }