View Javadoc
1   /*
2    * Licensed to The Apereo Foundation under one or more contributor license
3    * agreements. See the NOTICE file distributed with this work for additional
4    * information regarding copyright ownership.
5    *
6    *
7    * The Apereo Foundation licenses this file to you under the Educational
8    * Community License, Version 2.0 (the "License"); you may not use this file
9    * except in compliance with the License. You may obtain a copy of the License
10   * at:
11   *
12   *   http://opensource.org/licenses/ecl2.txt
13   *
14   * Unless required by applicable law or agreed to in writing, software
15   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
16   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the
17   * License for the specific language governing permissions and limitations under
18   * the License.
19   *
20   */
21  package org.opencastproject.assetmanager.impl;
22  
23  import org.opencastproject.assetmanager.api.AssetManager;
24  import org.opencastproject.assetmanager.api.AssetManagerException;
25  import org.opencastproject.assetmanager.api.Snapshot;
26  import org.opencastproject.assetmanager.api.Version;
27  import org.opencastproject.assetmanager.api.query.ARecord;
28  import org.opencastproject.assetmanager.api.query.RichAResult;
29  import org.opencastproject.assetmanager.api.storage.AssetStore;
30  import org.opencastproject.assetmanager.api.storage.RemoteAssetStore;
31  import org.opencastproject.job.api.AbstractJobProducer;
32  import org.opencastproject.job.api.Job;
33  import org.opencastproject.security.api.OrganizationDirectoryService;
34  import org.opencastproject.security.api.SecurityService;
35  import org.opencastproject.security.api.UserDirectoryService;
36  import org.opencastproject.serviceregistry.api.ServiceRegistry;
37  import org.opencastproject.serviceregistry.api.ServiceRegistryException;
38  import org.opencastproject.util.NotFoundException;
39  import org.opencastproject.util.RequireUtil;
40  
41  import com.google.gson.Gson;
42  
43  import org.osgi.service.component.ComponentContext;
44  import org.osgi.service.component.annotations.Activate;
45  import org.osgi.service.component.annotations.Component;
46  import org.osgi.service.component.annotations.Reference;
47  import org.slf4j.Logger;
48  import org.slf4j.LoggerFactory;
49  
50  import java.util.Date;
51  import java.util.HashMap;
52  import java.util.LinkedList;
53  import java.util.List;
54  import java.util.Map;
55  import java.util.Optional;
56  import java.util.function.Consumer;
57  
58  @Component(
59      immediate = true,
60      service = AssetManagerJobProducer.class,
61      property = {
62          "service.description=Opencast Asset Manager Job Producer"
63      }
64  )
65  public class AssetManagerJobProducer extends AbstractJobProducer {
66  
67    /** The logging facility */
68    private static final Logger logger = LoggerFactory.getLogger(AssetManagerJobProducer.class);
69  
70    public static final String JOB_TYPE = "org.opencastproject.assetmanager";
71    public static final Float JOB_LOAD = 0.1f;
72    public static final Float NONTERMINAL_JOB_LOAD = 0.1f;
73  
74    public enum Operation {
75      MoveById, MoveByIdAndVersion, MoveByIdAndDate, MoveByDate, MoveRecords
76    }
77  
78    private static final String OK = "OK";
79  
80    private AssetManager tsam = null;
81    private ServiceRegistry serviceRegistry = null;
82    private SecurityService securityService = null;
83    private UserDirectoryService userDirectoryService = null;
84    private OrganizationDirectoryService organizationDirectoryService = null;
85  
  /**
   * Creates the job producer, passing {@link #JOB_TYPE} to the {@link AbstractJobProducer} constructor.
   */
  public AssetManagerJobProducer() {
    super(JOB_TYPE);
  }
89  
  /**
   * OSGi callback on component activation.
   * Logs the activation and then delegates to {@link AbstractJobProducer#activate(ComponentContext)}.
   *
   * @param cc
   *          the component context
   */
  @Override
  @Activate
  public void activate(ComponentContext cc) {
    logger.info("Activating assetmanager job service");
    super.activate(cc);
  }
102 
103   public boolean datastoreExists(String storeId) {
104     Optional<AssetStore> store = tsam.getAssetStore(storeId);
105     return store.isPresent();
106   }
107 
108   /** Utility class to collect RecordInformation for moving larger 
109    * groups of mediapackages in combined jobs.
110    */
111   private class MoveRecordInfo {
112     private final Gson gson = new Gson();
113     private int success = 0;
114     private int failed = 0;
115     private String currentMpId = "";
116     public void addSuccess() {
117       success++;
118     };
119     public void addFailed() {
120       failed ++;
121     }
122 
123     public boolean isNewMpId(String mpId) {
124       if (currentMpId.equals(mpId)) {
125         return false;
126       }
127       currentMpId = mpId;
128       return true;
129     }
130 
131     @Override
132     public String toString() {
133       Map<String,Integer> result = new HashMap<>();
134       if (success > 0) {
135         result.put("OK", success);
136       }
137       if (failed > 0) {
138         result.put("FAIL", failed);
139       }
140       return gson.toJson(result);
141     }
142   };
143 
144   @Override
145   protected String process(Job job) throws ServiceRegistryException {
146     Operation op = null;
147     String operation = job.getOperation();
148     List<String> arguments = job.getArguments();
149     String id;
150     String targetStore = arguments.get(0);
151     VersionImpl version;
152     Date start;
153     Date end;
154     try {
155       op = Operation.valueOf(operation);
156       switch (op) {
157         case MoveById:
158           id = arguments.get(1);
159           return internalMoveById(id, targetStore);
160         case MoveByIdAndVersion:
161           id = arguments.get(1);
162           version = VersionImpl.mk(Long.parseLong(arguments.get(2)));
163           return internalMoveByIdAndVersion(version, id, targetStore);
164         case MoveByDate:
165           start = new Date(Long.parseLong(arguments.get(1)));
166           end = new Date(Long.parseLong(arguments.get(2)));
167           return internalMoveByDate(start, end, targetStore);
168         case MoveByIdAndDate:
169           id = arguments.get(1);
170           start = new Date(Long.parseLong(arguments.get(2)));
171           end = new Date(Long.parseLong(arguments.get(3)));
172           return internalMoveByIdAndDate(id, start, end, targetStore);
173         default:
174           throw new IllegalArgumentException("Unknown operation '" + operation + "'");
175       }
176     } catch (NotFoundException e) {
177       throw new ServiceRegistryException("Error running job", e);
178     } catch (Exception e) {
179       throw new ServiceRegistryException("Error handling operation '" + op + "'", e);
180     }
181   }
182 
183   /**
184    * Spawns a job to move a single snapshot from its current storage to a new target storage location
185    *
186    * @param version
187    *  The {@link Version} to move
188    * @param mpId
189    *  The mediapackage ID of the snapshot to move
190    * @param targetStorage
191    *  The {@link RemoteAssetStore} ID where the snapshot should be moved
192    * @return
193    */
194   public Job moveByIdAndVersion(final Version version, final String mpId, final String targetStorage) {
195     RequireUtil.notNull(version, "version");
196     RequireUtil.notEmpty(mpId, "mpId");
197     RequireUtil.notEmpty(targetStorage, "targetStorage");
198     List<String> args = new LinkedList<>();
199     args.add(targetStorage);
200     args.add(mpId);
201     args.add(version.toString());
202 
203     try {
204       return serviceRegistry.createJob(JOB_TYPE, Operation.MoveByIdAndVersion.toString(), args, null, true, JOB_LOAD);
205     } catch (ServiceRegistryException e) {
206       throw new AssetManagerException("Unable to create a job", e);
207     }
208   }
209 
  /**
   * Triggers the move operation inside the {@link AssetManager}
   *
   * @param version
   *  The {@link Version} to move
   * @param mpId
   *  The mediapackage ID of the snapshot to move
   * @param targetStorage
   *  The {@link RemoteAssetStore} ID where the snapshot should be moved
   * @return
   *  The string "OK"
   * @throws NotFoundException
   *  If the {@link AssetManager} throws it while moving the snapshot (presumably when the
   *  snapshot or the target store does not exist; confirm against the asset manager)
   */
  protected String internalMoveByIdAndVersion(
      final Version version,
      final String mpId,
      final String targetStorage
  ) throws NotFoundException {
    tsam.moveSnapshotToStore(version, mpId, targetStorage);
    return OK;
  }
231 
232   /**
233    * Spawns a job to move a all snapshots of a mediapackage from their current storage to a new target storage location
234    *
235    * @param mpId
236    *  The mediapackage ID of the snapshot to move
237    * @param targetStorage
238    *  The {@link RemoteAssetStore} ID where the snapshot should be moved
239    * @return
240    *  The {@link Job}
241    */
242   public Job moveById(final String mpId, final String targetStorage) {
243     RequireUtil.notEmpty(mpId, "mpId");
244     RequireUtil.notEmpty(targetStorage, "targetStorage");
245     List<String> args = new LinkedList<>();
246     args.add(targetStorage);
247     args.add(mpId);
248 
249     try {
250       return serviceRegistry.createJob(JOB_TYPE, Operation.MoveById.toString(), args, null, true, NONTERMINAL_JOB_LOAD);
251     } catch (ServiceRegistryException e) {
252       throw new AssetManagerException("Unable to create a job", e);
253     }
254   }
255 
256   /**
257    * Moves all the appropriate snapshots to their new home
258    *
259    * @param mpId
260    *  The mediapackage ID of the snapshot to move
261    * @param targetStorage
262    *  The {@link RemoteAssetStore} ID where the snapshot should be moved
263    * @return
264    *  The String containing the number of successful and failed moves
265    *  [0 OK ][0 FAILED ]
266    */
267   protected String internalMoveById(final String mpId, final String targetStorage) {
268     RichAResult results = tsam.getSnapshotsByIdOrderedByVersion(mpId, true);
269     MoveRecordInfo result = moveSnapshots(results, targetStorage);
270     return result.toString();
271   }
272 
273 
274   /**
275    * Spawns a job to move a all snapshots taken between two points from their
276    * current storage to a new target storage location
277    *
278    * @param start
279    *  The start {@link Date}
280    * @param end
281    *  The end {@link Date}
282    * @param targetStorage
283    *  The {@link RemoteAssetStore} ID where the snapshot should be moved
284    * @return
285    *  The {@link Job}
286    */
287   public Job moveByDate(final Date start, final Date end, final String targetStorage) {
288     RequireUtil.notNull(start, "start");
289     RequireUtil.notNull(end, "end");
290     RequireUtil.notNull(targetStorage, "targetStorage");
291     List<String> args = new LinkedList<>();
292     args.add(targetStorage);
293     args.add(Long.toString(start.getTime()));
294     args.add(Long.toString(end.getTime()));
295 
296     try {
297       return serviceRegistry.createJob(
298           JOB_TYPE, Operation.MoveByDate.toString(), args, null, true, NONTERMINAL_JOB_LOAD);
299     } catch (ServiceRegistryException e) {
300       throw new AssetManagerException("Unable to create a job", e);
301     }
302   }
303 
304   /**
305    * Spawns subjobs on a per-snapshot level to move the appropriate snapshots to their new home
306    * Moves all the appropriate snapshots to their new home
307    *
308    * @param start
309    *  The start {@link Date}
310    * @param end
311    *  The end {@link Date}
312    * @param targetStorage
313    *  The {@link RemoteAssetStore} ID where the snapshot should be moved
314    * @return
315    *  The number of subjobs spawned
316    */
317   protected String internalMoveByDate(final Date start, final Date end, final String targetStorage) {
318     RichAResult results = tsam.getSnapshotsByDateOrderedById(start, end);
319     List<Job> subjobs = spawnSubjobs(results, start, end, targetStorage);
320     return Integer.toString(subjobs.size());
321   }
322 
323   /**
324    * Spawns a job to move a all snapshots of a given mediapackage taken between
325    * two points from their current storage to a new target storage location
326    *
327    * @param mpId
328    *  The mediapackage ID of the snapshot to move
329    * @param start
330    *  The start {@link Date}
331    * @param end
332    *  The end {@link Date}
333    * @param targetStorage
334    *  The {@link RemoteAssetStore} ID where the snapshot should be moved
335    * @return
336    *  The {@link Job}
337    */
338   public Job moveByIdAndDate(final String mpId, final Date start, final Date end, final String targetStorage) {
339     RequireUtil.notNull(mpId, "mpId");
340     RequireUtil.notNull(start, "start");
341     RequireUtil.notNull(end, "end");
342     RequireUtil.notNull(targetStorage, "targetStorage");
343     List<String> args = new LinkedList<>();
344     args.add(targetStorage);
345     args.add(mpId);
346     args.add(Long.toString(start.getTime()));
347     args.add(Long.toString(end.getTime()));
348 
349     try {
350       return serviceRegistry.createJob(
351           JOB_TYPE, Operation.MoveByIdAndDate.toString(), args, null, true, NONTERMINAL_JOB_LOAD);
352     } catch (ServiceRegistryException e) {
353       throw new AssetManagerException("Unable to create a job", e);
354     }
355   }
356 
357   /**
358    * Moves all the appropriate snapshots to their new home
359    *
360    * @param mpId
361    *  The mediapackage ID of the snapshot to move
362    * @param start
363    *  The start {@link Date}
364    * @param end
365    *  The end {@link Date}
366    * @param targetStorage
367    *  The {@link RemoteAssetStore} ID where the snapshot should be moved
368    * @return
369    *  The JSON String containing the number of successful and failed moves
370    *  {"OK": 0,"FAIL": 0}
371    */
372   protected String internalMoveByIdAndDate(
373       final String mpId,
374       final Date start,
375       final Date end,
376       final String targetStorage
377   ) {
378     RichAResult results = tsam.getSnapshotsByIdAndDateOrderedByVersion(mpId, start, end, true);
379     MoveRecordInfo result = moveSnapshots(results, targetStorage);
380     return result.toString();
381   }
382 
383   /**
384    * Spawns the subjobs based on the stream of records
385    *
386    * @param records
387    *  The stream of records containing the snapshots to move to the new target storage
388    * @param targetStorage
389    *  The {@link RemoteAssetStore} ID where the snapshot should be moved
390    * @return
391    *  The set of subjobs
392    */
393 
394   private List<Job> spawnSubjobs(
395       final RichAResult records,
396       final Date start,
397       final Date end,
398       final String targetStorage
399   ) {
400     List<Job> jobs = new LinkedList<>();
401     MoveRecordInfo recordInfo = new MoveRecordInfo();
402     records.forEach(new Consumer<ARecord>() {
403       @Override
404       public void accept(ARecord record) {
405         Snapshot snap = record.getSnapshot().get();
406         String mediaPackageId = snap.getMediaPackage().getIdentifier().toString();
407         if (recordInfo.isNewMpId(mediaPackageId)) {
408           jobs.add(moveByIdAndDate(mediaPackageId,start,end,targetStorage));
409         }
410       }
411     });
412     return jobs;
413   }
414 
415   /**
416    * Moves all snapshot based on the stream of records from its current storage to a new target storage location
417    *
418    * @param records
419    *  The stream of records containing the snapshots to move to the new target storage
420    * @param targetStorage
421    *  The {@link RemoteAssetStore} ID where the snapshot should be moved
422    * @return
423    *  The {@link MoveRecordInfo}
424    */
425   private MoveRecordInfo moveSnapshots(final RichAResult records, final String targetStorage) {
426     final MoveRecordInfo result = new MoveRecordInfo();
427     records.forEach(new Consumer<ARecord>() {
428       @Override
429       public void accept(ARecord record) {
430         Snapshot snap = record.getSnapshot().get();
431           try {
432             logger.debug("moving Mediapackage {} Version {} from {} to {}",
433                 snap.getMediaPackage().getIdentifier().toString(),
434                 snap.getVersion().toString(),
435                 snap.getStorageId(),
436                 targetStorage
437             );
438             internalMoveByIdAndVersion(snap.getVersion(),
439                 snap.getMediaPackage().getIdentifier().toString(),
440                 targetStorage
441             );
442             result.addSuccess();
443           } catch (NotFoundException e) {
444             result.addFailed();
445             logger.warn(e.getMessage());
446           }
447       }
448     });
449     return result;
450   }
451 
  /**
   * OSGi callback to set the service registry used to create jobs.
   *
   * @param serviceRegistry
   *  The service registry
   */
  @Reference
  protected void setServiceRegistry(ServiceRegistry serviceRegistry) {
    this.serviceRegistry = serviceRegistry;
  }
456 
  /**
   * Provides the injected service registry to {@link AbstractJobProducer}.
   *
   * @return
   *  The service registry
   */
  @Override
  protected ServiceRegistry getServiceRegistry() {
    return this.serviceRegistry;
  }
461 
  /**
   * OSGi callback to set the asset manager that looks up and moves the snapshots.
   *
   * @param assetManager
   *  The asset manager
   */
  @Reference
  protected void setAssetManager(AssetManager assetManager) {
    this.tsam = assetManager;
  }
466 
  /**
   * OSGi callback to set the security service.
   *
   * @param securityService
   *  The security service
   */
  @Reference
  protected void setSecurityService(SecurityService securityService) {
    this.securityService = securityService;
  }
471 
  /**
   * Provides the injected security service to {@link AbstractJobProducer}.
   *
   * @return
   *  The security service
   */
  @Override
  protected SecurityService getSecurityService() {
    return this.securityService;
  }
476 
  /**
   * OSGi callback to set the user directory service.
   *
   * @param uds
   *  The user directory service
   */
  @Reference
  protected void setUserDirectoryService(UserDirectoryService uds) {
    this.userDirectoryService = uds;
  }
481 
  /**
   * Provides the injected user directory service to {@link AbstractJobProducer}.
   *
   * @return
   *  The user directory service
   */
  @Override
  protected UserDirectoryService getUserDirectoryService() {
    return this.userDirectoryService;
  }
486 
  /**
   * OSGi callback to set the organization directory service.
   *
   * @param os
   *  The organization directory service
   */
  @Reference
  protected void setOrganizationDirectoryService(OrganizationDirectoryService os) {
    this.organizationDirectoryService = os;
  }
491 
  /**
   * Provides the injected organization directory service to {@link AbstractJobProducer}.
   *
   * @return
   *  The organization directory service
   */
  @Override
  protected OrganizationDirectoryService getOrganizationDirectoryService() {
    return this.organizationDirectoryService;
  }
495   }
496 }