/*
 * Licensed to The Apereo Foundation under one or more contributor license
 * agreements. See the NOTICE file distributed with this work for additional
 * information regarding copyright ownership.
 *
 *
 * The Apereo Foundation licenses this file to you under the Educational
 * Community License, Version 2.0 (the "License"); you may not use this file
 * except in compliance with the License. You may obtain a copy of the License
 * at:
 *
 *   http://opensource.org/licenses/ecl2.txt
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the
 * License for the specific language governing permissions and limitations under
 * the License.
 *
 */
21  
22  package org.opencastproject.workflow.handler.distribution;
23  
24  import static org.opencastproject.workflow.handler.distribution.EngagePublicationChannel.CHANNEL_ID;
25  
26  import org.opencastproject.distribution.api.DistributionException;
27  import org.opencastproject.distribution.api.DownloadDistributionService;
28  import org.opencastproject.distribution.api.StreamingDistributionService;
29  import org.opencastproject.job.api.Job;
30  import org.opencastproject.job.api.JobContext;
31  import org.opencastproject.mediapackage.MediaPackage;
32  import org.opencastproject.mediapackage.MediaPackageElement;
33  import org.opencastproject.mediapackage.Publication;
34  import org.opencastproject.search.api.SearchService;
35  import org.opencastproject.serviceregistry.api.ServiceRegistry;
36  import org.opencastproject.util.NotFoundException;
37  import org.opencastproject.workflow.api.AbstractWorkflowOperationHandler;
38  import org.opencastproject.workflow.api.WorkflowInstance;
39  import org.opencastproject.workflow.api.WorkflowOperationException;
40  import org.opencastproject.workflow.api.WorkflowOperationHandler;
41  import org.opencastproject.workflow.api.WorkflowOperationResult;
42  import org.opencastproject.workflow.api.WorkflowOperationResult.Action;
43  
44  import org.osgi.service.component.ComponentContext;
45  import org.osgi.service.component.annotations.Activate;
46  import org.osgi.service.component.annotations.Component;
47  import org.osgi.service.component.annotations.Reference;
48  import org.slf4j.Logger;
49  import org.slf4j.LoggerFactory;
50  
51  import java.util.ArrayList;
52  import java.util.Arrays;
53  import java.util.Collections;
54  import java.util.List;
55  import java.util.Set;
56  import java.util.stream.Collectors;
57  
58  /**
59   * Workflow operation for retracting a media package from the engage player.
60   */
61  @Component(
62      immediate = true,
63      service = WorkflowOperationHandler.class,
64      property = {
65          "service.description=Engage Retraction Workflow Operation Handler",
66          "workflow.operation=retract-engage"
67      }
68  )
69  public class RetractEngageWorkflowOperationHandler extends AbstractWorkflowOperationHandler {
70  
71    /** The logging facility */
72    private static final Logger logger = LoggerFactory.getLogger(RetractEngageWorkflowOperationHandler.class);
73  
74    /** The streaming distribution service */
75    protected StreamingDistributionService streamingDistributionService = null;
76  
77    /** The download distribution service */
78    protected DownloadDistributionService downloadDistributionService = null;
79  
80    /** The search service */
81    protected SearchService searchService = null;
82  
83    /**
84     * Callback for the OSGi declarative services configuration.
85     *
86     * @param streamingDistributionService
87     *          the streaming distribution service
88     */
89    @Reference(target = "(distribution.channel=streaming)")
90    protected void setStreamingDistributionService(StreamingDistributionService streamingDistributionService) {
91      this.streamingDistributionService = streamingDistributionService;
92    }
93  
94    /**
95     * Callback for the OSGi declarative services configuration.
96     *
97     * @param downloadDistributionService
98     *          the download distribution service
99     */
100 
101   @Reference(target = "(distribution.channel=download)")
102   protected void setDownloadDistributionService(DownloadDistributionService downloadDistributionService) {
103     this.downloadDistributionService = downloadDistributionService;
104   }
105 
106   /**
107    * Callback for declarative services configuration that will introduce us to the search service. Implementation
108    * assumes that the reference is configured as being static.
109    *
110    * @param searchService
111    *          an instance of the search service
112    */
113 
114   @Reference
115   protected void setSearchService(SearchService searchService) {
116     this.searchService = searchService;
117   }
118 
119   @Reference
120   @Override  public void setServiceRegistry(ServiceRegistry serviceRegistry) {
121     super.setServiceRegistry(serviceRegistry);
122   }
123 
124   @Override
125   @Activate
126   protected void activate(ComponentContext cc) {
127     super.activate(cc);
128   }
129 
130   /**
131    * Generate the jobs retracted the selected elements
132    * @param retractElementIds The list of element ids to retract
133    * @param searchMediaPackage The mediapackage from the search service
134    * @return
135    * @throws DistributionException
136    */
137   protected List<Job> retractElements(Set<String> retractElementIds, MediaPackage searchMediaPackage) throws
138           DistributionException {
139     if (retractElementIds.isEmpty()) {
140       return Collections.emptyList();
141     }
142 
143     List<Job> jobs = new ArrayList<>();
144 
145     jobs.add(downloadDistributionService.retract(CHANNEL_ID, searchMediaPackage, retractElementIds));
146 
147     if (streamingDistributionService != null && streamingDistributionService.publishToStreaming()) {
148       jobs.add(streamingDistributionService.retract(CHANNEL_ID, searchMediaPackage, retractElementIds));
149     }
150 
151     return jobs;
152   }
153 
154   /**
155    * {@inheritDoc}
156    *
157    * @see org.opencastproject.workflow.api.WorkflowOperationHandler#start(WorkflowInstance, JobContext)
158    */
159   @Override
160   public WorkflowOperationResult start(WorkflowInstance workflowInstance, JobContext context)
161           throws WorkflowOperationException {
162     MediaPackage mediaPackage = workflowInstance.getMediaPackage();
163     List<Job> jobs;
164     try {
165       MediaPackage searchMediaPackage = null;
166       try {
167         searchMediaPackage = searchService.get(mediaPackage.getIdentifier().toString());
168       } catch (NotFoundException e) {
169         logger.info("The search service doesn't know media package {}", mediaPackage);
170         return createResult(mediaPackage, Action.SKIP);
171       }
172       logger.info("Retracting media package {} from download/streaming distribution channel", searchMediaPackage);
173       var retractElementIds = Arrays.stream(searchMediaPackage.getElements())
174           .map(MediaPackageElement::getIdentifier)
175           .collect(Collectors.toSet());
176       jobs = retractElements(retractElementIds, searchMediaPackage);
177 
178       // Wait for retraction to finish
179       if (!waitForStatus(jobs.toArray(new Job[0])).isSuccess()) {
180         throw new WorkflowOperationException("One of the download/streaming retract job did not complete successfully");
181       }
182 
183       logger.debug("Retraction operation complete");
184 
185       logger.info("Removing media package {} from the search index", mediaPackage);
186       Job deleteFromSearch = searchService.delete(mediaPackage.getIdentifier().toString());
187       if (!waitForStatus(deleteFromSearch).isSuccess()) {
188         throw new WorkflowOperationException("Removing media package from search did not complete successfully");
189       }
190 
191       logger.debug("Remove from search operation complete");
192 
193       // Remove publication element
194       logger.info("Removing engage publication element from media package {}", mediaPackage);
195       Publication[] publications = mediaPackage.getPublications();
196       for (Publication publication : publications) {
197         if (CHANNEL_ID.equals(publication.getChannel())) {
198           mediaPackage.remove(publication);
199           logger.debug("Remove engage publication element '{}' complete", publication);
200         }
201       }
202 
203       return createResult(mediaPackage, Action.CONTINUE);
204     } catch (Throwable t) {
205       throw new WorkflowOperationException(t);
206     }
207   }
208 
209 }