View Javadoc
1   /*
2    * Licensed to The Apereo Foundation under one or more contributor license
3    * agreements. See the NOTICE file distributed with this work for additional
4    * information regarding copyright ownership.
5    *
6    *
7    * The Apereo Foundation licenses this file to you under the Educational
8    * Community License, Version 2.0 (the "License"); you may not use this file
9    * except in compliance with the License. You may obtain a copy of the License
10   * at:
11   *
12   *   http://opensource.org/licenses/ecl2.txt
13   *
14   * Unless required by applicable law or agreed to in writing, software
15   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
16   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the
17   * License for the specific language governing permissions and limitations under
18   * the License.
19   *
20   */
21  package org.opencastproject.assetmanager.impl;
22  
23  import org.opencastproject.assetmanager.api.AssetManager;
24  import org.opencastproject.assetmanager.api.AssetManagerException;
25  import org.opencastproject.assetmanager.api.Snapshot;
26  import org.opencastproject.assetmanager.api.Version;
27  import org.opencastproject.assetmanager.api.storage.AssetStore;
28  import org.opencastproject.assetmanager.api.storage.RemoteAssetStore;
29  import org.opencastproject.job.api.AbstractJobProducer;
30  import org.opencastproject.job.api.Job;
31  import org.opencastproject.security.api.OrganizationDirectoryService;
32  import org.opencastproject.security.api.SecurityService;
33  import org.opencastproject.security.api.UserDirectoryService;
34  import org.opencastproject.serviceregistry.api.ServiceRegistry;
35  import org.opencastproject.serviceregistry.api.ServiceRegistryException;
36  import org.opencastproject.util.NotFoundException;
37  import org.opencastproject.util.RequireUtil;
38  
39  import com.google.gson.Gson;
40  
41  import org.osgi.service.component.ComponentContext;
42  import org.osgi.service.component.annotations.Activate;
43  import org.osgi.service.component.annotations.Component;
44  import org.osgi.service.component.annotations.Reference;
45  import org.slf4j.Logger;
46  import org.slf4j.LoggerFactory;
47  
48  import java.util.Date;
49  import java.util.HashMap;
50  import java.util.LinkedList;
51  import java.util.List;
52  import java.util.Map;
53  import java.util.Optional;
54  
55  @Component(
56      immediate = true,
57      service = AssetManagerJobProducer.class,
58      property = {
59          "service.description=Opencast Asset Manager Job Producer"
60      }
61  )
62  public class AssetManagerJobProducer extends AbstractJobProducer {
63  
64    /** The logging facility */
65    private static final Logger logger = LoggerFactory.getLogger(AssetManagerJobProducer.class);
66  
67    public static final String JOB_TYPE = "org.opencastproject.assetmanager";
68    public static final Float JOB_LOAD = 0.1f;
69    public static final Float NONTERMINAL_JOB_LOAD = 0.1f;
70  
71    public enum Operation {
72      MoveById, MoveByIdAndVersion, MoveByIdAndDate, MoveByDate, MoveRecords
73    }
74  
75    private static final String OK = "OK";
76  
77    private AssetManager tsam = null;
78    private ServiceRegistry serviceRegistry = null;
79    private SecurityService securityService = null;
80    private UserDirectoryService userDirectoryService = null;
81    private OrganizationDirectoryService organizationDirectoryService = null;
82  
83    public AssetManagerJobProducer() {
84      super(JOB_TYPE);
85    }
86  
87    /**
88     * OSGi callback on component activation.
89     *
90     * @param cc
91     *          the component context
92     */
93    @Override
94    @Activate
95    public void activate(ComponentContext cc) {
96      logger.info("Activating assetmanager job service");
97      super.activate(cc);
98    }
99  
100   public boolean datastoreExists(String storeId) {
101     Optional<AssetStore> store = tsam.getAssetStore(storeId);
102     return store.isPresent();
103   }
104 
105   /** Utility class to collect RecordInformation for moving larger 
106    * groups of mediapackages in combined jobs.
107    */
108   private class MoveRecordInfo {
109     private final Gson gson = new Gson();
110     private int success = 0;
111     private int failed = 0;
112     private String currentMpId = "";
113     public void addSuccess() {
114       success++;
115     };
116     public void addFailed() {
117       failed ++;
118     }
119 
120     public boolean isNewMpId(String mpId) {
121       if (currentMpId.equals(mpId)) {
122         return false;
123       }
124       currentMpId = mpId;
125       return true;
126     }
127 
128     @Override
129     public String toString() {
130       Map<String,Integer> result = new HashMap<>();
131       if (success > 0) {
132         result.put("OK", success);
133       }
134       if (failed > 0) {
135         result.put("FAIL", failed);
136       }
137       return gson.toJson(result);
138     }
139   };
140 
141   @Override
142   protected String process(Job job) throws ServiceRegistryException {
143     Operation op = null;
144     String operation = job.getOperation();
145     List<String> arguments = job.getArguments();
146     String id;
147     String targetStore = arguments.get(0);
148     VersionImpl version;
149     Date start;
150     Date end;
151     try {
152       op = Operation.valueOf(operation);
153       switch (op) {
154         case MoveById:
155           id = arguments.get(1);
156           return internalMoveById(id, targetStore);
157         case MoveByIdAndVersion:
158           id = arguments.get(1);
159           version = VersionImpl.mk(Long.parseLong(arguments.get(2)));
160           return internalMoveByIdAndVersion(version, id, targetStore);
161         case MoveByDate:
162           start = new Date(Long.parseLong(arguments.get(1)));
163           end = new Date(Long.parseLong(arguments.get(2)));
164           return internalMoveByDate(start, end, targetStore);
165         case MoveByIdAndDate:
166           id = arguments.get(1);
167           start = new Date(Long.parseLong(arguments.get(2)));
168           end = new Date(Long.parseLong(arguments.get(3)));
169           return internalMoveByIdAndDate(id, start, end, targetStore);
170         default:
171           throw new IllegalArgumentException("Unknown operation '" + operation + "'");
172       }
173     } catch (NotFoundException e) {
174       throw new ServiceRegistryException("Error running job", e);
175     } catch (Exception e) {
176       throw new ServiceRegistryException("Error handling operation '" + op + "'", e);
177     }
178   }
179 
180   /**
181    * Spawns a job to move a single snapshot from its current storage to a new target storage location
182    *
183    * @param version
184    *  The {@link Version} to move
185    * @param mpId
186    *  The mediapackage ID of the snapshot to move
187    * @param targetStorage
188    *  The {@link RemoteAssetStore} ID where the snapshot should be moved
189    * @return
190    */
191   public Job moveByIdAndVersion(final Version version, final String mpId, final String targetStorage) {
192     RequireUtil.notNull(version, "version");
193     RequireUtil.notEmpty(mpId, "mpId");
194     RequireUtil.notEmpty(targetStorage, "targetStorage");
195     List<String> args = new LinkedList<>();
196     args.add(targetStorage);
197     args.add(mpId);
198     args.add(version.toString());
199 
200     try {
201       return serviceRegistry.createJob(JOB_TYPE, Operation.MoveByIdAndVersion.toString(), args, null, true, JOB_LOAD);
202     } catch (ServiceRegistryException e) {
203       throw new AssetManagerException("Unable to create a job", e);
204     }
205   }
206 
207   /**
208    * Triggers the move operation inside the {@link AssetManager}
209    *
210    * @param version
211    *  The {@link Version} to move
212    * @param mpId
213    *  The mediapackage ID of the snapshot to move
214    * @param targetStorage
215    *  The {@link RemoteAssetStore} ID where the snapshot should be moved
216    * @return
217    *  The string "OK"
218    * @throws NotFoundException
219    */
220   protected String internalMoveByIdAndVersion(
221       final Version version,
222       final String mpId,
223       final String targetStorage
224   ) throws NotFoundException {
225     tsam.moveSnapshotToStore(version, mpId, targetStorage);
226     return OK;
227   }
228 
229   /**
230    * Spawns a job to move a all snapshots of a mediapackage from their current storage to a new target storage location
231    *
232    * @param mpId
233    *  The mediapackage ID of the snapshot to move
234    * @param targetStorage
235    *  The {@link RemoteAssetStore} ID where the snapshot should be moved
236    * @return
237    *  The {@link Job}
238    */
239   public Job moveById(final String mpId, final String targetStorage) {
240     RequireUtil.notEmpty(mpId, "mpId");
241     RequireUtil.notEmpty(targetStorage, "targetStorage");
242     List<String> args = new LinkedList<>();
243     args.add(targetStorage);
244     args.add(mpId);
245 
246     try {
247       return serviceRegistry.createJob(JOB_TYPE, Operation.MoveById.toString(), args, null, true, NONTERMINAL_JOB_LOAD);
248     } catch (ServiceRegistryException e) {
249       throw new AssetManagerException("Unable to create a job", e);
250     }
251   }
252 
253   /**
254    * Moves all the appropriate snapshots to their new home
255    *
256    * @param mpId
257    *  The mediapackage ID of the snapshot to move
258    * @param targetStorage
259    *  The {@link RemoteAssetStore} ID where the snapshot should be moved
260    * @return
261    *  The String containing the number of successful and failed moves
262    *  [0 OK ][0 FAILED ]
263    */
264   protected String internalMoveById(final String mpId, final String targetStorage) {
265     List<Snapshot> results = tsam.getSnapshotsByIdOrderedByVersion(mpId, true);
266     MoveRecordInfo result = moveSnapshots(results, targetStorage);
267     return result.toString();
268   }
269 
270 
271   /**
272    * Spawns a job to move a all snapshots taken between two points from their
273    * current storage to a new target storage location
274    *
275    * @param start
276    *  The start {@link Date}
277    * @param end
278    *  The end {@link Date}
279    * @param targetStorage
280    *  The {@link RemoteAssetStore} ID where the snapshot should be moved
281    * @return
282    *  The {@link Job}
283    */
284   public Job moveByDate(final Date start, final Date end, final String targetStorage) {
285     RequireUtil.notNull(start, "start");
286     RequireUtil.notNull(end, "end");
287     RequireUtil.notNull(targetStorage, "targetStorage");
288     List<String> args = new LinkedList<>();
289     args.add(targetStorage);
290     args.add(Long.toString(start.getTime()));
291     args.add(Long.toString(end.getTime()));
292 
293     try {
294       return serviceRegistry.createJob(
295           JOB_TYPE, Operation.MoveByDate.toString(), args, null, true, NONTERMINAL_JOB_LOAD);
296     } catch (ServiceRegistryException e) {
297       throw new AssetManagerException("Unable to create a job", e);
298     }
299   }
300 
301   /**
302    * Spawns subjobs on a per-snapshot level to move the appropriate snapshots to their new home
303    * Moves all the appropriate snapshots to their new home
304    *
305    * @param start
306    *  The start {@link Date}
307    * @param end
308    *  The end {@link Date}
309    * @param targetStorage
310    *  The {@link RemoteAssetStore} ID where the snapshot should be moved
311    * @return
312    *  The number of subjobs spawned
313    */
314   protected String internalMoveByDate(final Date start, final Date end, final String targetStorage) {
315     List<Snapshot> snapshots = tsam.getSnapshotsByDateOrderedById(start, end);
316     List<Job> subjobs = spawnSubjobs(snapshots, start, end, targetStorage);
317     return Integer.toString(subjobs.size());
318   }
319 
320   /**
321    * Spawns a job to move a all snapshots of a given mediapackage taken between
322    * two points from their current storage to a new target storage location
323    *
324    * @param mpId
325    *  The mediapackage ID of the snapshot to move
326    * @param start
327    *  The start {@link Date}
328    * @param end
329    *  The end {@link Date}
330    * @param targetStorage
331    *  The {@link RemoteAssetStore} ID where the snapshot should be moved
332    * @return
333    *  The {@link Job}
334    */
335   public Job moveByIdAndDate(final String mpId, final Date start, final Date end, final String targetStorage) {
336     RequireUtil.notNull(mpId, "mpId");
337     RequireUtil.notNull(start, "start");
338     RequireUtil.notNull(end, "end");
339     RequireUtil.notNull(targetStorage, "targetStorage");
340     List<String> args = new LinkedList<>();
341     args.add(targetStorage);
342     args.add(mpId);
343     args.add(Long.toString(start.getTime()));
344     args.add(Long.toString(end.getTime()));
345 
346     try {
347       return serviceRegistry.createJob(
348           JOB_TYPE, Operation.MoveByIdAndDate.toString(), args, null, true, NONTERMINAL_JOB_LOAD);
349     } catch (ServiceRegistryException e) {
350       throw new AssetManagerException("Unable to create a job", e);
351     }
352   }
353 
354   /**
355    * Moves all the appropriate snapshots to their new home
356    *
357    * @param mpId
358    *  The mediapackage ID of the snapshot to move
359    * @param start
360    *  The start {@link Date}
361    * @param end
362    *  The end {@link Date}
363    * @param targetStorage
364    *  The {@link RemoteAssetStore} ID where the snapshot should be moved
365    * @return
366    *  The JSON String containing the number of successful and failed moves
367    *  {"OK": 0,"FAIL": 0}
368    */
369   protected String internalMoveByIdAndDate(
370       final String mpId,
371       final Date start,
372       final Date end,
373       final String targetStorage
374   ) {
375     List<Snapshot> snapshots = tsam.getSnapshotsByIdAndDateOrderedByVersion(mpId, start, end, true);
376     MoveRecordInfo result = moveSnapshots(snapshots, targetStorage);
377     return result.toString();
378   }
379 
380   /**
381    * Spawns the subjobs based on the stream of records
382    *
383    * @param records
384    *  The stream of records containing the snapshots to move to the new target storage
385    * @param targetStorage
386    *  The {@link RemoteAssetStore} ID where the snapshot should be moved
387    * @return
388    *  The set of subjobs
389    */
390 
391   private List<Job> spawnSubjobs(
392       final List<Snapshot> snapshots,
393       final Date start,
394       final Date end,
395       final String targetStorage
396   ) {
397     List<Job> jobs = new LinkedList<>();
398     MoveRecordInfo recordInfo = new MoveRecordInfo();
399     snapshots.forEach(snap -> {
400       String mediaPackageId = snap.getMediaPackage().getIdentifier().toString();
401       if (recordInfo.isNewMpId(mediaPackageId)) {
402         jobs.add(moveByIdAndDate(mediaPackageId,start,end,targetStorage));
403       }
404     });
405     return jobs;
406   }
407 
408   /**
409    * Moves all snapshot based on the stream of records from its current storage to a new target storage location
410    *
411    * @param records
412    *  The stream of records containing the snapshots to move to the new target storage
413    * @param targetStorage
414    *  The {@link RemoteAssetStore} ID where the snapshot should be moved
415    * @return
416    *  The {@link MoveRecordInfo}
417    */
418   private MoveRecordInfo moveSnapshots(final List<Snapshot> snapshots, final String targetStorage) {
419     final MoveRecordInfo result = new MoveRecordInfo();
420     snapshots.forEach(snap -> {
421       try {
422         logger.debug("moving Mediapackage {} Version {} from {} to {}",
423             snap.getMediaPackage().getIdentifier().toString(),
424             snap.getVersion().toString(),
425             snap.getStorageId(),
426             targetStorage
427         );
428         internalMoveByIdAndVersion(snap.getVersion(),
429             snap.getMediaPackage().getIdentifier().toString(),
430             targetStorage
431         );
432         result.addSuccess();
433       } catch (NotFoundException e) {
434         result.addFailed();
435         logger.warn(e.getMessage());
436       }
437     });
438     return result;
439   }
440 
441   @Reference
442   protected void setServiceRegistry(ServiceRegistry serviceRegistry) {
443     this.serviceRegistry = serviceRegistry;
444   }
445 
446   @Override
447   protected ServiceRegistry getServiceRegistry() {
448     return this.serviceRegistry;
449   }
450 
451   @Reference
452   protected void setAssetManager(AssetManager assetManager) {
453     this.tsam = assetManager;
454   }
455 
456   @Reference
457   protected void setSecurityService(SecurityService securityService) {
458     this.securityService = securityService;
459   }
460 
461   @Override
462   protected SecurityService getSecurityService() {
463     return this.securityService;
464   }
465 
466   @Reference
467   protected void setUserDirectoryService(UserDirectoryService uds) {
468     this.userDirectoryService = uds;
469   }
470 
471   @Override
472   protected UserDirectoryService getUserDirectoryService() {
473     return this.userDirectoryService;
474   }
475 
476   @Reference
477   protected void setOrganizationDirectoryService(OrganizationDirectoryService os) {
478     this.organizationDirectoryService = os;
479   }
480 
481   @Override
482   protected OrganizationDirectoryService getOrganizationDirectoryService() {
483     return this.organizationDirectoryService;
484   }
485 }