View Javadoc
1   /*
2    * Licensed to The Apereo Foundation under one or more contributor license
3    * agreements. See the NOTICE file distributed with this work for additional
4    * information regarding copyright ownership.
5    *
6    *
7    * The Apereo Foundation licenses this file to you under the Educational
8    * Community License, Version 2.0 (the "License"); you may not use this file
9    * except in compliance with the License. You may obtain a copy of the License
10   * at:
11   *
12   *   http://opensource.org/licenses/ecl2.txt
13   *
14   * Unless required by applicable law or agreed to in writing, software
15   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
16   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the
17   * License for the specific language governing permissions and limitations under
18   * the License.
19   *
20   */
21  
22  package org.opencastproject.statistics.provider.influx;
23  
24  import org.opencastproject.statistics.api.ResourceType;
25  import org.opencastproject.statistics.api.StatisticsCoordinator;
26  import org.opencastproject.statistics.api.StatisticsProvider;
27  import org.opencastproject.statistics.api.StatisticsWriter;
28  import org.opencastproject.statistics.provider.influx.provider.InfluxProviderConfiguration;
29  import org.opencastproject.statistics.provider.influx.provider.InfluxRunningTotalStatisticsProvider;
30  import org.opencastproject.statistics.provider.influx.provider.InfluxTimeSeriesStatisticsProvider;
31  import org.opencastproject.util.ConfigurationException;
32  
33  import org.apache.felix.fileinstall.ArtifactInstaller;
34  import org.influxdb.InfluxDB;
35  import org.influxdb.InfluxDBFactory;
36  import org.influxdb.dto.BatchPoints;
37  import org.influxdb.dto.Point;
38  import org.osgi.service.cm.ManagedService;
39  import org.osgi.service.component.ComponentContext;
40  import org.osgi.service.component.annotations.Activate;
41  import org.osgi.service.component.annotations.Component;
42  import org.osgi.service.component.annotations.Deactivate;
43  import org.osgi.service.component.annotations.Reference;
44  import org.slf4j.Logger;
45  import org.slf4j.LoggerFactory;
46  
47  import java.io.File;
48  import java.nio.charset.Charset;
49  import java.nio.file.Files;
50  import java.time.Duration;
51  import java.util.Dictionary;
52  import java.util.Map;
53  import java.util.concurrent.ConcurrentHashMap;
54  import java.util.concurrent.TimeUnit;
55  
56  /**
57   * Implements statistics providers using influxdb for permanent storage.
58   */
59  @Component(
60      immediate = true,
61      service = { ManagedService.class,ArtifactInstaller.class,StatisticsWriter.class },
62      property = {
63          "service.description=Statistics Provider Influx Service"
64      }
65  )
66  public class StatisticsProviderInfluxService implements ManagedService, ArtifactInstaller, StatisticsWriter {
67  
68    /** Logging utility */
69    private static final Logger logger = LoggerFactory.getLogger(StatisticsProviderInfluxService.class);
70  
71    private static final String KEY_INFLUX_URI = "influx.uri";
72    private static final String KEY_INFLUX_USER = "influx.username";
73    private static final String KEY_INFLUX_PW = "influx.password";
74    private static final String KEY_INFLUX_DB = "influx.db";
75  
76    private String influxUri = "http://127.0.0.1:8086";
77    private String influxUser = "root";
78    private String influxPw = "root";
79    private String influxDbName = "opencast";
80  
81    private volatile InfluxDB influxDB;
82  
83  
84    private StatisticsCoordinator statisticsCoordinator;
85    private Map<String, StatisticsProvider> fileNameToProvider = new ConcurrentHashMap<>();
86  
87  
88    @Reference
89    public void setStatisticsCoordinator(StatisticsCoordinator service) {
90      this.statisticsCoordinator = service;
91    }
92  
93    @Activate
94    public void activate(ComponentContext cc) {
95      logger.info("Activating Statistics Provider Influx Service");
96    }
97  
98    @Deactivate
99    public void deactivate(ComponentContext cc) {
100     logger.info("Deactivating Statistics Provider Influx Service");
101     disconnectInflux();
102   }
103 
104   @Override
105   public void install(File file) throws Exception {
106     final String json = new String(Files.readAllBytes(file.toPath()), Charset.forName("utf-8"));
107     final InfluxProviderConfiguration providerCfg = InfluxProviderConfiguration.fromJson(json);
108     StatisticsProvider provider;
109     switch (providerCfg.getType().toLowerCase()) {
110       case "timeseries": {
111         provider = new InfluxTimeSeriesStatisticsProvider(
112                 this,
113                 providerCfg.getId(),
114                 providerCfg.getResourceType(),
115                 providerCfg.getTitle(),
116                 providerCfg.getDescription(),
117                 providerCfg.getSources());
118       }
119       break;
120       case "runningtotal":
121         provider = new InfluxRunningTotalStatisticsProvider(
122                 this,
123                 providerCfg.getId(),
124                 ResourceType.ORGANIZATION,
125                 providerCfg.getTitle(),
126                 providerCfg.getDescription(),
127                 providerCfg.getSources());
128         break;
129       default:
130         throw new ConfigurationException("Unknown influx statistics type: " + providerCfg.getType());
131     }
132     fileNameToProvider.put(file.getName(), provider);
133     if (influxDB != null) {
134       statisticsCoordinator.addProvider(provider);
135     }
136   }
137 
138   @Override
139   public void uninstall(File file) {
140     if (fileNameToProvider.containsKey(file.getName())) {
141       statisticsCoordinator.removeProvider(fileNameToProvider.get(file.getName()));
142       fileNameToProvider.remove(file.getName());
143     }
144   }
145 
146   @Override
147   public boolean canHandle(File file) {
148     return "statistics".equals(file.getParentFile().getName())
149         && file.getName().endsWith(".json")
150         && file.getName().toUpperCase().startsWith("influx.".toUpperCase());
151   }
152 
153   @Override
154   public void update(File file) throws Exception {
155     uninstall(file);
156     install(file);
157   }
158 
159   @Override
160   public void updated(Dictionary<String, ?> dictionary) {
161     if (dictionary == null) {
162       logger.info("No configuration available. Not connecting to influx DB.");
163       disconnectInflux();
164     } else {
165       final Object influxUriValue = dictionary.get(KEY_INFLUX_URI);
166       if (influxUriValue != null) {
167         influxUri = influxUriValue.toString();
168       }
169       final Object influxUserValue = dictionary.get(KEY_INFLUX_USER);
170       if (influxUserValue != null) {
171         influxUser = influxUserValue.toString();
172       }
173       final Object influxPwValue = dictionary.get(KEY_INFLUX_PW);
174       if (influxPwValue != null) {
175         influxPw = influxPwValue.toString();
176       }
177       final Object influxDbValue = dictionary.get(KEY_INFLUX_DB);
178       if (influxDbValue != null) {
179         influxDbName = influxDbValue.toString();
180       }
181       connectInflux();
182     }
183   }
184 
185   public InfluxDB getInfluxDB() {
186     return influxDB;
187   }
188 
189   private void connectInflux() {
190     disconnectInflux();
191     influxDB = InfluxDBFactory.connect(influxUri, influxUser, influxPw);
192     influxDB.setDatabase(influxDbName);
193     fileNameToProvider.values().forEach(provider -> statisticsCoordinator.addProvider(provider));
194     statisticsCoordinator.addWriter(this);
195   }
196 
197   private void disconnectInflux() {
198     if (influxDB != null) {
199       fileNameToProvider.values().forEach(provider -> statisticsCoordinator.removeProvider(provider));
200       influxDB.close();
201       influxDB = null;
202     }
203   }
204 
205   @Override
206   public void writeDuration(
207           String organizationId,
208           String measurementName,
209           String retentionPolicy,
210           String organizationIdResourceName,
211           String fieldName,
212           TimeUnit temporalResolution,
213           Duration duration) {
214     double divider;
215     switch (temporalResolution) {
216       case MILLISECONDS:
217         divider = 1.0;
218         break;
219       case SECONDS:
220         divider = 1000.0;
221         break;
222       case MINUTES:
223         divider = 1000.0 * 60.0;
224         break;
225       case HOURS:
226         divider = 1000.0 * 60.0 * 60.0;
227         break;
228       case DAYS:
229         divider = 1000.0 * 60.0 * 60.0 * 24.0;
230         break;
231       default:
232         throw new RuntimeException("nanosecond and microsecond resolution not supported");
233     }
234     final Point point = Point
235             .measurement(measurementName)
236             .tag(organizationIdResourceName, organizationId)
237             .addField(fieldName, duration.toMillis() / divider)
238             .build();
239     if (retentionPolicy == null) {
240       influxDB.write(point);
241     } else {
242       influxDB.write(BatchPoints.builder().point(point).retentionPolicy(retentionPolicy).build());
243     }
244   }
245 
246   @Override
247   public String getId() {
248     return "influxdb-writer";
249   }
250 }