1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21 package org.opencastproject.assetmanager.impl;
22
23 import org.opencastproject.assetmanager.api.AssetManager;
24 import org.opencastproject.assetmanager.api.AssetManagerException;
25 import org.opencastproject.assetmanager.api.Snapshot;
26 import org.opencastproject.assetmanager.api.Version;
27 import org.opencastproject.assetmanager.api.storage.AssetStore;
28 import org.opencastproject.assetmanager.api.storage.RemoteAssetStore;
29 import org.opencastproject.job.api.AbstractJobProducer;
30 import org.opencastproject.job.api.Job;
31 import org.opencastproject.security.api.OrganizationDirectoryService;
32 import org.opencastproject.security.api.SecurityService;
33 import org.opencastproject.security.api.UserDirectoryService;
34 import org.opencastproject.serviceregistry.api.ServiceRegistry;
35 import org.opencastproject.serviceregistry.api.ServiceRegistryException;
36 import org.opencastproject.util.NotFoundException;
37 import org.opencastproject.util.RequireUtil;
38
39 import com.google.gson.Gson;
40
41 import org.osgi.service.component.ComponentContext;
42 import org.osgi.service.component.annotations.Activate;
43 import org.osgi.service.component.annotations.Component;
44 import org.osgi.service.component.annotations.Reference;
45 import org.slf4j.Logger;
46 import org.slf4j.LoggerFactory;
47
48 import java.util.Date;
49 import java.util.HashMap;
50 import java.util.LinkedList;
51 import java.util.List;
52 import java.util.Map;
53 import java.util.Optional;
54
55 @Component(
56 immediate = true,
57 service = AssetManagerJobProducer.class,
58 property = {
59 "service.description=Opencast Asset Manager Job Producer"
60 }
61 )
62 public class AssetManagerJobProducer extends AbstractJobProducer {
63
64
65 private static final Logger logger = LoggerFactory.getLogger(AssetManagerJobProducer.class);
66
67 public static final String JOB_TYPE = "org.opencastproject.assetmanager";
68 public static final Float JOB_LOAD = 0.1f;
69 public static final Float NONTERMINAL_JOB_LOAD = 0.1f;
70
71 public enum Operation {
72 MoveById, MoveByIdAndVersion, MoveByIdAndDate, MoveByDate, MoveRecords
73 }
74
75 private static final String OK = "OK";
76
77 private AssetManager tsam = null;
78 private ServiceRegistry serviceRegistry = null;
79 private SecurityService securityService = null;
80 private UserDirectoryService userDirectoryService = null;
81 private OrganizationDirectoryService organizationDirectoryService = null;
82
83 public AssetManagerJobProducer() {
84 super(JOB_TYPE);
85 }
86
87
88
89
90
91
92
93 @Override
94 @Activate
95 public void activate(ComponentContext cc) {
96 logger.info("Activating assetmanager job service");
97 super.activate(cc);
98 }
99
100 public boolean datastoreExists(String storeId) {
101 Optional<AssetStore> store = tsam.getAssetStore(storeId);
102 return store.isPresent();
103 }
104
105
106
107
108 private class MoveRecordInfo {
109 private final Gson gson = new Gson();
110 private int success = 0;
111 private int failed = 0;
112 private String currentMpId = "";
113 public void addSuccess() {
114 success++;
115 };
116 public void addFailed() {
117 failed ++;
118 }
119
120 public boolean isNewMpId(String mpId) {
121 if (currentMpId.equals(mpId)) {
122 return false;
123 }
124 currentMpId = mpId;
125 return true;
126 }
127
128 @Override
129 public String toString() {
130 Map<String,Integer> result = new HashMap<>();
131 if (success > 0) {
132 result.put("OK", success);
133 }
134 if (failed > 0) {
135 result.put("FAIL", failed);
136 }
137 return gson.toJson(result);
138 }
139 };
140
141 @Override
142 protected String process(Job job) throws ServiceRegistryException {
143 Operation op = null;
144 String operation = job.getOperation();
145 List<String> arguments = job.getArguments();
146 String id;
147 String targetStore = arguments.get(0);
148 VersionImpl version;
149 Date start;
150 Date end;
151 try {
152 op = Operation.valueOf(operation);
153 switch (op) {
154 case MoveById:
155 id = arguments.get(1);
156 return internalMoveById(id, targetStore);
157 case MoveByIdAndVersion:
158 id = arguments.get(1);
159 version = VersionImpl.mk(Long.parseLong(arguments.get(2)));
160 return internalMoveByIdAndVersion(version, id, targetStore);
161 case MoveByDate:
162 start = new Date(Long.parseLong(arguments.get(1)));
163 end = new Date(Long.parseLong(arguments.get(2)));
164 return internalMoveByDate(start, end, targetStore);
165 case MoveByIdAndDate:
166 id = arguments.get(1);
167 start = new Date(Long.parseLong(arguments.get(2)));
168 end = new Date(Long.parseLong(arguments.get(3)));
169 return internalMoveByIdAndDate(id, start, end, targetStore);
170 default:
171 throw new IllegalArgumentException("Unknown operation '" + operation + "'");
172 }
173 } catch (NotFoundException e) {
174 throw new ServiceRegistryException("Error running job", e);
175 } catch (Exception e) {
176 throw new ServiceRegistryException("Error handling operation '" + op + "'", e);
177 }
178 }
179
180
181
182
183
184
185
186
187
188
189
190
191 public Job moveByIdAndVersion(final Version version, final String mpId, final String targetStorage) {
192 RequireUtil.notNull(version, "version");
193 RequireUtil.notEmpty(mpId, "mpId");
194 RequireUtil.notEmpty(targetStorage, "targetStorage");
195 List<String> args = new LinkedList<>();
196 args.add(targetStorage);
197 args.add(mpId);
198 args.add(version.toString());
199
200 try {
201 return serviceRegistry.createJob(JOB_TYPE, Operation.MoveByIdAndVersion.toString(), args, null, true, JOB_LOAD);
202 } catch (ServiceRegistryException e) {
203 throw new AssetManagerException("Unable to create a job", e);
204 }
205 }
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220 protected String internalMoveByIdAndVersion(
221 final Version version,
222 final String mpId,
223 final String targetStorage
224 ) throws NotFoundException {
225 tsam.moveSnapshotToStore(version, mpId, targetStorage);
226 return OK;
227 }
228
229
230
231
232
233
234
235
236
237
238
239 public Job moveById(final String mpId, final String targetStorage) {
240 RequireUtil.notEmpty(mpId, "mpId");
241 RequireUtil.notEmpty(targetStorage, "targetStorage");
242 List<String> args = new LinkedList<>();
243 args.add(targetStorage);
244 args.add(mpId);
245
246 try {
247 return serviceRegistry.createJob(JOB_TYPE, Operation.MoveById.toString(), args, null, true, NONTERMINAL_JOB_LOAD);
248 } catch (ServiceRegistryException e) {
249 throw new AssetManagerException("Unable to create a job", e);
250 }
251 }
252
253
254
255
256
257
258
259
260
261
262
263
264 protected String internalMoveById(final String mpId, final String targetStorage) {
265 List<Snapshot> results = tsam.getSnapshotsByIdOrderedByVersion(mpId, true);
266 MoveRecordInfo result = moveSnapshots(results, targetStorage);
267 return result.toString();
268 }
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284 public Job moveByDate(final Date start, final Date end, final String targetStorage) {
285 RequireUtil.notNull(start, "start");
286 RequireUtil.notNull(end, "end");
287 RequireUtil.notNull(targetStorage, "targetStorage");
288 List<String> args = new LinkedList<>();
289 args.add(targetStorage);
290 args.add(Long.toString(start.getTime()));
291 args.add(Long.toString(end.getTime()));
292
293 try {
294 return serviceRegistry.createJob(
295 JOB_TYPE, Operation.MoveByDate.toString(), args, null, true, NONTERMINAL_JOB_LOAD);
296 } catch (ServiceRegistryException e) {
297 throw new AssetManagerException("Unable to create a job", e);
298 }
299 }
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314 protected String internalMoveByDate(final Date start, final Date end, final String targetStorage) {
315 List<Snapshot> snapshots = tsam.getSnapshotsByDateOrderedById(start, end);
316 List<Job> subjobs = spawnSubjobs(snapshots, start, end, targetStorage);
317 return Integer.toString(subjobs.size());
318 }
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335 public Job moveByIdAndDate(final String mpId, final Date start, final Date end, final String targetStorage) {
336 RequireUtil.notNull(mpId, "mpId");
337 RequireUtil.notNull(start, "start");
338 RequireUtil.notNull(end, "end");
339 RequireUtil.notNull(targetStorage, "targetStorage");
340 List<String> args = new LinkedList<>();
341 args.add(targetStorage);
342 args.add(mpId);
343 args.add(Long.toString(start.getTime()));
344 args.add(Long.toString(end.getTime()));
345
346 try {
347 return serviceRegistry.createJob(
348 JOB_TYPE, Operation.MoveByIdAndDate.toString(), args, null, true, NONTERMINAL_JOB_LOAD);
349 } catch (ServiceRegistryException e) {
350 throw new AssetManagerException("Unable to create a job", e);
351 }
352 }
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369 protected String internalMoveByIdAndDate(
370 final String mpId,
371 final Date start,
372 final Date end,
373 final String targetStorage
374 ) {
375 List<Snapshot> snapshots = tsam.getSnapshotsByIdAndDateOrderedByVersion(mpId, start, end, true);
376 MoveRecordInfo result = moveSnapshots(snapshots, targetStorage);
377 return result.toString();
378 }
379
380
381
382
383
384
385
386
387
388
389
390
391 private List<Job> spawnSubjobs(
392 final List<Snapshot> snapshots,
393 final Date start,
394 final Date end,
395 final String targetStorage
396 ) {
397 List<Job> jobs = new LinkedList<>();
398 MoveRecordInfo recordInfo = new MoveRecordInfo();
399 snapshots.forEach(snap -> {
400 String mediaPackageId = snap.getMediaPackage().getIdentifier().toString();
401 if (recordInfo.isNewMpId(mediaPackageId)) {
402 jobs.add(moveByIdAndDate(mediaPackageId,start,end,targetStorage));
403 }
404 });
405 return jobs;
406 }
407
408
409
410
411
412
413
414
415
416
417
418 private MoveRecordInfo moveSnapshots(final List<Snapshot> snapshots, final String targetStorage) {
419 final MoveRecordInfo result = new MoveRecordInfo();
420 snapshots.forEach(snap -> {
421 try {
422 logger.debug("moving Mediapackage {} Version {} from {} to {}",
423 snap.getMediaPackage().getIdentifier().toString(),
424 snap.getVersion().toString(),
425 snap.getStorageId(),
426 targetStorage
427 );
428 internalMoveByIdAndVersion(snap.getVersion(),
429 snap.getMediaPackage().getIdentifier().toString(),
430 targetStorage
431 );
432 result.addSuccess();
433 } catch (NotFoundException e) {
434 result.addFailed();
435 logger.warn(e.getMessage());
436 }
437 });
438 return result;
439 }
440
441 @Reference
442 protected void setServiceRegistry(ServiceRegistry serviceRegistry) {
443 this.serviceRegistry = serviceRegistry;
444 }
445
446 @Override
447 protected ServiceRegistry getServiceRegistry() {
448 return this.serviceRegistry;
449 }
450
451 @Reference
452 protected void setAssetManager(AssetManager assetManager) {
453 this.tsam = assetManager;
454 }
455
456 @Reference
457 protected void setSecurityService(SecurityService securityService) {
458 this.securityService = securityService;
459 }
460
461 @Override
462 protected SecurityService getSecurityService() {
463 return this.securityService;
464 }
465
466 @Reference
467 protected void setUserDirectoryService(UserDirectoryService uds) {
468 this.userDirectoryService = uds;
469 }
470
471 @Override
472 protected UserDirectoryService getUserDirectoryService() {
473 return this.userDirectoryService;
474 }
475
476 @Reference
477 protected void setOrganizationDirectoryService(OrganizationDirectoryService os) {
478 this.organizationDirectoryService = os;
479 }
480
481 @Override
482 protected OrganizationDirectoryService getOrganizationDirectoryService() {
483 return this.organizationDirectoryService;
484 }
485 }