View Javadoc
1   /*
2    * Licensed to The Apereo Foundation under one or more contributor license
3    * agreements. See the NOTICE file distributed with this work for additional
4    * information regarding copyright ownership.
5    *
6    *
7    * The Apereo Foundation licenses this file to you under the Educational
8    * Community License, Version 2.0 (the "License"); you may not use this file
9    * except in compliance with the License. You may obtain a copy of the License
10   * at:
11   *
12   *   http://opensource.org/licenses/ecl2.txt
13   *
14   * Unless required by applicable law or agreed to in writing, software
15   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
16   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the
17   * License for the specific language governing permissions and limitations under
18   * the License.
19   *
20   */
21  
22  
23  package org.opencastproject.ingest.scanner;
24  
25  import static org.opencastproject.security.util.SecurityUtil.getUserAndOrganization;
26  import static org.opencastproject.util.data.Collections.dict;
27  import static org.opencastproject.util.data.Tuple.tuple;
28  
29  import org.opencastproject.ingest.api.IngestService;
30  import org.opencastproject.scheduler.api.SchedulerService;
31  import org.opencastproject.security.api.OrganizationDirectoryService;
32  import org.opencastproject.security.api.SecurityService;
33  import org.opencastproject.security.api.UserDirectoryService;
34  import org.opencastproject.security.util.SecurityContext;
35  import org.opencastproject.series.api.SeriesService;
36  import org.opencastproject.workspace.api.Workspace;
37  
38  import org.apache.commons.io.FileUtils;
39  import org.apache.commons.lang3.BooleanUtils;
40  import org.apache.commons.lang3.StringUtils;
41  import org.apache.commons.lang3.math.NumberUtils;
42  import org.apache.felix.fileinstall.ArtifactInstaller;
43  import org.osgi.framework.BundleContext;
44  import org.osgi.framework.ServiceReference;
45  import org.osgi.service.cm.Configuration;
46  import org.osgi.service.cm.ConfigurationAdmin;
47  import org.osgi.service.cm.ConfigurationException;
48  import org.osgi.service.cm.ManagedService;
49  import org.osgi.service.component.ComponentContext;
50  import org.osgi.service.component.annotations.Activate;
51  import org.osgi.service.component.annotations.Component;
52  import org.osgi.service.component.annotations.Deactivate;
53  import org.osgi.service.component.annotations.Reference;
54  import org.slf4j.Logger;
55  import org.slf4j.LoggerFactory;
56  
57  import java.io.File;
58  import java.io.IOException;
59  import java.time.format.DateTimeFormatter;
60  import java.time.format.DateTimeFormatterBuilder;
61  import java.time.temporal.ChronoField;
62  import java.util.Dictionary;
63  import java.util.Enumeration;
64  import java.util.HashMap;
65  import java.util.Map;
66  import java.util.Objects;
67  import java.util.Optional;
68  import java.util.regex.Pattern;
69  
70  /**
71   * The inbox scanner monitors a directory for incoming media packages.
72   * <p>
73   * There is one InboxScanner instance per inbox. Each instance is configured by a config file in
74   * <code>.../etc/load</code> named <code>&lt;inbox-scanned-pid&gt;-&lt;name&gt;.cfg</code> where <code>name</code>
75   * can be arbitrarily chosen and has no further meaning. <code>inbox-scanned-pid</code> must confirm to the PID given to
76   * the InboxScanner in the declarative service (DS) configuration <code>OSGI-INF/inbox-scanner-service.xml</code>.
77   *
78   * <h2>Implementation notes</h2>
79   * Monitoring leverages Apache FileInstall by implementing {@link ArtifactInstaller}.
80   *
81   * @see Ingestor
82   */
83  @Component(
84    immediate = true,
85    service = {
86      ArtifactInstaller.class,
87      ManagedService.class
88    },
89    property = {
90      "service.pid=org.opencastproject.ingest.scanner.InboxScannerService",
91      "service.description=Inbox Scanner"
92    }
93  )
94  public class InboxScannerService implements ArtifactInstaller, ManagedService {
95  
96    /** The logger */
97    private static final Logger logger = LoggerFactory.getLogger(InboxScannerService.class);
98  
99    /** The configuration key to use for determining the user to run as for ingest */
100   public static final String USER_NAME = "user.name";
101 
102   /** The configuration key to use for determining the user's organization */
103   public static final String USER_ORG = "user.organization";
104 
105   /** The configuration key to use for determining the workflow definition to use for ingest */
106   public static final String WORKFLOW_DEFINITION = "workflow.definition";
107 
108   /** The configuration key to use for determining the default media flavor */
109   public static final String MEDIA_FLAVOR = "media.flavor";
110 
111 
112   /** The configuration key to use for determining the workflow configuration to use for ingest */
113   public static final String WORKFLOW_CONFIG = "workflow.config";
114 
115   /** The configuration key to use for determining the inbox path */
116   public static final String INBOX_PATH = "inbox.path";
117 
118   /** The configuration key to use for determining the polling interval in ms. */
119   public static final String INBOX_POLL = "inbox.poll";
120 
121   public static final String INBOX_THREADS = "inbox.threads";
122   public static final String INBOX_TRIES = "inbox.tries";
123   public static final String INBOX_TRIES_BETWEEN_SEC = "inbox.tries.between.sec";
124 
125   public static final String INBOX_METADATA_REGEX = "inbox.metadata.regex";
126   public static final String INBOX_DATETIME_FORMAT = "inbox.datetime.format";
127   public static final String INBOX_METADATA_FFPROBE = "inbox.metadata.ffprobe";
128   public static final String INBOX_SCHEDULE_MATCH = "inbox.schedule.match";
129   public static final String INBOX_SCHEDULE_MATCH_THRESHOLD = "inbox.schedule.match.threshold";
130 
131   public static final String FFPROBE_BINARY_CONFIG = "org.opencastproject.inspection.ffprobe.path";
132   public static final String FFPROBE_BINARY_DEFAULT = "ffprobe";
133 
134   private IngestService ingestService;
135   private SecurityService securityService;
136   private UserDirectoryService userDir;
137   private OrganizationDirectoryService orgDir;
138   private SeriesService seriesService;
139   private SchedulerService schedulerService;
140   protected Workspace workspace;
141 
142   private ComponentContext cc;
143 
144   private volatile Ingestor ingestor = null;
145   private volatile Configuration fileInstallCfg = null;
146 
147   /** OSGi callback. */
148   // synchronized with updated(Dictionary)
149   @Activate
150   public synchronized void activate(ComponentContext cc) {
151     this.cc = cc;
152   }
153 
154   /** OSGi callback. */
155   @Deactivate
156   public void deactivate() {
157     removeFileInstallCfg();
158   }
159 
160   // synchronized with activate(ComponentContext)
161   @Override
162   public synchronized void updated(Dictionary properties) throws ConfigurationException {
163     // build scanner configuration
164     if (properties == null) {
165       return;
166     }
167     final String orgId = getCfg(properties, USER_ORG);
168     final String userId = getCfg(properties, USER_NAME);
169     final String mediaFlavor = getCfg(properties, MEDIA_FLAVOR);
170     final String workflowDefinition = Objects.toString(properties.get(WORKFLOW_DEFINITION), null);
171     final Map<String, String> workflowConfig = getCfgAsMap(properties, WORKFLOW_CONFIG);
172     final int interval = NumberUtils.toInt(Objects.toString(properties.get(INBOX_POLL), "5000"));
173     final File inbox = new File(getCfg(properties, INBOX_PATH));
174     if (!inbox.isDirectory()) {
175       try {
176         FileUtils.forceMkdir(inbox);
177       } catch (IOException e) {
178         throw new ConfigurationException(INBOX_PATH,
179             String.format("%s does not exists and could not be created", inbox.getAbsolutePath()));
180       }
181     }
182     /* We need to be able to read from the inbox to get files from there */
183     if (!inbox.canRead()) {
184       throw new ConfigurationException(INBOX_PATH, String.format("Cannot read from %s", inbox.getAbsolutePath()));
185     }
186     /* We need to be able to write to the inbox to remove files after they have been ingested */
187     if (!inbox.canWrite()) {
188       throw new ConfigurationException(INBOX_PATH, String.format("Cannot write to %s", inbox.getAbsolutePath()));
189     }
190     final int maxThreads = NumberUtils.toInt(Objects.toString(properties.get(INBOX_THREADS), "1"));
191     final int maxTries = NumberUtils.toInt(Objects.toString(properties.get(INBOX_TRIES), "3"));
192     final int secondsBetweenTries = NumberUtils.toInt(Objects.toString(properties.get(INBOX_TRIES_BETWEEN_SEC), "300"));
193 
194     // Metadata parsing configuration
195     var metadataPattern = Optional.ofNullable(properties.get(INBOX_METADATA_REGEX))
196             .map(Objects::toString)
197             .map(Pattern::compile);
198     var dateFormatter = Optional.ofNullable(properties.get(INBOX_DATETIME_FORMAT))
199             .map(Objects::toString)
200             .map(s -> new DateTimeFormatterBuilder().appendPattern(s)
201                     .parseDefaulting(ChronoField.HOUR_OF_DAY, 0)
202                     .parseDefaulting(ChronoField.MINUTE_OF_HOUR, 0)
203                     .parseDefaulting(ChronoField.SECOND_OF_MINUTE, 0)
204                     .toFormatter())
205             .orElse(DateTimeFormatter.ISO_DATE_TIME);
206     var ffprobe = BooleanUtils.toBoolean((String) properties.get(INBOX_METADATA_FFPROBE))
207             ? Objects.toString(cc.getBundleContext().getProperty(FFPROBE_BINARY_CONFIG), FFPROBE_BINARY_DEFAULT)
208             : null;
209     var matchSchedule = BooleanUtils.toBoolean((String) properties.get(INBOX_SCHEDULE_MATCH));
210     var matchThreshold = NumberUtils.toFloat((String) properties.get(INBOX_SCHEDULE_MATCH_THRESHOLD), -1F);
211 
212     var securityContext = getUserAndOrganization(securityService, orgDir, orgId, userDir, userId)
213             .map(a -> new SecurityContext(securityService, a.getB(), a.getA()));
214 
215     if (securityContext.isEmpty()) {
216       logger.warn("Could not create security context for user {}, organization {}. "
217           + "Either the organization or the user does not exist (yet).", userId, orgId);
218     }
219     for (int attempts = 0; attempts < 25 && securityContext.isEmpty(); attempts++) {
220       logger.info("Waiting for security context...");
221       try {
222         Thread.sleep(5000);
223       } catch (InterruptedException e) {
224         logger.warn("Interrupted while waiting for security context");
225       }
226       securityContext = getUserAndOrganization(securityService, orgDir, orgId, userDir, userId)
227           .map(a -> new SecurityContext(securityService, a.getB(), a.getA()));
228     }
229     if (securityContext.isEmpty()) {
230       logger.warn("Security context for user {} and organization {} is still empty. Giving up.", userId, orgId);
231       return;
232     }
233 
234     // remove old file install configuration
235     removeFileInstallCfg();
236     // set up new file install config
237     fileInstallCfg = configureFileInstall(cc.getBundleContext(), inbox, interval);
238     // create new scanner
239     this.ingestor = new Ingestor(ingestService, securityContext.get(), workflowDefinition,
240             workflowConfig, mediaFlavor, inbox, maxThreads, seriesService, maxTries, secondsBetweenTries,
241             metadataPattern, dateFormatter, schedulerService, ffprobe, matchSchedule, matchThreshold,
242             workspace);
243     new Thread(ingestor).start();
244     logger.info("Now watching inbox {}", inbox.getAbsolutePath());
245   }
246 
247   private void removeFileInstallCfg() {
248     if (fileInstallCfg != null) {
249       try {
250         fileInstallCfg.delete();
251       } catch (IOException e) {
252         logger.error("Failed to delete file install configuration", e);
253       }
254       fileInstallCfg = null;
255     }
256   };
257 
258   /**
259    * Setup an Apache FileInstall configuration for the inbox folder this scanner is responsible for.
260    *
261    * see section 104.4.1 Location Binding, paragraph 4, of the OSGi Spec 4.2 The correct permissions are needed in order
262    * to set configuration data for a bundle other than the calling bundle itself.
263    */
264   private static Configuration configureFileInstall(BundleContext bc, File inbox, int interval) {
265     final ServiceReference caRef = bc.getServiceReference(ConfigurationAdmin.class.getName());
266     if (caRef == null) {
267       throw new Error("Cannot obtain a reference to the ConfigurationAdmin service");
268     }
269     final Dictionary<String, String> fileInstallConfig = dict(tuple("felix.fileinstall.dir", inbox.getAbsolutePath()),
270             tuple("felix.fileinstall.poll", Integer.toString(interval)),
271             tuple("felix.fileinstall.subdir.mode", "recurse"));
272 
273     // update file install config with the new directory
274     try {
275       final String fileInstallBundleLocation = bc.getServiceReferences("org.osgi.service.cm.ManagedServiceFactory",
276               "(service.pid=org.apache.felix.fileinstall)")[0].getBundle().getLocation();
277       final Configuration conf = ((ConfigurationAdmin) bc.getService(caRef)).createFactoryConfiguration(
278               "org.apache.felix.fileinstall", fileInstallBundleLocation);
279       conf.update(fileInstallConfig);
280       return conf;
281     } catch (Exception e) {
282       throw new Error(e);
283     }
284   }
285 
286   // --
287 
288   // FileInstall callback, called on a different thread
289   // Attention: This method may be called _before_ the updated(Dictionary) which means that config parameters
290   // are not set yet.
291   @Override
292   public boolean canHandle(final File artifact) {
293     return ingestor != null && ingestor.canHandle(artifact);
294   }
295 
296   @Override
297   public void install(final File artifact) throws Exception {
298     if (ingestor != null) {
299       logger.trace("install(): {}", artifact.getName());
300       ingestor.ingest(artifact);
301     }
302   }
303 
304   @Override
305   public void update(File artifact) {
306     logger.trace("update(): {}", artifact.getName());
307   }
308 
309   @Override
310   public void uninstall(File artifact) {
311     if (ingestor != null) {
312       logger.trace("uninstall(): {}", artifact.getName());
313       ingestor.cleanup(artifact);
314     }
315   }
316 
317   // --
318 
319   /** OSGi callback to set the ingest service. */
320   @Reference
321   public void setIngestService(IngestService ingestService) {
322     this.ingestService = ingestService;
323   }
324 
325   /** OSGi callback to set the security service. */
326   @Reference
327   public void setSecurityService(SecurityService securityService) {
328     this.securityService = securityService;
329   }
330 
331   /** OSGi callback to set the user directory. */
332   @Reference
333   public void setUserDirectoryService(UserDirectoryService userDirectoryService) {
334     this.userDir = userDirectoryService;
335   }
336 
337   /** OSGi callback to set the organization directory server. */
338   @Reference
339   public void setOrganizationDirectoryService(OrganizationDirectoryService organizationDirectoryService) {
340     this.orgDir = organizationDirectoryService;
341   }
342 
343   /**
344    * Get a mandatory, non-blank value from a dictionary.
345    *
346    * @throws ConfigurationException
347    *           key does not exist or its value is blank
348    */
349   private static String getCfg(Dictionary d, String key) throws ConfigurationException {
350     Object p = d.get(key);
351     if (p == null)
352       throw new ConfigurationException(key, "does not exist");
353     String ps = p.toString();
354     if (StringUtils.isBlank(ps))
355       throw new ConfigurationException(key, "is blank");
356     return ps;
357   }
358 
359   private static Map<String, String> getCfgAsMap(final Dictionary d, final String key) {
360 
361     HashMap<String, String> config = new HashMap<>();
362     if (d == null) return config;
363     for (Enumeration e = d.keys(); e.hasMoreElements();) {
364       final String dKey = Objects.toString(e.nextElement());
365       if (dKey.startsWith(key)) {
366         config.put(dKey.substring(key.length() + 1), Objects.toString(d.get(dKey), null));
367       }
368     }
369     return config;
370   }
371 
372   @Reference
373   public void setSeriesService(SeriesService seriesService) {
374     this.seriesService = seriesService;
375   }
376 
377   @Reference
378   public void setSchedulerService(SchedulerService schedulerService) {
379     this.schedulerService = schedulerService;
380   }
381 
382   @Reference
383   public void setWorkspace(Workspace workspace) {
384     this.workspace = workspace;
385   }
386 }