View Javadoc
1   /*
2    * Licensed to The Apereo Foundation under one or more contributor license
3    * agreements. See the NOTICE file distributed with this work for additional
4    * information regarding copyright ownership.
5    *
6    *
7    * The Apereo Foundation licenses this file to you under the Educational
8    * Community License, Version 2.0 (the "License"); you may not use this file
9    * except in compliance with the License. You may obtain a copy of the License
10   * at:
11   *
12   *   http://opensource.org/licenses/ecl2.txt
13   *
14   * Unless required by applicable law or agreed to in writing, software
15   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
16   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the
17   * License for the specific language governing permissions and limitations under
18   * the License.
19   *
20   */
21  package org.opencastproject.workflow.handler.distribution;
22  
23  import org.opencastproject.distribution.api.DistributionException;
24  import org.opencastproject.distribution.api.DownloadDistributionService;
25  import org.opencastproject.distribution.api.StreamingDistributionService;
26  import org.opencastproject.job.api.Job;
27  import org.opencastproject.mediapackage.Attachment;
28  import org.opencastproject.mediapackage.Catalog;
29  import org.opencastproject.mediapackage.MediaPackage;
30  import org.opencastproject.mediapackage.MediaPackageElement;
31  import org.opencastproject.mediapackage.Publication;
32  import org.opencastproject.mediapackage.Track;
33  import org.opencastproject.workflow.api.AbstractWorkflowOperationHandler;
34  import org.opencastproject.workflow.api.WorkflowOperationException;
35  
36  import org.slf4j.Logger;
37  import org.slf4j.LoggerFactory;
38  
39  import java.util.ArrayList;
40  import java.util.Arrays;
41  import java.util.HashSet;
42  import java.util.List;
43  import java.util.Set;
44  import java.util.stream.Collectors;
45  
46  /**
47   * Abstract base class of ConfigurablePublishWorkflerOperationHandler and ConfigurableRectractWorkflowOperationHandler.
48   *
49   * Both the ConfigurablePublishWorkflowOperationHandler and ConfigurableRetractWorkflowOperationHanlder are capable of
50   * retracting publications created by the ConfigurablePublishWorkflowOperationHandler.
51   * To avoid code duplication, this commonly used functionaly has been factored out into this class.
52   */
53  public abstract class ConfigurableWorkflowOperationHandlerBase extends AbstractWorkflowOperationHandler {
54  
55    private static final Logger logger = LoggerFactory.getLogger(ConfigurableWorkflowOperationHandlerBase.class);
56  
57    abstract DownloadDistributionService getDownloadDistributionService();
58    abstract StreamingDistributionService getStreamingDistributionService();
59  
60    /**
61     * Adds all of the {@link Publication}'s {@link MediaPackageElement}s that would normally have not been in the
62     * {@link MediaPackage}.
63     *
64     * @param publication
65     *          The {@link Publication} with the {@link MediaPackageElement}s to add.
66     * @param mp
67     *          The {@link MediaPackage} to add the {@link MediaPackageElement}s to.
68     */
69    private void addPublicationElementsToMediaPackage(Publication publication, MediaPackage mp) {
70      assert ((publication != null) && (mp != null));
71      for (Attachment attachment : publication.getAttachments()) {
72        mp.add(attachment);
73      }
74      for (Catalog catalog : publication.getCatalogs()) {
75        mp.add(catalog);
76      }
77      for (Track track : publication.getTracks()) {
78        mp.add(track);
79      }
80    }
81  
82    /**
83     * Remove the {@link Publication}'s {@link MediaPackageElement}s from a given channel.
84     *
85     * @param channelId
86     *          The channel to remove the {@link MediaPackageElement}s from.
87     * @param publication
88     *          The {@link Publication} that is being removed.
89     * @param mp
90     *          The {@link MediaPackage} that the {@link Publication} is part of.
91     * @return the number of {@link MediaPackageElement}s that have been retracted
92     * @throws WorkflowOperationException
93     *           Thrown if unable to retract the {@link MediaPackageElement}s.
94     */
95    private int retractPublicationElements(
96        String channelId,
97        Publication publication,
98        MediaPackage mp,
99        boolean retractStreaming
100   ) throws WorkflowOperationException {
101     assert ((channelId != null) && (publication != null) && (mp != null));
102     MediaPackage mediapackageWithPublicationElements = (MediaPackage) mp.clone();
103 
104     // Add the publications to the mediapackage so that we can use the standard retract
105     addPublicationElementsToMediaPackage(publication, mediapackageWithPublicationElements);
106 
107     Set<String> elementIds = new HashSet<>();
108 
109     for (Attachment attachment : publication.getAttachments()) {
110       elementIds.add(attachment.getIdentifier());
111     }
112     for (Catalog catalog : publication.getCatalogs()) {
113       elementIds.add(catalog.getIdentifier());
114     }
115     for (Track track : publication.getTracks()) {
116       elementIds.add(track.getIdentifier());
117     }
118 
119     List<Job> jobs = new ArrayList<>();
120     if (elementIds.size() > 0) {
121       logger.info("Retracting {} elements of media package {} from publication channel {}", elementIds.size(), mp,
122               channelId);
123       try {
124         Job retractDownloadDistributionJob = getDownloadDistributionService()
125             .retract(channelId, mediapackageWithPublicationElements, elementIds);
126         if (retractDownloadDistributionJob != null) {
127           jobs.add(retractDownloadDistributionJob);
128         }
129       } catch (DistributionException e) {
130         logger.error("Error while retracting '{}' elements of media package {} from channel '{}' of distribution '{}'",
131                 elementIds.size(), mp, channelId, getDownloadDistributionService(), e);
132         throw new WorkflowOperationException(e);
133       }
134 
135       if (retractStreaming) {
136         try {
137           Job retractStreamingJob = getStreamingDistributionService()
138               .retract(channelId, mediapackageWithPublicationElements, elementIds);
139           if (retractStreamingJob != null) {
140             jobs.add(retractStreamingJob);
141           }
142         } catch (DistributionException e) {
143           logger.error(
144                   "Error while retracting '{}' elements of media package {} from channel '{}' of distribution '{}'",
145                   elementIds.size(), mp, channelId, getStreamingDistributionService(), e);
146           throw new WorkflowOperationException(e);
147         }
148       }
149 
150       if (!waitForStatus(jobs.toArray(new Job[jobs.size()])).isSuccess()) {
151         throw new WorkflowOperationException("One of the retraction jobs did not complete successfully");
152       }
153     } else {
154       logger.debug("No publication elements were found for retraction");
155     }
156 
157     return elementIds.size();
158   }
159 
160   public List<Publication> getPublications(final MediaPackage mp, final String channelId) {
161     assert ((mp != null) && (channelId != null));
162     final List<Publication> publications = Arrays.stream(mp.getPublications())
163         .filter(a -> channelId.equals(a.getChannel()))
164         .collect(Collectors.toList());
165     assert (publications != null);
166     return publications;
167   }
168 
169   public void retract(MediaPackage mp, final String channelId, boolean retractStreaming)
170           throws WorkflowOperationException {
171     assert ((mp != null) && (channelId != null));
172 
173     final List<Publication> publications = getPublications(mp, channelId);
174 
175     if (publications.size() > 0) {
176       int retractedElementsCount = 0;
177       for (Publication publication : publications) {
178         retractedElementsCount += retractPublicationElements(channelId, publication, mp, retractStreaming);
179         mp.remove(publication);
180       }
181       logger.info("Successfully retracted {} publications and retracted {} elements from publication channel '{}'",
182               publications.size(), retractedElementsCount, channelId);
183     } else {
184       logger.info("No publications for channel {} found for media package {}", channelId, mp.getIdentifier());
185     }
186   }
187 
188 }