InfluxTimeSeriesStatisticsProvider.java

/*
 * Licensed to The Apereo Foundation under one or more contributor license
 * agreements. See the NOTICE file distributed with this work for additional
 * information regarding copyright ownership.
 *
 *
 * The Apereo Foundation licenses this file to you under the Educational
 * Community License, Version 2.0 (the "License"); you may not use this file
 * except in compliance with the License. You may obtain a copy of the License
 * at:
 *
 *   http://opensource.org/licenses/ecl2.txt
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the
 * License for the specific language governing permissions and limitations under
 * the License.
 *
 */

package org.opencastproject.statistics.provider.influx.provider;

import org.opencastproject.statistics.api.DataResolution;
import org.opencastproject.statistics.api.ResourceType;
import org.opencastproject.statistics.api.TimeSeries;
import org.opencastproject.statistics.api.TimeSeriesProvider;
import org.opencastproject.statistics.provider.influx.StatisticsProviderInfluxService;
import org.opencastproject.util.data.Tuple;

import org.influxdb.InfluxDBIOException;
import org.influxdb.dto.BoundParameterQuery;
import org.influxdb.dto.Query;
import org.influxdb.dto.QueryResult;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.net.ConnectException;
import java.time.Instant;
import java.time.ZoneId;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

public class InfluxTimeSeriesStatisticsProvider extends InfluxStatisticsProvider implements TimeSeriesProvider {

  private static final Logger logger = LoggerFactory.getLogger(InfluxTimeSeriesStatisticsProvider.class);

  private Set<InfluxProviderConfiguration.InfluxProviderSource> sources;



  public InfluxTimeSeriesStatisticsProvider(
      StatisticsProviderInfluxService service,
      String id,
      ResourceType resourceType,
      String title,
      String description,
      Set<InfluxProviderConfiguration.InfluxProviderSource> sources
  ) {
    super(service, id, resourceType, title, description);
    this.sources = sources;
  }

  @Override
  public TimeSeries getValues(String resourceId, Instant from, Instant to, DataResolution resolution, ZoneId zoneId) {
    final String influxGrouping = dataResolutionToInfluxGrouping(resolution);
    final List<Tuple<Instant, Instant>> periods = getPeriods(from, to, resolution, zoneId);
    final List<String> labels = new ArrayList<>();
    final List<Double> values = new ArrayList<>();
    final InfluxProviderConfiguration.InfluxProviderSource source = getSource(resolution);
    for (final Tuple<Instant, Instant> period : periods) {
      final Query query = BoundParameterQuery.QueryBuilder
          .newQuery("SELECT " + source.getAggregation() + "(" + source.getAggregationVariable() + ") FROM "
                            + source.getMeasurement() + " WHERE " + source.getResourceIdName()
                            + "=$resourceId AND time>=$from AND time<=$to" + influxGrouping)
          .bind("resourceId", resourceId)
          .bind("from", period.getA())
          .bind("to", period.getB())
          .create();
      try {
        final QueryResult results = service.getInfluxDB().query(query);
        final TimeSeries currentViews = queryResultToTimeSeries(results);
        labels.addAll(currentViews.getLabels());
        values.addAll(currentViews.getValues());
      } catch (InfluxDBIOException e) {
        if (e.getCause() instanceof ConnectException) {
          logger.error("Influx connect exception: {}", e.getMessage());
        } else {
          throw e;
        }
      }
    }
    final Double total = "SUM".equalsIgnoreCase(source.getAggregation())
        ? values.stream().mapToDouble(v -> v).sum()
        : null;
    return new TimeSeries(labels, values, total);
  }

  @Override
  public Set<DataResolution> getDataResolutions() {
    return sources.stream().flatMap(s -> s.getResolutions().stream()).collect(Collectors.toSet());
  }

  private InfluxProviderConfiguration.InfluxProviderSource getSource(DataResolution resolution) {
    return sources.stream()
        .filter(s -> s.getResolutions().contains(resolution))
        .findAny()
        .orElseThrow(() -> new IllegalStateException("No source available for data resolution " + resolution.name()));
  }

  protected static TimeSeries queryResultToTimeSeries(QueryResult results) {
    if (results.hasError()) {
      throw new RuntimeException("Error while retrieving result from influx: " + results.getError());
    }
    final List<String> labels = new ArrayList<>();
    final List<Double> values = new ArrayList<>();
    for (final QueryResult.Result result : results.getResults()) {
      if (result.hasError()) {
        logger.warn("An element from the set of data returned by influx DB has an error: '{}'. Ignoring this one.",
            result.getError());
        continue;
      }
      if (result.getSeries() == null || result.getSeries().isEmpty()) {
        continue;
      }
      labels.addAll(result.getSeries().get(0).getValues().stream()
          .map(l -> (String) l.get(0))
          .collect(Collectors.toList()));
      values.addAll(result.getSeries().get(0).getValues().stream()
          .map(l -> l.get(1))
          .map(v -> v == null ? 0 : (Double) v)
          .collect(Collectors.toList()));
    }
    return new TimeSeries(labels, values);
  }
}