1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21 package org.opencastproject.publication.configurable;
22
23 import org.opencastproject.distribution.api.DistributionException;
24 import org.opencastproject.distribution.api.DownloadDistributionService;
25 import org.opencastproject.job.api.AbstractJobProducer;
26 import org.opencastproject.job.api.Job;
27 import org.opencastproject.mediapackage.MediaPackage;
28 import org.opencastproject.mediapackage.MediaPackageElement;
29 import org.opencastproject.mediapackage.MediaPackageElementParser;
30 import org.opencastproject.mediapackage.MediaPackageException;
31 import org.opencastproject.mediapackage.MediaPackageParser;
32 import org.opencastproject.mediapackage.Publication;
33 import org.opencastproject.mediapackage.PublicationImpl;
34 import org.opencastproject.publication.api.ConfigurablePublicationService;
35 import org.opencastproject.publication.api.PublicationException;
36 import org.opencastproject.security.api.OrganizationDirectoryService;
37 import org.opencastproject.security.api.SecurityService;
38 import org.opencastproject.security.api.UserDirectoryService;
39 import org.opencastproject.serviceregistry.api.ServiceRegistry;
40 import org.opencastproject.serviceregistry.api.ServiceRegistryException;
41 import org.opencastproject.util.JobUtil;
42
43 import com.google.gson.Gson;
44 import com.google.gson.reflect.TypeToken;
45
46 import org.osgi.service.component.ComponentContext;
47 import org.osgi.service.component.annotations.Component;
48 import org.osgi.service.component.annotations.Reference;
49
50 import java.util.Arrays;
51 import java.util.Collection;
52 import java.util.HashSet;
53 import java.util.List;
54 import java.util.Optional;
55 import java.util.Set;
56 import java.util.UUID;
57
58 @Component(
59 immediate = true,
60 service = ConfigurablePublicationService.class,
61 property = {
62 "service.description=Publication Service (Configurable)"
63 }
64 )
65 public class ConfigurablePublicationServiceImpl extends AbstractJobProducer implements ConfigurablePublicationService {
66
67
68 private Gson gson = new Gson();
69
70 @Override
71 public void activate(ComponentContext cc) {
72 super.activate(cc);
73 }
74
75 public ConfigurablePublicationServiceImpl() {
76 super(JOB_TYPE);
77 }
78
79 @Override
80 public String getJobType() {
81 return super.getJobType();
82 }
83
84 public enum Operation {
85 Replace
86 }
87
88 private DownloadDistributionService distributionService;
89
90 private SecurityService securityService;
91
92 private UserDirectoryService userDirectoryService;
93
94 private OrganizationDirectoryService organizationDirectoryService;
95
96 private ServiceRegistry serviceRegistry;
97
98 @Override
99 public Job replace(final MediaPackage mediaPackage, final String channelId,
100 final Collection<? extends MediaPackageElement> addElements, final Set<String> retractElementIds)
101 throws PublicationException, MediaPackageException {
102 try {
103 return serviceRegistry.createJob(JOB_TYPE, Operation.Replace.toString(),
104 Arrays.asList(MediaPackageParser.getAsXml(mediaPackage), channelId,
105 MediaPackageElementParser.getArrayAsXml(addElements), gson.toJson(retractElementIds)));
106 } catch (final ServiceRegistryException e) {
107 throw new PublicationException("Unable to create job", e);
108 }
109 }
110
111 @Override
112 public Publication replaceSync(
113 MediaPackage mediaPackage, String channelId, Collection<? extends MediaPackageElement> addElements,
114 Set<String> retractElementIds) throws PublicationException, MediaPackageException {
115 try {
116 return doReplaceSync(mediaPackage, channelId, addElements, retractElementIds);
117 } catch (DistributionException e) {
118 throw new PublicationException(e);
119 }
120 }
121
122 @Override
123 protected String process(final Job job) throws Exception {
124 final List<String> arguments = job.getArguments();
125 final MediaPackage mediaPackage = MediaPackageParser.getFromXml(arguments.get(0));
126 final String channelId = arguments.get(1);
127 final Collection<? extends MediaPackageElement> addElements = MediaPackageElementParser
128 .getArrayFromXml(arguments.get(2));
129 Set<String> retractElementIds = gson.fromJson(arguments.get(3), new TypeToken<Set<String>>() { }.getType());
130
131 Publication result = null;
132 switch (Operation.valueOf(job.getOperation())) {
133 case Replace:
134 result = doReplace(mediaPackage, channelId, addElements, retractElementIds);
135 break;
136 default:
137 break;
138 }
139 if (result != null) {
140 return MediaPackageElementParser.getAsXml(result);
141 } else {
142 return null;
143 }
144 }
145
146 private void distributeMany(final MediaPackage mp, final String channelId,
147 final Collection<? extends MediaPackageElement> elements)
148 throws DistributionException, MediaPackageException {
149
150 final Optional<Publication> publicationOpt = getPublication(mp, channelId);
151
152 if (publicationOpt.isPresent()) {
153
154 final Publication publication = publicationOpt.get();
155
156
157 elements.forEach(mp::add);
158
159 Set<String> elementIds = new HashSet<>();
160 for (final MediaPackageElement mpe : elements) {
161 elementIds.add(mpe.getIdentifier());
162 }
163
164 try {
165 Job job = distributionService.distribute(channelId, mp, elementIds, false);
166
167 if (!JobUtil.waitForJob(serviceRegistry, job).isSuccess()) {
168 throw new DistributionException("At least one of the publication jobs did not complete successfully");
169 }
170 List<? extends MediaPackageElement> distributedElements
171 = MediaPackageElementParser.getArrayFromXml(job.getPayload());
172 for (MediaPackageElement mpe : distributedElements) {
173 PublicationImpl.addElementToPublication(publication, mpe);
174 }
175 } finally {
176
177 elements.stream().map(MediaPackageElement::getIdentifier).forEach(mp::removeElementById);
178 }
179 }
180 }
181
182 private void distributeManySync(final MediaPackage mp, final String channelId,
183 final Collection<? extends MediaPackageElement> elements) throws DistributionException {
184
185 final Optional<Publication> publicationOpt = getPublication(mp, channelId);
186
187 if (publicationOpt.isPresent()) {
188
189 final Publication publication = publicationOpt.get();
190
191
192 elements.forEach(mp::add);
193
194 Set<String> elementIds = new HashSet<>();
195 for (final MediaPackageElement mpe : elements) {
196 elementIds.add(mpe.getIdentifier());
197 }
198
199 try {
200 List<? extends MediaPackageElement> distributedElements = distributionService.distributeSync(channelId, mp,
201 elementIds, false);
202 for (MediaPackageElement mpe : distributedElements) {
203 PublicationImpl.addElementToPublication(publication, mpe);
204 }
205 } finally {
206
207 elements.stream().map(MediaPackageElement::getIdentifier).forEach(mp::removeElementById);
208 }
209 }
210 }
211
212 private Publication doReplace(final MediaPackage mp, final String channelId,
213 final Collection<? extends MediaPackageElement> addElementIds, final Set<String> retractElementIds)
214 throws DistributionException, MediaPackageException {
215
216 final Job retractJob = distributionService.retract(channelId, mp, retractElementIds);
217
218 if (!JobUtil.waitForJobs(serviceRegistry, retractJob).isSuccess()) {
219 throw new DistributionException("At least one of the retraction jobs did not complete successfully");
220 }
221
222 final Optional<Publication> priorPublication = getPublication(mp, channelId);
223
224 final Publication publication;
225
226 if (priorPublication.isPresent()) {
227 publication = priorPublication.get();
228 } else {
229 final String publicationUUID = UUID.randomUUID().toString();
230 publication = PublicationImpl.publication(publicationUUID, channelId, null, null);
231 mp.add(publication);
232 }
233
234 retractElementIds.forEach(publication::removeAttachmentById);
235
236 distributeMany(mp, channelId, addElementIds);
237
238 return publication;
239 }
240
241 private Publication doReplaceSync(final MediaPackage mp, final String channelId,
242 final Collection<? extends MediaPackageElement> addElementIds, final Set<String> retractElementIds)
243 throws DistributionException {
244
245 distributionService.retractSync(channelId, mp, retractElementIds);
246
247 final Optional<Publication> priorPublication = getPublication(mp, channelId);
248
249 final Publication publication;
250
251 if (priorPublication.isPresent()) {
252 publication = priorPublication.get();
253 } else {
254 final String publicationUUID = UUID.randomUUID().toString();
255 publication = PublicationImpl.publication(publicationUUID, channelId, null, null);
256 mp.add(publication);
257 }
258
259 retractElementIds.forEach(publication::removeAttachmentById);
260
261 distributeManySync(mp, channelId, addElementIds);
262
263 return publication;
264 }
265
266 private Optional<Publication> getPublication(final MediaPackage mp, final String channelId) {
267 return Arrays.stream(mp.getPublications()).filter(p -> p.getChannel().equalsIgnoreCase(channelId)).findAny();
268 }
269
270 @Override
271 protected ServiceRegistry getServiceRegistry() {
272 return this.serviceRegistry;
273 }
274
275 @Override
276 protected SecurityService getSecurityService() {
277 return this.securityService;
278 }
279
280 @Override
281 protected UserDirectoryService getUserDirectoryService() {
282 return this.userDirectoryService;
283 }
284
285 @Override
286 protected OrganizationDirectoryService getOrganizationDirectoryService() {
287 return this.organizationDirectoryService;
288 }
289
290 @Reference
291 public void setServiceRegistry(ServiceRegistry serviceRegistry) {
292 this.serviceRegistry = serviceRegistry;
293 }
294
295 @Reference
296 public void setSecurityService(SecurityService securityService) {
297 this.serviceRegistry = serviceRegistry;
298 }
299
300 @Reference
301 public void setUserDirectoryService(UserDirectoryService userDirectoryService) {
302 this.userDirectoryService = userDirectoryService;
303 }
304
305 @Reference
306 public void setOrganizationDirectoryService(OrganizationDirectoryService organizationDirectoryService) {
307 this.organizationDirectoryService = organizationDirectoryService;
308 }
309
310 @Reference(target = "(distribution.channel=download)")
311 public void setDownloadDistributionService(DownloadDistributionService downloadDistributionService) {
312 this.distributionService = downloadDistributionService;
313 }
314
315 }