1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21 package org.opencastproject.workflow.handler.distribution;
22
23 import org.opencastproject.distribution.api.DistributionException;
24 import org.opencastproject.distribution.api.DownloadDistributionService;
25 import org.opencastproject.distribution.api.StreamingDistributionService;
26 import org.opencastproject.job.api.Job;
27 import org.opencastproject.mediapackage.Attachment;
28 import org.opencastproject.mediapackage.Catalog;
29 import org.opencastproject.mediapackage.MediaPackage;
30 import org.opencastproject.mediapackage.MediaPackageElement;
31 import org.opencastproject.mediapackage.Publication;
32 import org.opencastproject.mediapackage.Track;
33 import org.opencastproject.workflow.api.AbstractWorkflowOperationHandler;
34 import org.opencastproject.workflow.api.WorkflowOperationException;
35
36 import org.slf4j.Logger;
37 import org.slf4j.LoggerFactory;
38
39 import java.util.ArrayList;
40 import java.util.Arrays;
41 import java.util.HashSet;
42 import java.util.List;
43 import java.util.Set;
44 import java.util.stream.Collectors;
45
46
47
48
49
50
51
52
53 public abstract class ConfigurableWorkflowOperationHandlerBase extends AbstractWorkflowOperationHandler {
54
55 private static final Logger logger = LoggerFactory.getLogger(ConfigurableWorkflowOperationHandlerBase.class);
56
57 abstract DownloadDistributionService getDownloadDistributionService();
58 abstract StreamingDistributionService getStreamingDistributionService();
59
60
61
62
63
64
65
66
67
68
69 private void addPublicationElementsToMediaPackage(Publication publication, MediaPackage mp) {
70 assert ((publication != null) && (mp != null));
71 for (Attachment attachment : publication.getAttachments()) {
72 mp.add(attachment);
73 }
74 for (Catalog catalog : publication.getCatalogs()) {
75 mp.add(catalog);
76 }
77 for (Track track : publication.getTracks()) {
78 mp.add(track);
79 }
80 }
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95 private int retractPublicationElements(
96 String channelId,
97 Publication publication,
98 MediaPackage mp,
99 boolean retractStreaming
100 ) throws WorkflowOperationException {
101 assert ((channelId != null) && (publication != null) && (mp != null));
102 MediaPackage mediapackageWithPublicationElements = (MediaPackage) mp.clone();
103
104
105 addPublicationElementsToMediaPackage(publication, mediapackageWithPublicationElements);
106
107 Set<String> elementIds = new HashSet<>();
108
109 for (Attachment attachment : publication.getAttachments()) {
110 elementIds.add(attachment.getIdentifier());
111 }
112 for (Catalog catalog : publication.getCatalogs()) {
113 elementIds.add(catalog.getIdentifier());
114 }
115 for (Track track : publication.getTracks()) {
116 elementIds.add(track.getIdentifier());
117 }
118
119 List<Job> jobs = new ArrayList<>();
120 if (elementIds.size() > 0) {
121 logger.info("Retracting {} elements of media package {} from publication channel {}", elementIds.size(), mp,
122 channelId);
123 try {
124 Job retractDownloadDistributionJob = getDownloadDistributionService()
125 .retract(channelId, mediapackageWithPublicationElements, elementIds);
126 if (retractDownloadDistributionJob != null) {
127 jobs.add(retractDownloadDistributionJob);
128 }
129 } catch (DistributionException e) {
130 logger.error("Error while retracting '{}' elements of media package {} from channel '{}' of distribution '{}'",
131 elementIds.size(), mp, channelId, getDownloadDistributionService(), e);
132 throw new WorkflowOperationException(e);
133 }
134
135 if (retractStreaming) {
136 try {
137 Job retractStreamingJob = getStreamingDistributionService()
138 .retract(channelId, mediapackageWithPublicationElements, elementIds);
139 if (retractStreamingJob != null) {
140 jobs.add(retractStreamingJob);
141 }
142 } catch (DistributionException e) {
143 logger.error(
144 "Error while retracting '{}' elements of media package {} from channel '{}' of distribution '{}'",
145 elementIds.size(), mp, channelId, getStreamingDistributionService(), e);
146 throw new WorkflowOperationException(e);
147 }
148 }
149
150 if (!waitForStatus(jobs.toArray(new Job[jobs.size()])).isSuccess()) {
151 throw new WorkflowOperationException("One of the retraction jobs did not complete successfully");
152 }
153 } else {
154 logger.debug("No publication elements were found for retraction");
155 }
156
157 return elementIds.size();
158 }
159
160 public List<Publication> getPublications(final MediaPackage mp, final String channelId) {
161 assert ((mp != null) && (channelId != null));
162 final List<Publication> publications = Arrays.stream(mp.getPublications())
163 .filter(a -> channelId.equals(a.getChannel()))
164 .collect(Collectors.toList());
165 assert (publications != null);
166 return publications;
167 }
168
169 public void retract(MediaPackage mp, final String channelId, boolean retractStreaming)
170 throws WorkflowOperationException {
171 assert ((mp != null) && (channelId != null));
172
173 final List<Publication> publications = getPublications(mp, channelId);
174
175 if (publications.size() > 0) {
176 int retractedElementsCount = 0;
177 for (Publication publication : publications) {
178 retractedElementsCount += retractPublicationElements(channelId, publication, mp, retractStreaming);
179 mp.remove(publication);
180 }
181 logger.info("Successfully retracted {} publications and retracted {} elements from publication channel '{}'",
182 publications.size(), retractedElementsCount, channelId);
183 } else {
184 logger.info("No publications for channel {} found for media package {}", channelId, mp.getIdentifier());
185 }
186 }
187
188 }