1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 package org.opencastproject.statistics.provider.influx;
23
import org.opencastproject.statistics.api.ResourceType;
import org.opencastproject.statistics.api.StatisticsCoordinator;
import org.opencastproject.statistics.api.StatisticsProvider;
import org.opencastproject.statistics.api.StatisticsWriter;
import org.opencastproject.statistics.provider.influx.provider.InfluxProviderConfiguration;
import org.opencastproject.statistics.provider.influx.provider.InfluxRunningTotalStatisticsProvider;
import org.opencastproject.statistics.provider.influx.provider.InfluxTimeSeriesStatisticsProvider;
import org.opencastproject.util.ConfigurationException;

import org.apache.felix.fileinstall.ArtifactInstaller;
import org.influxdb.InfluxDB;
import org.influxdb.InfluxDBFactory;
import org.influxdb.dto.BatchPoints;
import org.influxdb.dto.Point;
import org.osgi.service.cm.ManagedService;
import org.osgi.service.component.ComponentContext;
import org.osgi.service.component.annotations.Activate;
import org.osgi.service.component.annotations.Component;
import org.osgi.service.component.annotations.Deactivate;
import org.osgi.service.component.annotations.Reference;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.File;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.time.Duration;
import java.util.Dictionary;
import java.util.Locale;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
55
56
57
58
59 @Component(
60 immediate = true,
61 service = { ManagedService.class,ArtifactInstaller.class,StatisticsWriter.class },
62 property = {
63 "service.description=Statistics Provider Influx Service"
64 }
65 )
66 public class StatisticsProviderInfluxService implements ManagedService, ArtifactInstaller, StatisticsWriter {
67
68
69 private static final Logger logger = LoggerFactory.getLogger(StatisticsProviderInfluxService.class);
70
71 private static final String KEY_INFLUX_URI = "influx.uri";
72 private static final String KEY_INFLUX_USER = "influx.username";
73 private static final String KEY_INFLUX_PW = "influx.password";
74 private static final String KEY_INFLUX_DB = "influx.db";
75
76 private String influxUri = "http://127.0.0.1:8086";
77 private String influxUser = "root";
78 private String influxPw = "root";
79 private String influxDbName = "opencast";
80
81 private volatile InfluxDB influxDB;
82
83
84 private StatisticsCoordinator statisticsCoordinator;
85 private Map<String, StatisticsProvider> fileNameToProvider = new ConcurrentHashMap<>();
86
87
88 @Reference
89 public void setStatisticsCoordinator(StatisticsCoordinator service) {
90 this.statisticsCoordinator = service;
91 }
92
93 @Activate
94 public void activate(ComponentContext cc) {
95 logger.info("Activating Statistics Provider Influx Service");
96 }
97
98 @Deactivate
99 public void deactivate(ComponentContext cc) {
100 logger.info("Deactivating Statistics Provider Influx Service");
101 disconnectInflux();
102 }
103
104 @Override
105 public void install(File file) throws Exception {
106 final String json = new String(Files.readAllBytes(file.toPath()), Charset.forName("utf-8"));
107 final InfluxProviderConfiguration providerCfg = InfluxProviderConfiguration.fromJson(json);
108 StatisticsProvider provider;
109 switch (providerCfg.getType().toLowerCase()) {
110 case "timeseries": {
111 provider = new InfluxTimeSeriesStatisticsProvider(
112 this,
113 providerCfg.getId(),
114 providerCfg.getResourceType(),
115 providerCfg.getTitle(),
116 providerCfg.getDescription(),
117 providerCfg.getSources());
118 }
119 break;
120 case "runningtotal":
121 provider = new InfluxRunningTotalStatisticsProvider(
122 this,
123 providerCfg.getId(),
124 ResourceType.ORGANIZATION,
125 providerCfg.getTitle(),
126 providerCfg.getDescription(),
127 providerCfg.getSources());
128 break;
129 default:
130 throw new ConfigurationException("Unknown influx statistics type: " + providerCfg.getType());
131 }
132 fileNameToProvider.put(file.getName(), provider);
133 if (influxDB != null) {
134 statisticsCoordinator.addProvider(provider);
135 }
136 }
137
138 @Override
139 public void uninstall(File file) {
140 if (fileNameToProvider.containsKey(file.getName())) {
141 statisticsCoordinator.removeProvider(fileNameToProvider.get(file.getName()));
142 fileNameToProvider.remove(file.getName());
143 }
144 }
145
146 @Override
147 public boolean canHandle(File file) {
148 return "statistics".equals(file.getParentFile().getName())
149 && file.getName().endsWith(".json")
150 && file.getName().toUpperCase().startsWith("influx.".toUpperCase());
151 }
152
153 @Override
154 public void update(File file) throws Exception {
155 uninstall(file);
156 install(file);
157 }
158
159 @Override
160 public void updated(Dictionary<String, ?> dictionary) {
161 if (dictionary == null) {
162 logger.info("No configuration available. Not connecting to influx DB.");
163 disconnectInflux();
164 } else {
165 final Object influxUriValue = dictionary.get(KEY_INFLUX_URI);
166 if (influxUriValue != null) {
167 influxUri = influxUriValue.toString();
168 }
169 final Object influxUserValue = dictionary.get(KEY_INFLUX_USER);
170 if (influxUserValue != null) {
171 influxUser = influxUserValue.toString();
172 }
173 final Object influxPwValue = dictionary.get(KEY_INFLUX_PW);
174 if (influxPwValue != null) {
175 influxPw = influxPwValue.toString();
176 }
177 final Object influxDbValue = dictionary.get(KEY_INFLUX_DB);
178 if (influxDbValue != null) {
179 influxDbName = influxDbValue.toString();
180 }
181 connectInflux();
182 }
183 }
184
185 public InfluxDB getInfluxDB() {
186 return influxDB;
187 }
188
189 private void connectInflux() {
190 disconnectInflux();
191 influxDB = InfluxDBFactory.connect(influxUri, influxUser, influxPw);
192 influxDB.setDatabase(influxDbName);
193 fileNameToProvider.values().forEach(provider -> statisticsCoordinator.addProvider(provider));
194 statisticsCoordinator.addWriter(this);
195 }
196
197 private void disconnectInflux() {
198 if (influxDB != null) {
199 fileNameToProvider.values().forEach(provider -> statisticsCoordinator.removeProvider(provider));
200 influxDB.close();
201 influxDB = null;
202 }
203 }
204
205 @Override
206 public void writeDuration(
207 String organizationId,
208 String measurementName,
209 String retentionPolicy,
210 String organizationIdResourceName,
211 String fieldName,
212 TimeUnit temporalResolution,
213 Duration duration) {
214 double divider;
215 switch (temporalResolution) {
216 case MILLISECONDS:
217 divider = 1.0;
218 break;
219 case SECONDS:
220 divider = 1000.0;
221 break;
222 case MINUTES:
223 divider = 1000.0 * 60.0;
224 break;
225 case HOURS:
226 divider = 1000.0 * 60.0 * 60.0;
227 break;
228 case DAYS:
229 divider = 1000.0 * 60.0 * 60.0 * 24.0;
230 break;
231 default:
232 throw new RuntimeException("nanosecond and microsecond resolution not supported");
233 }
234 final Point point = Point
235 .measurement(measurementName)
236 .tag(organizationIdResourceName, organizationId)
237 .addField(fieldName, duration.toMillis() / divider)
238 .build();
239 if (retentionPolicy == null) {
240 influxDB.write(point);
241 } else {
242 influxDB.write(BatchPoints.builder().point(point).retentionPolicy(retentionPolicy).build());
243 }
244 }
245
246 @Override
247 public String getId() {
248 return "influxdb-writer";
249 }
250 }