1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21 package org.opencastproject.publication.oaipmh;
22
23 import static java.lang.String.format;
24 import static org.opencastproject.util.JobUtil.waitForJobs;
25
26 import org.opencastproject.distribution.api.DistributionException;
27 import org.opencastproject.distribution.api.DownloadDistributionService;
28 import org.opencastproject.distribution.api.StreamingDistributionService;
29 import org.opencastproject.job.api.AbstractJobProducer;
30 import org.opencastproject.job.api.Job;
31 import org.opencastproject.mediapackage.MediaPackage;
32 import org.opencastproject.mediapackage.MediaPackageElement;
33 import org.opencastproject.mediapackage.MediaPackageElementFlavor;
34 import org.opencastproject.mediapackage.MediaPackageElementParser;
35 import org.opencastproject.mediapackage.MediaPackageException;
36 import org.opencastproject.mediapackage.MediaPackageParser;
37 import org.opencastproject.mediapackage.MediaPackageReference;
38 import org.opencastproject.mediapackage.MediaPackageRuntimeException;
39 import org.opencastproject.mediapackage.MediaPackageSupport;
40 import org.opencastproject.mediapackage.Publication;
41 import org.opencastproject.mediapackage.PublicationImpl;
42 import org.opencastproject.mediapackage.selector.SimpleElementSelector;
43 import org.opencastproject.oaipmh.persistence.OaiPmhDatabase;
44 import org.opencastproject.oaipmh.persistence.OaiPmhDatabaseException;
45 import org.opencastproject.oaipmh.persistence.QueryBuilder;
46 import org.opencastproject.oaipmh.persistence.SearchResult;
47 import org.opencastproject.oaipmh.persistence.SearchResultItem;
48 import org.opencastproject.oaipmh.server.OaiPmhServerInfo;
49 import org.opencastproject.oaipmh.server.OaiPmhServerInfoUtil;
50 import org.opencastproject.publication.api.OaiPmhPublicationService;
51 import org.opencastproject.publication.api.PublicationException;
52 import org.opencastproject.security.api.OrganizationDirectoryService;
53 import org.opencastproject.security.api.SecurityService;
54 import org.opencastproject.security.api.UserDirectoryService;
55 import org.opencastproject.serviceregistry.api.ServiceRegistry;
56 import org.opencastproject.serviceregistry.api.ServiceRegistryException;
57 import org.opencastproject.util.MimeTypes;
58 import org.opencastproject.util.NotFoundException;
59 import org.opencastproject.util.UrlSupport;
60 import org.opencastproject.util.data.Collections;
61
62 import com.entwinemedia.fn.data.Opt;
63
64 import org.apache.commons.lang3.BooleanUtils;
65 import org.apache.commons.lang3.StringUtils;
66 import org.apache.http.client.utils.URIUtils;
67 import org.osgi.service.component.annotations.Component;
68 import org.osgi.service.component.annotations.Reference;
69 import org.slf4j.Logger;
70 import org.slf4j.LoggerFactory;
71
72 import java.net.URI;
73 import java.util.ArrayList;
74 import java.util.Arrays;
75 import java.util.HashSet;
76 import java.util.Hashtable;
77 import java.util.List;
78 import java.util.Map;
79 import java.util.Optional;
80 import java.util.Set;
81 import java.util.UUID;
82 import java.util.stream.Collectors;
83 import java.util.stream.Stream;
84
85
86
87
88 @Component(
89 immediate = true,
90 service = OaiPmhPublicationService.class,
91 property = {
92 "service.description=Publication Service (OAI-PMH)"
93 }
94 )
95 public class OaiPmhPublicationServiceImpl extends AbstractJobProducer implements OaiPmhPublicationService {
96
97
98 private static final Logger logger = LoggerFactory.getLogger(OaiPmhPublicationServiceImpl.class);
99
/**
 * The operations this service runs as jobs. The constant names are used as the job operation names, so they must
 * stay in sync with what {@code Operation.valueOf(job.getOperation())} is expected to parse.
 */
public enum Operation {
  Publish,
  Retract,
  UpdateMetadata,
  Replace
}
103
104 private DownloadDistributionService downloadDistributionService;
105 private OaiPmhServerInfo oaiPmhServerInfo;
106 private OaiPmhDatabase oaiPmhDatabase;
107 private OrganizationDirectoryService organizationDirectoryService;
108 private SecurityService securityService;
109 private ServiceRegistry serviceRegistry;
110 private StreamingDistributionService streamingDistributionService;
111 private UserDirectoryService userDirectoryService;
112
/**
 * Creates the service, passing {@code JOB_TYPE} to the {@link AbstractJobProducer} constructor.
 */
public OaiPmhPublicationServiceImpl() {
  super(JOB_TYPE);
}
116
117 @Override
118 protected String process(Job job) throws Exception {
119 if (!StringUtils.equalsIgnoreCase(JOB_TYPE, job.getJobType())) {
120 throw new IllegalArgumentException("Can not handle job type " + job.getJobType());
121 }
122
123 Publication publication = null;
124 MediaPackage mediaPackage = MediaPackageParser.getFromXml(job.getArguments().get(0));
125 String repository = job.getArguments().get(1);
126 boolean checkAvailability = false;
127 switch (Operation.valueOf(job.getOperation())) {
128 case Publish:
129 String[] downloadElementIds = StringUtils.split(job.getArguments().get(2), SEPARATOR);
130 String[] streamingElementIds = StringUtils.split(job.getArguments().get(3), SEPARATOR);
131 checkAvailability = BooleanUtils.toBoolean(job.getArguments().get(4));
132 publication = publish(job, mediaPackage, repository,
133 Collections.set(downloadElementIds), Collections.set(streamingElementIds), checkAvailability);
134 break;
135 case Replace:
136 final Set<? extends MediaPackageElement> downloadElements =
137 Collections.toSet(MediaPackageElementParser.getArrayFromXml(job.getArguments().get(2)));
138 final Set<? extends MediaPackageElement> streamingElements =
139 Collections.toSet(MediaPackageElementParser.getArrayFromXml(job.getArguments().get(3)));
140 final Set<MediaPackageElementFlavor> retractDownloadFlavors = Arrays.stream(
141 StringUtils.split(job.getArguments().get(4), SEPARATOR))
142 .filter(s -> !s.isEmpty())
143 .map(MediaPackageElementFlavor::parseFlavor)
144 .collect(Collectors.toSet());
145 final Set<MediaPackageElementFlavor> retractStreamingFlavors = Arrays.stream(
146 StringUtils.split(job.getArguments().get(5), SEPARATOR))
147 .filter(s -> !s.isEmpty())
148 .map(MediaPackageElementFlavor::parseFlavor)
149 .collect(Collectors.toSet());
150 final Set<? extends MediaPackageElement> publications =
151 Collections.toSet(MediaPackageElementParser.getArrayFromXml(job.getArguments().get(6)));
152 checkAvailability = BooleanUtils.toBoolean(job.getArguments().get(7));
153 publication = replace(job, mediaPackage, repository, downloadElements, streamingElements,
154 retractDownloadFlavors, retractStreamingFlavors, publications, checkAvailability);
155 break;
156 case Retract:
157 publication = retract(job, mediaPackage, repository);
158 break;
159 case UpdateMetadata:
160 checkAvailability = BooleanUtils.toBoolean(job.getArguments().get(4));
161 String[] flavors = StringUtils.split(job.getArguments().get(2), SEPARATOR);
162 String[] tags = StringUtils.split(job.getArguments().get(3), SEPARATOR);
163 publication = updateMetadata(job, mediaPackage, repository,
164 Collections.set(flavors), Collections.set(tags), checkAvailability);
165 break;
166 default:
167 throw new IllegalArgumentException("Can not handle this type of operation: " + job.getOperation());
168 }
169 return publication != null ? MediaPackageElementParser.getAsXml(publication) : null;
170 }
171
172 @Override
173 public Job replace(
174 MediaPackage mediaPackage,
175 String repository,
176 Set<? extends MediaPackageElement> downloadElements,
177 Set<? extends MediaPackageElement> streamingElements,
178 Set<MediaPackageElementFlavor> retractDownloadFlavors,
179 Set<MediaPackageElementFlavor> retractStreamingFlavors,
180 Set<? extends Publication> publications,
181 boolean checkAvailability
182 ) throws PublicationException {
183 checkInputArguments(mediaPackage, repository);
184 try {
185 return serviceRegistry.createJob(JOB_TYPE, Operation.Replace.name(),
186 Arrays.asList(MediaPackageParser.getAsXml(mediaPackage),
187 repository,
188 MediaPackageElementParser.getArrayAsXml(Collections.toList(downloadElements)),
189 MediaPackageElementParser.getArrayAsXml(Collections.toList(streamingElements)),
190 StringUtils.join(retractDownloadFlavors, SEPARATOR),
191 StringUtils.join(retractStreamingFlavors, SEPARATOR),
192 MediaPackageElementParser.getArrayAsXml(Collections.toList(publications)),
193 Boolean.toString(checkAvailability)));
194 } catch (ServiceRegistryException e) {
195 throw new PublicationException("Unable to create job", e);
196 } catch (MediaPackageException e) {
197 throw new PublicationException("Unable to serialize media package elements", e);
198 }
199 }
200
201 @Override
202 public Publication replaceSync(
203 MediaPackage mediaPackage, String repository, Set<? extends MediaPackageElement> downloadElements,
204 Set<? extends MediaPackageElement> streamingElements, Set<MediaPackageElementFlavor> retractDownloadFlavors,
205 Set<MediaPackageElementFlavor> retractStreamingFlavors, Set<? extends Publication> publications,
206 boolean checkAvailability) throws PublicationException, MediaPackageException {
207 return replace(null, mediaPackage, repository, downloadElements, streamingElements, retractDownloadFlavors,
208 retractStreamingFlavors, publications, checkAvailability);
209 }
210
211 @Override
212 public Job publish(MediaPackage mediaPackage, String repository, Set<String> downloadElementIds,
213 Set<String> streamingElementIds, boolean checkAvailability)
214 throws PublicationException, MediaPackageException {
215 checkInputArguments(mediaPackage, repository);
216
217 try {
218 return serviceRegistry.createJob(JOB_TYPE, Operation.Publish.toString(),
219 Arrays.asList(MediaPackageParser.getAsXml(mediaPackage),
220 repository,
221 StringUtils.join(downloadElementIds, SEPARATOR),
222 StringUtils.join(streamingElementIds, SEPARATOR),
223 Boolean.toString(checkAvailability)));
224 } catch (ServiceRegistryException e) {
225 throw new PublicationException("Unable to create job", e);
226 }
227 }
228
229 @Override
230 public Job retract(MediaPackage mediaPackage, String repository) throws PublicationException, NotFoundException {
231 checkInputArguments(mediaPackage, repository);
232
233 try {
234 return serviceRegistry.createJob(JOB_TYPE, Operation.Retract.toString(),
235 Arrays.asList(MediaPackageParser.getAsXml(mediaPackage), repository));
236 } catch (ServiceRegistryException e) {
237 throw new PublicationException("Unable to create job", e);
238 }
239 }
240
241 @Override
242 public Job updateMetadata(MediaPackage mediaPackage, String repository, Set<String> flavors, Set<String> tags,
243 boolean checkAvailability) throws PublicationException, MediaPackageException {
244 checkInputArguments(mediaPackage, repository);
245 if ((flavors == null || flavors.isEmpty()) && (tags == null || tags.isEmpty())) {
246 throw new IllegalArgumentException("Flavors or tags must be set");
247 }
248
249 try {
250 return serviceRegistry.createJob(JOB_TYPE, Operation.UpdateMetadata.toString(),
251 Arrays.asList(MediaPackageParser.getAsXml(mediaPackage),
252 repository,
253 StringUtils.join(flavors, SEPARATOR),
254 StringUtils.join(tags, SEPARATOR),
255 Boolean.toString(checkAvailability)));
256 } catch (ServiceRegistryException e) {
257 throw new PublicationException("Unable to create job", e);
258 }
259 }
260
261 protected Publication publish(Job job, MediaPackage mediaPackage, String repository, Set<String> downloadElementIds,
262 Set<String> streamingElementIds, boolean checkAvailability)
263 throws PublicationException, MediaPackageException {
264 String mpId = mediaPackage.getIdentifier().toString();
265 SearchResult searchResult = oaiPmhDatabase.search(QueryBuilder.queryRepo(repository).mediaPackageId(mpId)
266 .isDeleted(false).build());
267
268 if (searchResult.size() > 0) {
269 try {
270 Publication p = retract(job, mediaPackage, repository);
271 if (p != null && mediaPackage.contains(p)) {
272 mediaPackage.remove(p);
273 }
274 } catch (NotFoundException e) {
275 logger.debug("No OAI-PMH publication found for media package {}.", mpId, e);
276
277 }
278 }
279 List<Job> distributionJobs = new ArrayList<>(2);
280 if (downloadElementIds != null && !downloadElementIds.isEmpty()) {
281
282 MediaPackage mpDownloadDist = (MediaPackage) mediaPackage.clone();
283 for (MediaPackageElement mpe : mpDownloadDist.getElements()) {
284 if (downloadElementIds.contains(mpe.getIdentifier())) {
285 continue;
286 }
287 mpDownloadDist.remove(mpe);
288 }
289
290 if (mpDownloadDist.getElements().length > 0) {
291 try {
292 Job downloadDistributionJob = downloadDistributionService
293 .distribute(getPublicationChannelName(repository), mpDownloadDist, downloadElementIds,
294 checkAvailability);
295 if (downloadDistributionJob != null) {
296 distributionJobs.add(downloadDistributionJob);
297 }
298 } catch (DistributionException e) {
299 throw new PublicationException(
300 format("Unable to distribute media package %s to download distribution.", mpId),
301 e);
302 }
303 }
304 }
305 if (streamingElementIds != null && !streamingElementIds.isEmpty()) {
306
307 MediaPackage mpStreamingDist = (MediaPackage) mediaPackage.clone();
308 for (MediaPackageElement mpe : mpStreamingDist.getElements()) {
309 if (streamingElementIds.contains(mpe.getIdentifier())) {
310 continue;
311 }
312 mpStreamingDist.remove(mpe);
313 }
314
315 if (mpStreamingDist.getElements().length > 0) {
316 try {
317 Job streamingDistributionJob = streamingDistributionService
318 .distribute(getPublicationChannelName(repository), mpStreamingDist, streamingElementIds);
319 if (streamingDistributionJob != null) {
320 distributionJobs.add(streamingDistributionJob);
321 }
322 } catch (DistributionException e) {
323 throw new PublicationException(
324 format("Unable to distribute media package %s to streaming distribution.", mpId),
325 e);
326 }
327 }
328 }
329 if (distributionJobs.isEmpty()) {
330 throw new IllegalStateException(format(
331 "The media package %s does not contain any elements for publishing to OAI-PMH", mpId));
332 }
333
334 if (!waitForJobs(job, serviceRegistry, distributionJobs).isSuccess()) {
335 throw new PublicationException(format(
336 "Unable to distribute elements of media package %s to distribution channels.", mpId));
337 }
338
339 List<MediaPackageElement> distributedElements = new ArrayList<>();
340 for (Job distributionJob : distributionJobs) {
341 String distributedElementsXml = distributionJob.getPayload();
342 if (StringUtils.isNotBlank(distributedElementsXml)) {
343 for (MediaPackageElement distributedElement
344 : MediaPackageElementParser.getArrayFromXml(distributedElementsXml)) {
345 distributedElements.add(distributedElement);
346 }
347 }
348 }
349 MediaPackage oaiPmhDistMp = (MediaPackage) mediaPackage.clone();
350
351 for (MediaPackageElement mpe : oaiPmhDistMp.getElements()) {
352
353 if (MediaPackageElement.Type.Publication == mpe.getElementType()) {
354 continue;
355 }
356 oaiPmhDistMp.remove(mpe);
357 }
358
359 for (MediaPackageElement mpe : distributedElements) {
360 oaiPmhDistMp.add(mpe);
361 }
362
363
364 try {
365 oaiPmhDatabase.store(oaiPmhDistMp, repository);
366 } catch (OaiPmhDatabaseException e) {
367
368 throw new PublicationException(
369 format("Unable to distribute media package %s to OAI-PMH repository %s", mpId, repository),
370 e);
371 }
372 return createPublicationElement(mpId, repository);
373 }
374
375 private Publication replace(
376 Job job,
377 MediaPackage mediaPackage,
378 String repository,
379 Set<? extends MediaPackageElement> downloadElements,
380 Set<? extends MediaPackageElement> streamingElements,
381 Set<MediaPackageElementFlavor> retractDownloadFlavors,
382 Set<MediaPackageElementFlavor> retractStreamingFlavors,
383 Set<? extends MediaPackageElement> publications,
384 boolean checkAvailable
385 ) throws MediaPackageException, PublicationException {
386 final String mpId = mediaPackage.getIdentifier().toString();
387 final String channel = getPublicationChannelName(repository);
388
389 try {
390 final SearchResult search = oaiPmhDatabase.search(QueryBuilder.queryRepo(repository).mediaPackageId(mpId)
391 .isDeleted(false).build());
392 if (search.size() > 1) {
393 throw new PublicationException("Found multiple OAI-PMH records for id " + mpId);
394 }
395 final Optional<MediaPackage> existingMp = search.getItems().stream().findFirst().map(
396 SearchResultItem::getMediaPackage);
397
398
399 final Set<String> addDownloadElementIds = downloadElements.stream()
400 .map(MediaPackageElement::getIdentifier)
401 .collect(Collectors.toSet());
402 final Set<String> addStreamingElementIds = streamingElements.stream()
403 .map(MediaPackageElement::getIdentifier)
404 .collect(Collectors.toSet());
405
406
407 final Set<MediaPackageElement> removeDownloadElements = existingMp.map(mp ->
408 Arrays.stream(mp.getElements())
409 .filter(e -> retractDownloadFlavors.stream().anyMatch(f -> f.matches(e.getFlavor())))
410 .collect(Collectors.toSet())
411 ).orElse(java.util.Collections.emptySet());
412 final Set<MediaPackageElement> removeStreamingElements = existingMp.map(mp ->
413 Arrays.stream(mp.getElements())
414 .filter(e -> retractStreamingFlavors.stream().anyMatch(f -> f.matches(e.getFlavor())))
415 .collect(Collectors.toSet())
416 ).orElse(java.util.Collections.emptySet());
417
418
419 final Set<String> removeDownloadElementIds = Stream
420 .concat(removeDownloadElements.stream(), downloadElements.stream())
421 .map(MediaPackageElement::getIdentifier)
422 .collect(Collectors.toSet());
423 final Set<String> removeStreamingElementIds = Stream
424 .concat(removeStreamingElements.stream(), streamingElements.stream())
425 .map(MediaPackageElement::getIdentifier)
426 .collect(Collectors.toSet());
427
428 if (removeDownloadElementIds.isEmpty() && removeStreamingElementIds.isEmpty()
429 && addDownloadElementIds.isEmpty() && addStreamingElementIds.isEmpty()) {
430
431 return Arrays.stream(mediaPackage.getPublications())
432 .filter(p -> channel.equals(p.getChannel()))
433 .findFirst()
434 .orElse(null);
435 }
436
437 final MediaPackage temporaryMediaPackage = (MediaPackage) mediaPackage.clone();
438 downloadElements.forEach(temporaryMediaPackage::add);
439 streamingElements.forEach(temporaryMediaPackage::add);
440 removeDownloadElements.forEach(temporaryMediaPackage::add);
441 removeStreamingElements.forEach(temporaryMediaPackage::add);
442
443 final List<MediaPackageElement> retractedElements = new ArrayList<>();
444 final List<MediaPackageElement> distributedElements = new ArrayList<>();
445 if (job != null) {
446 retractedElements
447 .addAll(retract(job, channel, temporaryMediaPackage, removeDownloadElementIds, removeStreamingElementIds));
448 distributedElements
449 .addAll(distribute(job, channel, temporaryMediaPackage, addDownloadElementIds, addStreamingElementIds,
450 checkAvailable));
451 } else {
452 retractedElements
453 .addAll(retractSync(channel, temporaryMediaPackage, removeDownloadElementIds, removeStreamingElementIds));
454 distributedElements
455 .addAll(distributeSync(channel, temporaryMediaPackage, addDownloadElementIds, addStreamingElementIds,
456 checkAvailable));
457 }
458
459 final MediaPackage oaiPmhDistMp = (MediaPackage) existingMp.orElse(mediaPackage).clone();
460
461
462 Arrays.stream(oaiPmhDistMp.getPublications())
463 .filter(p -> channel.equals(p.getChannel()))
464 .forEach(oaiPmhDistMp::remove);
465
466
467 retractedElements.stream()
468 .map(MediaPackageElement::getIdentifier)
469 .forEach(oaiPmhDistMp::removeElementById);
470
471 distributedElements.forEach(oaiPmhDistMp::add);
472
473
474 publications.stream()
475 .map(p -> ((Publication) p).getChannel())
476 .forEach(c -> Arrays.stream(oaiPmhDistMp.getPublications())
477 .filter(p -> c.equals(p.getChannel()))
478 .forEach(oaiPmhDistMp::remove));
479
480
481 publications.forEach(oaiPmhDistMp::add);
482
483
484 oaiPmhDatabase.store(oaiPmhDistMp, repository);
485
486 return Arrays.stream(mediaPackage.getPublications())
487 .filter(p -> channel.equals(p.getChannel()))
488 .findFirst()
489 .orElse(createPublicationElement(mpId, repository));
490 } catch (OaiPmhDatabaseException e) {
491 throw new PublicationException(format("Unable to update media package %s in OAI-PMH repository %s", mpId,
492 repository), e);
493 } catch (DistributionException e) {
494 throw new PublicationException(format("Unable to update OAI-PMH distributions of media package %s.", mpId), e);
495 } catch (MediaPackageRuntimeException e) {
496 throw e.getWrappedException();
497 }
498 }
499
500 private List<MediaPackageElement> retract(
501 Job job, String channel, MediaPackage mp, Set<String> removeDownloadElementIds,
502 Set<String> removeStreamingElementIds) throws PublicationException, DistributionException {
503 final List<Job> retractJobs = new ArrayList<>(2);
504 if (!removeDownloadElementIds.isEmpty()) {
505 retractJobs.add(downloadDistributionService.retract(channel, mp, removeDownloadElementIds));
506 }
507 if (!removeStreamingElementIds.isEmpty()) {
508 retractJobs.add(streamingDistributionService.retract(channel, mp, removeStreamingElementIds));
509 }
510
511
512 if (!waitForJobs(job, serviceRegistry, retractJobs).isSuccess()) {
513 throw new PublicationException(format("Unable to retract OAI-PMH distributions of media package %s",
514 mp.getIdentifier().toString()));
515 }
516
517 return retractJobs.stream()
518 .filter(j -> StringUtils.isNotBlank(j.getPayload()))
519 .map(Job::getPayload)
520 .flatMap(p -> MediaPackageElementParser.getArrayFromXmlUnchecked(p).stream())
521 .collect(Collectors.toList());
522 }
523
524 private List<MediaPackageElement> retractSync(String channel, MediaPackage mp, Set<String> removeDownloadElementIds,
525 Set<String> removeStreamingElementIds) throws DistributionException {
526 final List<MediaPackageElement> retracted = new ArrayList<>();
527 if (!removeDownloadElementIds.isEmpty()) {
528 retracted.addAll(downloadDistributionService.retractSync(channel, mp, removeDownloadElementIds));
529 }
530 if (!removeStreamingElementIds.isEmpty()) {
531 retracted.addAll(streamingDistributionService.retractSync(channel, mp, removeStreamingElementIds));
532 }
533 return retracted;
534 }
535
536 private List<MediaPackageElement> distribute(
537 Job job,
538 String channel,
539 MediaPackage mp,
540 Set<String> addDownloadElementIds,
541 Set<String> addStreamingElementIds,
542 boolean checkAvailable
543 ) throws PublicationException, MediaPackageException, DistributionException {
544 final List<Job> distributeJobs = new ArrayList<>(2);
545 if (!addDownloadElementIds.isEmpty()) {
546 distributeJobs.add(downloadDistributionService.distribute(channel, mp, addDownloadElementIds,
547 checkAvailable));
548 }
549 if (!addStreamingElementIds.isEmpty()) {
550 distributeJobs.add(streamingDistributionService.distribute(channel, mp, addStreamingElementIds));
551 }
552
553
554 if (!waitForJobs(job, serviceRegistry, distributeJobs).isSuccess()) {
555 throw new PublicationException(format("Unable to distribute OAI-PMH distributions of media package %s",
556 mp.getIdentifier().toString()));
557 }
558
559 return distributeJobs.stream()
560 .filter(j -> StringUtils.isNotBlank(j.getPayload()))
561 .map(Job::getPayload)
562 .flatMap(p -> MediaPackageElementParser.getArrayFromXmlUnchecked(p).stream())
563 .collect(Collectors.toList());
564 }
565
566 private List<MediaPackageElement> distributeSync(
567 String channel, MediaPackage mp, Set<String> addDownloadElementIds, Set<String> addStreamingElementIds,
568 boolean checkAvailable) throws DistributionException {
569 final List<MediaPackageElement> distributed = new ArrayList<>();
570 if (!addDownloadElementIds.isEmpty()) {
571 distributed.addAll(downloadDistributionService.distributeSync(channel, mp, addDownloadElementIds,
572 checkAvailable));
573 }
574 if (!addStreamingElementIds.isEmpty()) {
575 distributed.addAll(streamingDistributionService.distributeSync(channel, mp, addStreamingElementIds));
576 }
577 return distributed;
578 }
579
580
581 protected Publication retract(Job job, MediaPackage mediaPackage, String repository)
582 throws PublicationException, NotFoundException {
583 String mpId = mediaPackage.getIdentifier().toString();
584
585
586 MediaPackage oaiPmhMp = null;
587 SearchResult searchResult = oaiPmhDatabase.search(QueryBuilder.queryRepo(repository).mediaPackageId(mpId)
588 .isDeleted(false).build());
589 for (SearchResultItem searchResultItem : searchResult.getItems()) {
590 if (oaiPmhMp == null) {
591 oaiPmhMp = searchResultItem.getMediaPackage();
592 } else {
593 for (MediaPackageElement mpe : searchResultItem.getMediaPackage().getElements()) {
594 oaiPmhMp.add(mpe);
595 }
596 }
597 }
598
599
600 try {
601 oaiPmhDatabase.delete(mpId, repository);
602 } catch (OaiPmhDatabaseException e) {
603 throw new PublicationException(format("Unable to retract media package %s from OAI-PMH repository %s",
604 mpId, repository), e);
605 } catch (NotFoundException e) {
606 logger.debug("Skip retracting media package {} from OIA-PMH repository {} as it isn't published.",
607 mpId, repository, e);
608 }
609
610 if (oaiPmhMp != null && oaiPmhMp.getElements().length > 0) {
611
612 Set<String> mpeIds = new HashSet<>();
613 for (MediaPackageElement mpe : oaiPmhMp.elements()) {
614 if (MediaPackageElement.Type.Publication == mpe.getElementType()) {
615 continue;
616 }
617
618 mpeIds.add(mpe.getIdentifier());
619 }
620 if (!mpeIds.isEmpty()) {
621 List<Job> retractionJobs = new ArrayList<>();
622
623 try {
624 Job retractDownloadJob = downloadDistributionService
625 .retract(getPublicationChannelName(repository), oaiPmhMp, mpeIds);
626 if (retractDownloadJob != null) {
627 retractionJobs.add(retractDownloadJob);
628 }
629 } catch (DistributionException e) {
630 throw new PublicationException(
631 format(
632 "Unable to create retraction job from distribution channel download for the media package %s ",
633 mpId),
634 e);
635 }
636
637
638 try {
639 Job retractDownloadJob = streamingDistributionService
640 .retract(getPublicationChannelName(repository), oaiPmhMp, mpeIds);
641 if (retractDownloadJob != null) {
642 retractionJobs.add(retractDownloadJob);
643 }
644 } catch (DistributionException e) {
645 throw new PublicationException(
646 format("Unable to create retraction job from distribution channel streaming for the media package %s ",
647 mpId),
648 e);
649 }
650 if (retractionJobs.size() > 0) {
651
652 if (!waitForJobs(job, serviceRegistry, retractionJobs).isSuccess()) {
653 throw new PublicationException(
654 format("Unable to retract elements of media package %s from distribution channels.", mpId));
655 }
656 }
657 }
658 }
659
660 String publicationChannel = getPublicationChannelName(repository);
661 for (Publication p : mediaPackage.getPublications()) {
662 if (StringUtils.equals(publicationChannel, p.getChannel())) {
663 return p;
664 }
665 }
666 return null;
667 }
668
669 protected Publication updateMetadata(
670 Job job,
671 MediaPackage mediaPackage,
672 String repository,
673 Set<String> flavors,
674 Set<String> tags,
675 boolean checkAvailability
676 ) throws PublicationException {
677 final Set<MediaPackageElementFlavor> parsedFlavors = new HashSet<>();
678 for (String flavor : flavors) {
679 parsedFlavors.add(MediaPackageElementFlavor.parseFlavor(flavor));
680 }
681
682 final MediaPackage filteredMp;
683 final SearchResult result = oaiPmhDatabase.search(
684 QueryBuilder.queryRepo(repository)
685 .mediaPackageId(mediaPackage)
686 .isDeleted(false)
687 .build()
688 );
689 if (result.size() == 1) {
690
691 try {
692 logger.debug("filter elements with flavors {} and tags {} on media package {}",
693 StringUtils.join(flavors, ", "), StringUtils.join(tags, ", "),
694 MediaPackageParser.getAsXml(mediaPackage));
695
696 filteredMp = filterMediaPackage(mediaPackage, parsedFlavors, tags);
697 } catch (MediaPackageException e) {
698 throw new PublicationException("Error filtering media package", e);
699 }
700 } else if (result.size() == 0) {
701 logger.info("Skipping update of media package {} since it is not currently published to {}",
702 mediaPackage, repository);
703 return null;
704 } else {
705 final String msg = format(
706 "More than one media package with id %s found",
707 mediaPackage.getIdentifier().toString()
708 );
709 logger.warn(msg);
710 throw new PublicationException(msg);
711 }
712
713 Set<String> elementIdsToDistribute = new HashSet<>();
714 for (MediaPackageElement mpe : filteredMp.getElements()) {
715
716 if (MediaPackageElement.Type.Publication == mpe.getElementType()) {
717 continue;
718 }
719 elementIdsToDistribute.add(mpe.getIdentifier());
720 }
721 if (elementIdsToDistribute.isEmpty()) {
722 logger.debug("The media package {} does not contain any elements to update. "
723 + "Skip OAI-PMH metadata update operation for repository {}",
724 mediaPackage.getIdentifier().toString(), repository);
725 return null;
726 }
727 logger.debug("distribute elements {}", StringUtils.join(elementIdsToDistribute, ", "));
728 final List<MediaPackageElement> distributedElements = new ArrayList<>();
729 try {
730 Job distJob = downloadDistributionService
731 .distribute(getPublicationChannelName(repository), filteredMp, elementIdsToDistribute, checkAvailability);
732 if (job == null) {
733 throw new PublicationException("The distribution service can not handle this type of media package elements.");
734 }
735 if (!waitForJobs(job, serviceRegistry, distJob).isSuccess()) {
736 throw new PublicationException(format(
737 "Unable to distribute updated elements from media package %s to the download distribution service",
738 mediaPackage.getIdentifier().toString()));
739 }
740 if (distJob.getPayload() != null) {
741 for (MediaPackageElement mpe : MediaPackageElementParser.getArrayFromXml(distJob.getPayload())) {
742 distributedElements.add(mpe);
743 }
744 }
745 } catch (DistributionException | MediaPackageException e) {
746 throw new PublicationException(format(
747 "Unable to distribute updated elements from media package %s to the download distribution service",
748 mediaPackage.getIdentifier().toString()), e);
749 }
750
751
752 for (MediaPackageElement e : filteredMp.getElements()) {
753 if (MediaPackageElement.Type.Publication.equals(e.getElementType())) {
754 continue;
755 }
756 filteredMp.remove(e);
757 }
758 for (MediaPackageElement e : distributedElements) {
759 filteredMp.add(e);
760 }
761 MediaPackage publishedMp = merge(filteredMp, removeMatchingNonExistantElements(filteredMp,
762 (MediaPackage) result.getItems().get(0).getMediaPackage().clone(), parsedFlavors, tags));
763
764 try {
765 logger.debug("Updating metadata of media package {} in {}",
766 publishedMp.getIdentifier().toString(), repository);
767 oaiPmhDatabase.store(publishedMp, repository);
768 } catch (OaiPmhDatabaseException e) {
769 throw new PublicationException(format("Media package %s could not be updated",
770 publishedMp.getIdentifier().toString()));
771 }
772
773
774
775 Map<URI, MediaPackageElement> elementUriMap = new Hashtable<>();
776 for (SearchResultItem oaiPmhSearchResultItem : result.getItems()) {
777 for (MediaPackageElement mpe : oaiPmhSearchResultItem.getMediaPackage().getElements()) {
778 if (MediaPackageElement.Type.Publication == mpe.getElementType() || null == mpe.getURI()) {
779 continue;
780 }
781 elementUriMap.put(mpe.getURI(), mpe);
782 }
783 }
784 for (MediaPackageElement publishedMpe : publishedMp.getElements()) {
785 if (MediaPackageElement.Type.Publication == publishedMpe.getElementType()) {
786 continue;
787 }
788 if (elementUriMap.containsKey(publishedMpe.getURI())) {
789 elementUriMap.remove(publishedMpe.getURI());
790 }
791 }
792 Set<String> orphanedElementIds = new HashSet<>();
793 for (MediaPackageElement orphanedMpe : elementUriMap.values()) {
794 orphanedElementIds.add(orphanedMpe.getIdentifier());
795 }
796 if (!orphanedElementIds.isEmpty()) {
797 for (SearchResultItem oaiPmhSearchResultItem : result.getItems()) {
798 try {
799 Job retractJob = downloadDistributionService.retract(getPublicationChannelName(repository),
800 oaiPmhSearchResultItem.getMediaPackage(), orphanedElementIds);
801 if (retractJob != null) {
802 if (!waitForJobs(job, serviceRegistry, retractJob).isSuccess()) {
803 logger.warn(
804 "The download distribution retract job for the orphaned elements from media package {} "
805 + "does not end successfully",
806 oaiPmhSearchResultItem.getMediaPackage().getIdentifier().toString());
807 }
808 }
809 } catch (DistributionException e) {
810 logger.warn(
811 "Unable to retract orphaned elements from download distribution service for the "
812 + "media package {} channel {}",
813 oaiPmhSearchResultItem.getMediaPackage().getIdentifier().toString(),
814 getPublicationChannelName(repository),
815 e);
816 }
817 }
818 }
819
820
821 String publicationChannel = getPublicationChannelName(repository);
822 for (Publication p : mediaPackage.getPublications()) {
823 if (StringUtils.equals(publicationChannel, p.getChannel())) {
824 return p;
825 }
826 }
827 return null;
828 }
829
830 protected void checkInputArguments(MediaPackage mediaPackage, String repository) {
831 if (mediaPackage == null) {
832 throw new IllegalArgumentException("Media package must be specified");
833 }
834 if (StringUtils.isEmpty(repository)) {
835 throw new IllegalArgumentException("Repository must be specified");
836 }
837 if (!oaiPmhServerInfo.hasRepo(repository)) {
838 throw new IllegalArgumentException("OAI-PMH repository '" + repository + "' does not exist");
839 }
840 }
841
842 protected String getPublicationChannelName(String repository) {
843 return PUBLICATION_CHANNEL_PREFIX.concat(repository);
844 }
845
846
847 protected Publication createPublicationElement(String mpId, String repository) throws PublicationException {
848 for (String hostUrl : OaiPmhServerInfoUtil.oaiPmhServerUrlOfCurrentOrganization(securityService)) {
849 final URI engageUri = URIUtils.resolve(
850 URI.create(UrlSupport.concat(hostUrl, oaiPmhServerInfo.getMountPoint(), repository)),
851 "?verb=ListMetadataFormats&identifier=" + mpId);
852 return PublicationImpl.publication(UUID.randomUUID().toString(), getPublicationChannelName(repository), engageUri,
853 MimeTypes.parseMimeType(MimeTypes.XML.toString()));
854 }
855
856 final String msg = format("No host url for oai-pmh server configured for organization %s " + "("
857 + OaiPmhServerInfoUtil.ORG_CFG_OAIPMH_SERVER_HOSTURL + ")", securityService.getOrganization().getId());
858 throw new PublicationException(msg);
859 }
860
861 private static MediaPackage updateMediaPackageFields(MediaPackage oaiPmhMp, MediaPackage mediaPackage) {
862 oaiPmhMp.setTitle(mediaPackage.getTitle());
863 oaiPmhMp.setDate(mediaPackage.getDate());
864 oaiPmhMp.setLanguage(mediaPackage.getLanguage());
865 oaiPmhMp.setLicense(mediaPackage.getLicense());
866 oaiPmhMp.setSeries(mediaPackage.getSeries());
867 oaiPmhMp.setSeriesTitle(mediaPackage.getSeriesTitle());
868 for (String contributor : oaiPmhMp.getContributors()) {
869 oaiPmhMp.removeContributor(contributor);
870 }
871 for (String contributor : mediaPackage.getContributors()) {
872 oaiPmhMp.addContributor(contributor);
873 }
874 for (String creator : oaiPmhMp.getCreators()) {
875 oaiPmhMp.removeCreator(creator);
876 }
877 for (String creator : mediaPackage.getCreators()) {
878 oaiPmhMp.addCreator(creator);
879 }
880 for (String subject : oaiPmhMp.getSubjects()) {
881 oaiPmhMp.removeSubject(subject);
882 }
883 for (String subject : mediaPackage.getSubjects()) {
884 oaiPmhMp.addSubject(subject);
885 }
886 return oaiPmhMp;
887 }
888
889
890
891
892
893
894
895
896
897
898
899
900
901 private MediaPackage filterMediaPackage(
902 MediaPackage mediaPackage,
903 Set<MediaPackageElementFlavor> flavors,
904 Set<String> tags
905 ) throws MediaPackageException {
906 if (flavors.isEmpty() && tags.isEmpty()) {
907 throw new IllegalArgumentException("Flavors or tags parameter must be set");
908 }
909
910 MediaPackage filteredMediaPackage = (MediaPackage) mediaPackage.clone();
911
912
913 List<MediaPackageElement> keep = new ArrayList<>();
914
915 SimpleElementSelector selector = new SimpleElementSelector();
916
917 for (MediaPackageElementFlavor flavor : flavors) {
918 selector.addFlavor(flavor);
919 }
920 for (String tag : tags) {
921 selector.addTag(tag);
922 }
923 keep.addAll(selector.select(mediaPackage, true));
924
925
926 for (Publication p : filteredMediaPackage.getPublications()) {
927 keep.add(p);
928 }
929
930
931 for (MediaPackageElement element : filteredMediaPackage.getElements()) {
932
933 if (!keep.contains(element)) {
934 logger.debug("Removing {} '{}' from media package '{}'", element.getElementType().toString().toLowerCase(),
935 element.getIdentifier(), filteredMediaPackage.getIdentifier().toString());
936 filteredMediaPackage.remove(element);
937 continue;
938 }
939
940
941 MediaPackageReference reference = element.getReference();
942 if (reference != null) {
943 MediaPackageElement referencedElement = mediaPackage.getElementByReference(reference);
944
945
946 if (referencedElement != null && !keep.contains(referencedElement)) {
947
948
949 MediaPackageElement parent = null;
950 while ((parent = mediaPackage.getElementByReference(reference)) != null) {
951 if (parent.getFlavor() != null && element.getFlavor() == null) {
952 element.setFlavor(parent.getFlavor());
953 }
954 if (parent.getReference() == null) {
955 break;
956 }
957 reference = parent.getReference();
958 }
959
960
961 if (reference != null && reference.getType().equals(MediaPackageReference.TYPE_MEDIAPACKAGE)) {
962 element.setReference(reference);
963 } else {
964 element.clearReference();
965 }
966 }
967 }
968 }
969
970 return filteredMediaPackage;
971 }
972
973
974
975
976
977
978
979
980
981
982
983
984 public static MediaPackage removeMatchingNonExistantElements(MediaPackage updatedMp, MediaPackage publishedMp,
985 Set<MediaPackageElementFlavor> flavors, Set<String> tags) {
986 SimpleElementSelector selector = new SimpleElementSelector();
987
988 for (MediaPackageElementFlavor flavor : flavors) {
989 selector.addFlavor(flavor);
990 }
991 for (String tag : tags) {
992 selector.addTag(tag);
993 }
994 for (MediaPackageElement publishedMpe : selector.select(publishedMp, true)) {
995 boolean foundInUpdatedMp = false;
996 for (MediaPackageElement updatedMpe : updatedMp.getElementsByFlavor(publishedMpe.getFlavor())) {
997 if (!updatedMpe.containsTag(tags)) {
998
999 }
1000 foundInUpdatedMp = true;
1001 break;
1002 }
1003
1004 if (!foundInUpdatedMp) {
1005 publishedMp.remove(publishedMpe);
1006 }
1007 }
1008 return publishedMp;
1009 }
1010
1011
1012
1013
1014
1015
1016
1017
1018
1019
1020
1021
1022
1023
1024 public static MediaPackage merge(MediaPackage updatedMp, MediaPackage publishedMp) {
1025 if (publishedMp == null) {
1026 return updatedMp;
1027 }
1028
1029 final MediaPackage mergedMp = MediaPackageSupport.copy(publishedMp);
1030
1031
1032 for (final MediaPackageElement updatedElement : updatedMp.elements()) {
1033 for (final MediaPackageElementFlavor flavor : Opt.nul(updatedElement.getFlavor())) {
1034 for (final MediaPackageElement outdated : mergedMp.getElementsByFlavor(flavor)) {
1035 mergedMp.remove(outdated);
1036 }
1037 logger.debug("Update {} {} of type {}", updatedElement.getElementType().toString().toLowerCase(),
1038 updatedElement.getIdentifier(), updatedElement.getElementType());
1039 mergedMp.add(updatedElement);
1040 }
1041 }
1042
1043
1044 for (final Publication p : mergedMp.getPublications()) {
1045 mergedMp.remove(p);
1046 }
1047
1048
1049 for (final Publication updatedPublication : updatedMp.getPublications()) {
1050 mergedMp.add(updatedPublication);
1051 }
1052
1053
1054 updateMediaPackageFields(mergedMp, updatedMp);
1055 return mergedMp;
1056 }
1057
1058 @Override
1059 protected ServiceRegistry getServiceRegistry() {
1060 return serviceRegistry;
1061 }
1062
1063
1064
1065
1066
1067
1068 @Override
1069 protected SecurityService getSecurityService() {
1070 return securityService;
1071 }
1072
1073
1074
1075
1076
1077
1078 @Override
1079 protected UserDirectoryService getUserDirectoryService() {
1080 return userDirectoryService;
1081 }
1082
1083
1084
1085
1086
1087
1088 @Override
1089 protected OrganizationDirectoryService getOrganizationDirectoryService() {
1090 return organizationDirectoryService;
1091 }
1092
1093
1094 @Reference
1095 public void setServiceRegistry(ServiceRegistry serviceRegistry) {
1096 this.serviceRegistry = serviceRegistry;
1097 }
1098
1099
1100 @Reference
1101 public void setSecurityService(SecurityService securityService) {
1102 this.securityService = securityService;
1103 }
1104
1105
1106 @Reference
1107 public void setUserDirectoryService(UserDirectoryService userDirectoryService) {
1108 this.userDirectoryService = userDirectoryService;
1109 }
1110
1111
1112 @Reference
1113 public void setOrganizationDirectoryService(OrganizationDirectoryService organizationDirectoryService) {
1114 this.organizationDirectoryService = organizationDirectoryService;
1115 }
1116
1117
1118 @Reference(target = "(distribution.channel=download)")
1119 public void setDownloadDistributionService(DownloadDistributionService downloadDistributionService) {
1120 this.downloadDistributionService = downloadDistributionService;
1121 }
1122
1123
1124 @Reference
1125 public void setStreamingDistributionService(StreamingDistributionService streamingDistributionService) {
1126 this.streamingDistributionService = streamingDistributionService;
1127 }
1128
1129
1130 @Reference
1131 public void setOaiPmhServerInfo(OaiPmhServerInfo oaiPmhServerInfo) {
1132 this.oaiPmhServerInfo = oaiPmhServerInfo;
1133 }
1134
1135
1136 @Reference
1137 public void setOaiPmhDatabase(OaiPmhDatabase oaiPmhDatabase) {
1138 this.oaiPmhDatabase = oaiPmhDatabase;
1139 }
1140 }