1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21 package org.opencastproject.assetmanager.impl;
22
23 import org.opencastproject.assetmanager.api.AssetManager;
24 import org.opencastproject.assetmanager.api.AssetManagerException;
25 import org.opencastproject.assetmanager.api.Snapshot;
26 import org.opencastproject.assetmanager.api.Version;
27 import org.opencastproject.assetmanager.api.query.ARecord;
28 import org.opencastproject.assetmanager.api.query.RichAResult;
29 import org.opencastproject.assetmanager.api.storage.AssetStore;
30 import org.opencastproject.assetmanager.api.storage.RemoteAssetStore;
31 import org.opencastproject.job.api.AbstractJobProducer;
32 import org.opencastproject.job.api.Job;
33 import org.opencastproject.security.api.OrganizationDirectoryService;
34 import org.opencastproject.security.api.SecurityService;
35 import org.opencastproject.security.api.UserDirectoryService;
36 import org.opencastproject.serviceregistry.api.ServiceRegistry;
37 import org.opencastproject.serviceregistry.api.ServiceRegistryException;
38 import org.opencastproject.util.NotFoundException;
39 import org.opencastproject.util.RequireUtil;
40
41 import com.google.gson.Gson;
42
43 import org.osgi.service.component.ComponentContext;
44 import org.osgi.service.component.annotations.Activate;
45 import org.osgi.service.component.annotations.Component;
46 import org.osgi.service.component.annotations.Reference;
47 import org.slf4j.Logger;
48 import org.slf4j.LoggerFactory;
49
50 import java.util.Date;
51 import java.util.HashMap;
52 import java.util.LinkedList;
53 import java.util.List;
54 import java.util.Map;
55 import java.util.Optional;
56 import java.util.function.Consumer;
57
58 @Component(
59 immediate = true,
60 service = AssetManagerJobProducer.class,
61 property = {
62 "service.description=Opencast Asset Manager Job Producer"
63 }
64 )
65 public class AssetManagerJobProducer extends AbstractJobProducer {
66
67
68 private static final Logger logger = LoggerFactory.getLogger(AssetManagerJobProducer.class);
69
70 public static final String JOB_TYPE = "org.opencastproject.assetmanager";
71 public static final Float JOB_LOAD = 0.1f;
72 public static final Float NONTERMINAL_JOB_LOAD = 0.1f;
73
74 public enum Operation {
75 MoveById, MoveByIdAndVersion, MoveByIdAndDate, MoveByDate, MoveRecords
76 }
77
78 private static final String OK = "OK";
79
80 private AssetManager tsam = null;
81 private ServiceRegistry serviceRegistry = null;
82 private SecurityService securityService = null;
83 private UserDirectoryService userDirectoryService = null;
84 private OrganizationDirectoryService organizationDirectoryService = null;
85
86 public AssetManagerJobProducer() {
87 super(JOB_TYPE);
88 }
89
90
91
92
93
94
95
96 @Override
97 @Activate
98 public void activate(ComponentContext cc) {
99 logger.info("Activating assetmanager job service");
100 super.activate(cc);
101 }
102
103 public boolean datastoreExists(String storeId) {
104 Optional<AssetStore> store = tsam.getAssetStore(storeId);
105 return store.isPresent();
106 }
107
108
109
110
111 private class MoveRecordInfo {
112 private final Gson gson = new Gson();
113 private int success = 0;
114 private int failed = 0;
115 private String currentMpId = "";
116 public void addSuccess() {
117 success++;
118 };
119 public void addFailed() {
120 failed ++;
121 }
122
123 public boolean isNewMpId(String mpId) {
124 if (currentMpId.equals(mpId)) {
125 return false;
126 }
127 currentMpId = mpId;
128 return true;
129 }
130
131 @Override
132 public String toString() {
133 Map<String,Integer> result = new HashMap<>();
134 if (success > 0) {
135 result.put("OK", success);
136 }
137 if (failed > 0) {
138 result.put("FAIL", failed);
139 }
140 return gson.toJson(result);
141 }
142 };
143
144 @Override
145 protected String process(Job job) throws ServiceRegistryException {
146 Operation op = null;
147 String operation = job.getOperation();
148 List<String> arguments = job.getArguments();
149 String id;
150 String targetStore = arguments.get(0);
151 VersionImpl version;
152 Date start;
153 Date end;
154 try {
155 op = Operation.valueOf(operation);
156 switch (op) {
157 case MoveById:
158 id = arguments.get(1);
159 return internalMoveById(id, targetStore);
160 case MoveByIdAndVersion:
161 id = arguments.get(1);
162 version = VersionImpl.mk(Long.parseLong(arguments.get(2)));
163 return internalMoveByIdAndVersion(version, id, targetStore);
164 case MoveByDate:
165 start = new Date(Long.parseLong(arguments.get(1)));
166 end = new Date(Long.parseLong(arguments.get(2)));
167 return internalMoveByDate(start, end, targetStore);
168 case MoveByIdAndDate:
169 id = arguments.get(1);
170 start = new Date(Long.parseLong(arguments.get(2)));
171 end = new Date(Long.parseLong(arguments.get(3)));
172 return internalMoveByIdAndDate(id, start, end, targetStore);
173 default:
174 throw new IllegalArgumentException("Unknown operation '" + operation + "'");
175 }
176 } catch (NotFoundException e) {
177 throw new ServiceRegistryException("Error running job", e);
178 } catch (Exception e) {
179 throw new ServiceRegistryException("Error handling operation '" + op + "'", e);
180 }
181 }
182
183
184
185
186
187
188
189
190
191
192
193
194 public Job moveByIdAndVersion(final Version version, final String mpId, final String targetStorage) {
195 RequireUtil.notNull(version, "version");
196 RequireUtil.notEmpty(mpId, "mpId");
197 RequireUtil.notEmpty(targetStorage, "targetStorage");
198 List<String> args = new LinkedList<>();
199 args.add(targetStorage);
200 args.add(mpId);
201 args.add(version.toString());
202
203 try {
204 return serviceRegistry.createJob(JOB_TYPE, Operation.MoveByIdAndVersion.toString(), args, null, true, JOB_LOAD);
205 } catch (ServiceRegistryException e) {
206 throw new AssetManagerException("Unable to create a job", e);
207 }
208 }
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223 protected String internalMoveByIdAndVersion(
224 final Version version,
225 final String mpId,
226 final String targetStorage
227 ) throws NotFoundException {
228 tsam.moveSnapshotToStore(version, mpId, targetStorage);
229 return OK;
230 }
231
232
233
234
235
236
237
238
239
240
241
242 public Job moveById(final String mpId, final String targetStorage) {
243 RequireUtil.notEmpty(mpId, "mpId");
244 RequireUtil.notEmpty(targetStorage, "targetStorage");
245 List<String> args = new LinkedList<>();
246 args.add(targetStorage);
247 args.add(mpId);
248
249 try {
250 return serviceRegistry.createJob(JOB_TYPE, Operation.MoveById.toString(), args, null, true, NONTERMINAL_JOB_LOAD);
251 } catch (ServiceRegistryException e) {
252 throw new AssetManagerException("Unable to create a job", e);
253 }
254 }
255
256
257
258
259
260
261
262
263
264
265
266
267 protected String internalMoveById(final String mpId, final String targetStorage) {
268 RichAResult results = tsam.getSnapshotsByIdOrderedByVersion(mpId, true);
269 MoveRecordInfo result = moveSnapshots(results, targetStorage);
270 return result.toString();
271 }
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287 public Job moveByDate(final Date start, final Date end, final String targetStorage) {
288 RequireUtil.notNull(start, "start");
289 RequireUtil.notNull(end, "end");
290 RequireUtil.notNull(targetStorage, "targetStorage");
291 List<String> args = new LinkedList<>();
292 args.add(targetStorage);
293 args.add(Long.toString(start.getTime()));
294 args.add(Long.toString(end.getTime()));
295
296 try {
297 return serviceRegistry.createJob(
298 JOB_TYPE, Operation.MoveByDate.toString(), args, null, true, NONTERMINAL_JOB_LOAD);
299 } catch (ServiceRegistryException e) {
300 throw new AssetManagerException("Unable to create a job", e);
301 }
302 }
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317 protected String internalMoveByDate(final Date start, final Date end, final String targetStorage) {
318 RichAResult results = tsam.getSnapshotsByDateOrderedById(start, end);
319 List<Job> subjobs = spawnSubjobs(results, start, end, targetStorage);
320 return Integer.toString(subjobs.size());
321 }
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338 public Job moveByIdAndDate(final String mpId, final Date start, final Date end, final String targetStorage) {
339 RequireUtil.notNull(mpId, "mpId");
340 RequireUtil.notNull(start, "start");
341 RequireUtil.notNull(end, "end");
342 RequireUtil.notNull(targetStorage, "targetStorage");
343 List<String> args = new LinkedList<>();
344 args.add(targetStorage);
345 args.add(mpId);
346 args.add(Long.toString(start.getTime()));
347 args.add(Long.toString(end.getTime()));
348
349 try {
350 return serviceRegistry.createJob(
351 JOB_TYPE, Operation.MoveByIdAndDate.toString(), args, null, true, NONTERMINAL_JOB_LOAD);
352 } catch (ServiceRegistryException e) {
353 throw new AssetManagerException("Unable to create a job", e);
354 }
355 }
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372 protected String internalMoveByIdAndDate(
373 final String mpId,
374 final Date start,
375 final Date end,
376 final String targetStorage
377 ) {
378 RichAResult results = tsam.getSnapshotsByIdAndDateOrderedByVersion(mpId, start, end, true);
379 MoveRecordInfo result = moveSnapshots(results, targetStorage);
380 return result.toString();
381 }
382
383
384
385
386
387
388
389
390
391
392
393
394 private List<Job> spawnSubjobs(
395 final RichAResult records,
396 final Date start,
397 final Date end,
398 final String targetStorage
399 ) {
400 List<Job> jobs = new LinkedList<>();
401 MoveRecordInfo recordInfo = new MoveRecordInfo();
402 records.forEach(new Consumer<ARecord>() {
403 @Override
404 public void accept(ARecord record) {
405 Snapshot snap = record.getSnapshot().get();
406 String mediaPackageId = snap.getMediaPackage().getIdentifier().toString();
407 if (recordInfo.isNewMpId(mediaPackageId)) {
408 jobs.add(moveByIdAndDate(mediaPackageId,start,end,targetStorage));
409 }
410 }
411 });
412 return jobs;
413 }
414
415
416
417
418
419
420
421
422
423
424
425 private MoveRecordInfo moveSnapshots(final RichAResult records, final String targetStorage) {
426 final MoveRecordInfo result = new MoveRecordInfo();
427 records.forEach(new Consumer<ARecord>() {
428 @Override
429 public void accept(ARecord record) {
430 Snapshot snap = record.getSnapshot().get();
431 try {
432 logger.debug("moving Mediapackage {} Version {} from {} to {}",
433 snap.getMediaPackage().getIdentifier().toString(),
434 snap.getVersion().toString(),
435 snap.getStorageId(),
436 targetStorage
437 );
438 internalMoveByIdAndVersion(snap.getVersion(),
439 snap.getMediaPackage().getIdentifier().toString(),
440 targetStorage
441 );
442 result.addSuccess();
443 } catch (NotFoundException e) {
444 result.addFailed();
445 logger.warn(e.getMessage());
446 }
447 }
448 });
449 return result;
450 }
451
452 @Reference
453 protected void setServiceRegistry(ServiceRegistry serviceRegistry) {
454 this.serviceRegistry = serviceRegistry;
455 }
456
457 @Override
458 protected ServiceRegistry getServiceRegistry() {
459 return this.serviceRegistry;
460 }
461
462 @Reference
463 protected void setAssetManager(AssetManager assetManager) {
464 this.tsam = assetManager;
465 }
466
467 @Reference
468 protected void setSecurityService(SecurityService securityService) {
469 this.securityService = securityService;
470 }
471
472 @Override
473 protected SecurityService getSecurityService() {
474 return this.securityService;
475 }
476
477 @Reference
478 protected void setUserDirectoryService(UserDirectoryService uds) {
479 this.userDirectoryService = uds;
480 }
481
482 @Override
483 protected UserDirectoryService getUserDirectoryService() {
484 return this.userDirectoryService;
485 }
486
487 @Reference
488 protected void setOrganizationDirectoryService(OrganizationDirectoryService os) {
489 this.organizationDirectoryService = os;
490 }
491
492 @Override
493 protected OrganizationDirectoryService getOrganizationDirectoryService() {
494 return this.organizationDirectoryService;
495 }
496 }