1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 package org.opencastproject.assetmanager.impl;
23
24 import org.opencastproject.assetmanager.api.AssetManager;
25 import org.opencastproject.kernel.scanner.AbstractScanner;
26 import org.opencastproject.security.api.Organization;
27 import org.opencastproject.security.api.OrganizationDirectoryService;
28 import org.opencastproject.security.api.SecurityService;
29 import org.opencastproject.serviceregistry.api.ServiceRegistry;
30 import org.opencastproject.util.NeedleEye;
31 import org.opencastproject.util.NotFoundException;
32 import org.opencastproject.workflow.api.WorkflowService;
33
34 import org.apache.commons.lang3.BooleanUtils;
35 import org.apache.commons.lang3.StringUtils;
36 import org.osgi.service.cm.ConfigurationException;
37 import org.osgi.service.cm.ManagedService;
38 import org.osgi.service.component.ComponentContext;
39 import org.osgi.service.component.annotations.Activate;
40 import org.osgi.service.component.annotations.Component;
41 import org.osgi.service.component.annotations.Deactivate;
42 import org.osgi.service.component.annotations.Reference;
43 import org.quartz.CronExpression;
44 import org.quartz.JobDetail;
45 import org.quartz.JobExecutionContext;
46 import org.quartz.impl.StdSchedulerFactory;
47 import org.slf4j.Logger;
48 import org.slf4j.LoggerFactory;
49
50 import java.util.Calendar;
51 import java.util.Date;
52 import java.util.Dictionary;
53 import java.util.Optional;
54 import java.util.concurrent.TimeUnit;
55
56 @Component(
57 immediate = true,
58 service = ManagedService.class,
59 property = {
60 "service.description=Timed media archiver Service"
61 }
62 )
63 public class TimedMediaArchiver extends AbstractScanner implements ManagedService {
64 private static final Logger logger = LoggerFactory.getLogger(TimedMediaArchiver.class);
65
66 public static final String PARAM_KEY_STORE_ID = "store-id";
67 public static final String PARAM_KEY_MAX_AGE = "max-age";
68 public static final String JOB_GROUP = "oc-asset-manager-timed-media-archiver-group";
69 public static final String JOB_NAME = "oc-asset-manager-timed-media-archive-job";
70 public static final String SCANNER_NAME = "Timed media archive offloader";
71 public static final String TRIGGER_GROUP = "oc-asset-manager-timed-media-archiver-trigger-group";
72 public static final String TRIGGER_NAME = "oc-asset-manager-timed-media-archiver-trigger";
73
74 private AssetManager assetManager;
75 private WorkflowService workflowService;
76 private String storeId;
77 private long ageModifier;
78
79 public TimedMediaArchiver() {
80 try {
81 quartz = new StdSchedulerFactory().getScheduler();
82 quartz.start();
83
84 final JobDetail job = new JobDetail(getJobName(), getJobGroup(), Runner.class);
85 job.setDurability(false);
86 job.setVolatility(true);
87 job.getJobDataMap().put(JOB_PARAM_PARENT, this);
88 quartz.addJob(job, true);
89 } catch (org.quartz.SchedulerException e) {
90 throw new RuntimeException(e);
91 }
92 }
93
94 @Activate
95 @Override
96 public void activate(ComponentContext cc) {
97 super.activate(cc);
98 }
99
100 @Deactivate
101 @Override
102 public void deactivate() {
103 super.deactivate();
104 }
105
106 public void updated(Dictionary<String, ?> properties) throws ConfigurationException {
107 String cronExpression;
108 boolean enabled;
109
110 unschedule();
111
112 if (properties != null) {
113 logger.debug("Updating configuration...");
114
115 enabled = BooleanUtils.toBoolean((String) properties.get(PARAM_KEY_ENABLED));
116 setEnabled(enabled);
117 logger.info("Timed media offload enabled: " + enabled);
118 if (!isEnabled()) {
119 return;
120 }
121
122 cronExpression = (String) properties.get(PARAM_KEY_CRON_EXPR);
123 if (StringUtils.isBlank(cronExpression) || !CronExpression.isValidExpression(cronExpression)) {
124 throw new ConfigurationException(PARAM_KEY_CRON_EXPR, "Cron expression must be valid");
125 }
126 setCronExpression(cronExpression);
127 logger.debug("Timed media offload cron expression: '" + cronExpression + "'");
128
129 storeId = (String) properties.get(PARAM_KEY_STORE_ID);
130 if (StringUtils.isBlank(storeId)) {
131 throw new ConfigurationException(PARAM_KEY_STORE_ID, "Store type is missing");
132 }
133 logger.debug("Remote media store type: " + storeId);
134
135 try {
136 ageModifier = Long.parseLong((String) properties.get(PARAM_KEY_MAX_AGE));
137 } catch (NumberFormatException e) {
138 throw new ConfigurationException(PARAM_KEY_MAX_AGE, "Invalid max age");
139 }
140 if (ageModifier < 0) {
141 throw new ConfigurationException(PARAM_KEY_MAX_AGE, "Max age must be greater than zero");
142 }
143 }
144
145 schedule();
146 }
147
148 @Override
149 public String getJobGroup() {
150 return JOB_GROUP;
151 }
152
153 @Override
154 public String getJobName() {
155 return JOB_NAME;
156 }
157
158 @Override
159 public String getTriggerGroupName() {
160 return TRIGGER_GROUP;
161 }
162
163 @Override
164 public String getTriggerName() {
165 return TRIGGER_NAME;
166 }
167
168 @Override
169 public void scan() {
170 Date maxAge = Calendar.getInstance().getTime();
171 maxAge.setTime(maxAge.getTime() - TimeUnit.HOURS.toMillis(ageModifier));
172 if (assetManager.getAssetStore(storeId).isEmpty()) {
173 throw new RuntimeException("Store " + storeId + " is not available to the asset manager");
174 }
175
176 try {
177
178 assetManager.moveSnapshotsByDate(new Date(0), maxAge, storeId);
179 } catch (NotFoundException ignore) {
180 logger.debug("No snapshots found that need to be moved");
181 }
182 }
183
184 @Override
185 public String getScannerName() {
186 return SCANNER_NAME;
187 }
188
189 @Reference
190 public void setAssetManager(AssetManager am) {
191 this.assetManager = am;
192 }
193
194
195 public static class Runner extends TypedQuartzJob<AbstractScanner> {
196 private static final NeedleEye eye = new NeedleEye();
197
198 public Runner() {
199 super(Optional.of(eye));
200 }
201
202 @Override
203 protected void execute(final AbstractScanner parameters, JobExecutionContext ctx) {
204 logger.debug("Starting " + parameters.getScannerName() + " job.");
205
206
207 for (final Organization org : parameters.getOrganizationDirectoryService().getOrganizations()) {
208
209 parameters.getAdminContextFor(org.getId()).runInContext(parameters::scan);
210 }
211
212 logger.debug("Finished " + parameters.getScannerName() + " job.");
213 }
214 }
215
216 @Reference
217 @Override
218 public void bindOrganizationDirectoryService(OrganizationDirectoryService organizationDirectoryService) {
219 super.bindOrganizationDirectoryService(organizationDirectoryService);
220 }
221
222 @Reference
223 @Override
224 public void bindSecurityService(SecurityService securityService) {
225 super.bindSecurityService(securityService);
226 }
227
228 @Reference
229 @Override
230 public void bindServiceRegistry(ServiceRegistry serviceRegistry) {
231 super.bindServiceRegistry(serviceRegistry);
232 }
233
234 }