1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23 package org.opencastproject.ingest.scanner;
24
25 import static org.opencastproject.security.util.SecurityUtil.getUserAndOrganization;
26 import static org.opencastproject.util.data.Collections.dict;
27 import static org.opencastproject.util.data.Tuple.tuple;
28
29 import org.opencastproject.ingest.api.IngestService;
30 import org.opencastproject.scheduler.api.SchedulerService;
31 import org.opencastproject.security.api.OrganizationDirectoryService;
32 import org.opencastproject.security.api.SecurityService;
33 import org.opencastproject.security.api.UserDirectoryService;
34 import org.opencastproject.security.util.SecurityContext;
35 import org.opencastproject.series.api.SeriesService;
36 import org.opencastproject.workspace.api.Workspace;
37
38 import org.apache.commons.io.FileUtils;
39 import org.apache.commons.lang3.BooleanUtils;
40 import org.apache.commons.lang3.StringUtils;
41 import org.apache.commons.lang3.math.NumberUtils;
42 import org.apache.felix.fileinstall.ArtifactInstaller;
43 import org.osgi.framework.BundleContext;
44 import org.osgi.framework.ServiceReference;
45 import org.osgi.service.cm.Configuration;
46 import org.osgi.service.cm.ConfigurationAdmin;
47 import org.osgi.service.cm.ConfigurationException;
48 import org.osgi.service.cm.ManagedService;
49 import org.osgi.service.component.ComponentContext;
50 import org.osgi.service.component.annotations.Activate;
51 import org.osgi.service.component.annotations.Component;
52 import org.osgi.service.component.annotations.Deactivate;
53 import org.osgi.service.component.annotations.Reference;
54 import org.slf4j.Logger;
55 import org.slf4j.LoggerFactory;
56
57 import java.io.File;
58 import java.io.IOException;
59 import java.time.format.DateTimeFormatter;
60 import java.time.format.DateTimeFormatterBuilder;
61 import java.time.temporal.ChronoField;
62 import java.util.Dictionary;
63 import java.util.Enumeration;
64 import java.util.HashMap;
65 import java.util.Map;
66 import java.util.Objects;
67 import java.util.Optional;
68 import java.util.regex.Pattern;
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83 @Component(
84 immediate = true,
85 service = {
86 ArtifactInstaller.class,
87 ManagedService.class
88 },
89 property = {
90 "service.pid=org.opencastproject.ingest.scanner.InboxScannerService",
91 "service.description=Inbox Scanner"
92 }
93 )
94 public class InboxScannerService implements ArtifactInstaller, ManagedService {
95
96
97 private static final Logger logger = LoggerFactory.getLogger(InboxScannerService.class);
98
99
100 public static final String USER_NAME = "user.name";
101
102
103 public static final String USER_ORG = "user.organization";
104
105
106 public static final String WORKFLOW_DEFINITION = "workflow.definition";
107
108
109 public static final String MEDIA_FLAVOR = "media.flavor";
110
111
112
113 public static final String WORKFLOW_CONFIG = "workflow.config";
114
115
116 public static final String INBOX_PATH = "inbox.path";
117
118
119 public static final String INBOX_POLL = "inbox.poll";
120
121 public static final String INBOX_THREADS = "inbox.threads";
122 public static final String INBOX_TRIES = "inbox.tries";
123 public static final String INBOX_TRIES_BETWEEN_SEC = "inbox.tries.between.sec";
124
125 public static final String INBOX_METADATA_REGEX = "inbox.metadata.regex";
126 public static final String INBOX_DATETIME_FORMAT = "inbox.datetime.format";
127 public static final String INBOX_METADATA_FFPROBE = "inbox.metadata.ffprobe";
128 public static final String INBOX_SCHEDULE_MATCH = "inbox.schedule.match";
129 public static final String INBOX_SCHEDULE_MATCH_THRESHOLD = "inbox.schedule.match.threshold";
130
131 public static final String FFPROBE_BINARY_CONFIG = "org.opencastproject.inspection.ffprobe.path";
132 public static final String FFPROBE_BINARY_DEFAULT = "ffprobe";
133
134 private IngestService ingestService;
135 private SecurityService securityService;
136 private UserDirectoryService userDir;
137 private OrganizationDirectoryService orgDir;
138 private SeriesService seriesService;
139 private SchedulerService schedulerService;
140 protected Workspace workspace;
141
142 private ComponentContext cc;
143
144 private volatile Ingestor ingestor = null;
145 private volatile Configuration fileInstallCfg = null;
146
147
148
149 @Activate
150 public synchronized void activate(ComponentContext cc) {
151 this.cc = cc;
152 }
153
154
155 @Deactivate
156 public void deactivate() {
157 removeFileInstallCfg();
158 }
159
160
161 @Override
162 public synchronized void updated(Dictionary properties) throws ConfigurationException {
163
164 if (properties == null) {
165 return;
166 }
167 final String orgId = getCfg(properties, USER_ORG);
168 final String userId = getCfg(properties, USER_NAME);
169 final String mediaFlavor = getCfg(properties, MEDIA_FLAVOR);
170 final String workflowDefinition = Objects.toString(properties.get(WORKFLOW_DEFINITION), null);
171 final Map<String, String> workflowConfig = getCfgAsMap(properties, WORKFLOW_CONFIG);
172 final int interval = NumberUtils.toInt(Objects.toString(properties.get(INBOX_POLL), "5000"));
173 final File inbox = new File(getCfg(properties, INBOX_PATH));
174 if (!inbox.isDirectory()) {
175 try {
176 FileUtils.forceMkdir(inbox);
177 } catch (IOException e) {
178 throw new ConfigurationException(INBOX_PATH,
179 String.format("%s does not exists and could not be created", inbox.getAbsolutePath()));
180 }
181 }
182
183 if (!inbox.canRead()) {
184 throw new ConfigurationException(INBOX_PATH, String.format("Cannot read from %s", inbox.getAbsolutePath()));
185 }
186
187 if (!inbox.canWrite()) {
188 throw new ConfigurationException(INBOX_PATH, String.format("Cannot write to %s", inbox.getAbsolutePath()));
189 }
190 final int maxThreads = NumberUtils.toInt(Objects.toString(properties.get(INBOX_THREADS), "1"));
191 final int maxTries = NumberUtils.toInt(Objects.toString(properties.get(INBOX_TRIES), "3"));
192 final int secondsBetweenTries = NumberUtils.toInt(Objects.toString(properties.get(INBOX_TRIES_BETWEEN_SEC), "300"));
193
194
195 var metadataPattern = Optional.ofNullable(properties.get(INBOX_METADATA_REGEX))
196 .map(Objects::toString)
197 .map(Pattern::compile);
198 var dateFormatter = Optional.ofNullable(properties.get(INBOX_DATETIME_FORMAT))
199 .map(Objects::toString)
200 .map(s -> new DateTimeFormatterBuilder().appendPattern(s)
201 .parseDefaulting(ChronoField.HOUR_OF_DAY, 0)
202 .parseDefaulting(ChronoField.MINUTE_OF_HOUR, 0)
203 .parseDefaulting(ChronoField.SECOND_OF_MINUTE, 0)
204 .toFormatter())
205 .orElse(DateTimeFormatter.ISO_DATE_TIME);
206 var ffprobe = BooleanUtils.toBoolean((String) properties.get(INBOX_METADATA_FFPROBE))
207 ? Objects.toString(cc.getBundleContext().getProperty(FFPROBE_BINARY_CONFIG), FFPROBE_BINARY_DEFAULT)
208 : null;
209 var matchSchedule = BooleanUtils.toBoolean((String) properties.get(INBOX_SCHEDULE_MATCH));
210 var matchThreshold = NumberUtils.toFloat((String) properties.get(INBOX_SCHEDULE_MATCH_THRESHOLD), -1F);
211
212 var securityContext = getUserAndOrganization(securityService, orgDir, orgId, userDir, userId)
213 .map(a -> new SecurityContext(securityService, a.getB(), a.getA()));
214
215 if (securityContext.isEmpty()) {
216 logger.warn("Could not create security context for user {}, organization {}. "
217 + "Either the organization or the user does not exist (yet).", userId, orgId);
218 }
219 for (int attempts = 0; attempts < 25 && securityContext.isEmpty(); attempts++) {
220 logger.info("Waiting for security context...");
221 try {
222 Thread.sleep(5000);
223 } catch (InterruptedException e) {
224 logger.warn("Interrupted while waiting for security context");
225 }
226 securityContext = getUserAndOrganization(securityService, orgDir, orgId, userDir, userId)
227 .map(a -> new SecurityContext(securityService, a.getB(), a.getA()));
228 }
229 if (securityContext.isEmpty()) {
230 logger.warn("Security context for user {} and organization {} is still empty. Giving up.", userId, orgId);
231 return;
232 }
233
234
235 removeFileInstallCfg();
236
237 fileInstallCfg = configureFileInstall(cc.getBundleContext(), inbox, interval);
238
239 this.ingestor = new Ingestor(ingestService, securityContext.get(), workflowDefinition,
240 workflowConfig, mediaFlavor, inbox, maxThreads, seriesService, maxTries, secondsBetweenTries,
241 metadataPattern, dateFormatter, schedulerService, ffprobe, matchSchedule, matchThreshold,
242 workspace);
243 new Thread(ingestor).start();
244 logger.info("Now watching inbox {}", inbox.getAbsolutePath());
245 }
246
247 private void removeFileInstallCfg() {
248 if (fileInstallCfg != null) {
249 try {
250 fileInstallCfg.delete();
251 } catch (IOException e) {
252 logger.error("Failed to delete file install configuration", e);
253 }
254 fileInstallCfg = null;
255 }
256 };
257
258
259
260
261
262
263
264 private static Configuration configureFileInstall(BundleContext bc, File inbox, int interval) {
265 final ServiceReference caRef = bc.getServiceReference(ConfigurationAdmin.class.getName());
266 if (caRef == null) {
267 throw new Error("Cannot obtain a reference to the ConfigurationAdmin service");
268 }
269 final Dictionary<String, String> fileInstallConfig = dict(tuple("felix.fileinstall.dir", inbox.getAbsolutePath()),
270 tuple("felix.fileinstall.poll", Integer.toString(interval)),
271 tuple("felix.fileinstall.subdir.mode", "recurse"));
272
273
274 try {
275 final String fileInstallBundleLocation = bc.getServiceReferences("org.osgi.service.cm.ManagedServiceFactory",
276 "(service.pid=org.apache.felix.fileinstall)")[0].getBundle().getLocation();
277 final Configuration conf = ((ConfigurationAdmin) bc.getService(caRef)).createFactoryConfiguration(
278 "org.apache.felix.fileinstall", fileInstallBundleLocation);
279 conf.update(fileInstallConfig);
280 return conf;
281 } catch (Exception e) {
282 throw new Error(e);
283 }
284 }
285
286
287
288
289
290
291 @Override
292 public boolean canHandle(final File artifact) {
293 return ingestor != null && ingestor.canHandle(artifact);
294 }
295
296 @Override
297 public void install(final File artifact) throws Exception {
298 if (ingestor != null) {
299 logger.trace("install(): {}", artifact.getName());
300 ingestor.ingest(artifact);
301 }
302 }
303
304 @Override
305 public void update(File artifact) {
306 logger.trace("update(): {}", artifact.getName());
307 }
308
309 @Override
310 public void uninstall(File artifact) {
311 if (ingestor != null) {
312 logger.trace("uninstall(): {}", artifact.getName());
313 ingestor.cleanup(artifact);
314 }
315 }
316
317
318
319
320 @Reference
321 public void setIngestService(IngestService ingestService) {
322 this.ingestService = ingestService;
323 }
324
325
326 @Reference
327 public void setSecurityService(SecurityService securityService) {
328 this.securityService = securityService;
329 }
330
331
332 @Reference
333 public void setUserDirectoryService(UserDirectoryService userDirectoryService) {
334 this.userDir = userDirectoryService;
335 }
336
337
338 @Reference
339 public void setOrganizationDirectoryService(OrganizationDirectoryService organizationDirectoryService) {
340 this.orgDir = organizationDirectoryService;
341 }
342
343
344
345
346
347
348
349 private static String getCfg(Dictionary d, String key) throws ConfigurationException {
350 Object p = d.get(key);
351 if (p == null)
352 throw new ConfigurationException(key, "does not exist");
353 String ps = p.toString();
354 if (StringUtils.isBlank(ps))
355 throw new ConfigurationException(key, "is blank");
356 return ps;
357 }
358
359 private static Map<String, String> getCfgAsMap(final Dictionary d, final String key) {
360
361 HashMap<String, String> config = new HashMap<>();
362 if (d == null) return config;
363 for (Enumeration e = d.keys(); e.hasMoreElements();) {
364 final String dKey = Objects.toString(e.nextElement());
365 if (dKey.startsWith(key)) {
366 config.put(dKey.substring(key.length() + 1), Objects.toString(d.get(dKey), null));
367 }
368 }
369 return config;
370 }
371
372 @Reference
373 public void setSeriesService(SeriesService seriesService) {
374 this.seriesService = seriesService;
375 }
376
377 @Reference
378 public void setSchedulerService(SchedulerService schedulerService) {
379 this.schedulerService = schedulerService;
380 }
381
382 @Reference
383 public void setWorkspace(Workspace workspace) {
384 this.workspace = workspace;
385 }
386 }