InfluxRunningTotalStatisticsProvider.java

/*
 * Licensed to The Apereo Foundation under one or more contributor license
 * agreements. See the NOTICE file distributed with this work for additional
 * information regarding copyright ownership.
 *
 *
 * The Apereo Foundation licenses this file to you under the Educational
 * Community License, Version 2.0 (the "License"); you may not use this file
 * except in compliance with the License. You may obtain a copy of the License
 * at:
 *
 *   http://opensource.org/licenses/ecl2.txt
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the
 * License for the specific language governing permissions and limitations under
 * the License.
 *
 */

package org.opencastproject.statistics.provider.influx.provider;

import org.opencastproject.statistics.api.DataResolution;
import org.opencastproject.statistics.api.ResourceType;
import org.opencastproject.statistics.api.TimeSeries;
import org.opencastproject.statistics.api.TimeSeriesProvider;
import org.opencastproject.statistics.provider.influx.StatisticsProviderInfluxService;
import org.opencastproject.util.data.Tuple;

import org.influxdb.dto.BoundParameterQuery;
import org.influxdb.dto.Query;
import org.influxdb.dto.QueryResult;

import java.time.Instant;
import java.time.ZoneId;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

public class InfluxRunningTotalStatisticsProvider extends InfluxStatisticsProvider implements TimeSeriesProvider {
  private final Set<InfluxProviderConfiguration.InfluxProviderSource> sources;

  public InfluxRunningTotalStatisticsProvider(
          StatisticsProviderInfluxService service,
          String id,
          ResourceType resourceType,
          String title,
          String description,
          Set<InfluxProviderConfiguration.InfluxProviderSource> sources) {
    super(service, id, resourceType, title, description);
    this.sources = sources;
  }

  private static double reduceResult(double previousTotal, QueryResult.Result newResult) {
    if (newResult.getSeries() == null) {
      return previousTotal;
    }
    return previousTotal + newResult
            .getSeries()
            .stream()
            .reduce(0.0, InfluxRunningTotalStatisticsProvider::reduceSeries, Double::sum);
  }

  private static double reduceSeries(double previousSeriesTotal, QueryResult.Series newSeries) {
    if (newSeries.getValues().isEmpty()) {
      return previousSeriesTotal;
    }
    if (newSeries.getValues().size() > 1) {
      throw new RuntimeException("invalid results returned for aggregation");
    }
    final List<Object> objects = newSeries.getValues().get(0);
    if (objects.size() != 2) {
      throw new RuntimeException("invalid results returned for aggregation");
    }
    final Object o = objects.get(1);
    if (o == null) {
      return previousSeriesTotal;
    }
    if (!(o instanceof Double)) {
      throw new RuntimeException("invalid results returned for aggregation");
    }
    return previousSeriesTotal + (Double) o;
  }

  private double getPreviousTotal(
          final InfluxProviderConfiguration.InfluxProviderSource source, final String resourceId, final Instant from) {
    final Query beforeQuery = BoundParameterQuery.QueryBuilder
            .newQuery("SELECT SUM(" + source.getAggregationVariable() + ") FROM " + source.getMeasurement() + " WHERE "
                              + source.getResourceIdName() + "=$resourceId AND time<$from")
            .bind("resourceId", resourceId)
            .bind("from", from)
            .create();
    final QueryResult beforeResults = service.getInfluxDB().query(beforeQuery);
    if (beforeResults.hasError()) {
      throw new RuntimeException("Error while retrieving result from influx: " + beforeResults.getError());
    }
    return beforeResults
            .getResults()
            .stream()
            .reduce(0.0, InfluxRunningTotalStatisticsProvider::reduceResult, Double::sum);
  }

  @Override
  public TimeSeries getValues(String resourceId, Instant from, Instant to, DataResolution resolution, ZoneId zoneId) {
    final String influxGrouping = dataResolutionToInfluxGrouping(resolution);
    final List<Tuple<Instant, Instant>> periods = getPeriods(from, to, resolution, zoneId);
    final List<String> labels = new ArrayList<>();
    final List<Double> values = new ArrayList<>();
    final InfluxProviderConfiguration.InfluxProviderSource source = getSource(resolution);
    double previousTotal = getPreviousTotal(source, resourceId, from);
    for (final Tuple<Instant, Instant> period : periods) {
      final Query query = BoundParameterQuery.QueryBuilder
              .newQuery("SELECT " + source.getAggregation() + "(" + source.getAggregationVariable() + ") FROM " + source
                      .getMeasurement() + " WHERE " + source.getResourceIdName()
                                + "=$resourceId AND time>=$from AND time<=$to" + influxGrouping)
              .bind("resourceId", resourceId)
              .bind("from", period.getA())
              .bind("to", period.getB())
              .create();
      final QueryResult results = service.getInfluxDB().query(query);
      final Tuple<TimeSeries, Double> currentViews = queryResultToTimeSeries(results, previousTotal, period.getA());
      previousTotal = currentViews.getB();
      labels.addAll(currentViews.getA().getLabels());
      values.addAll(currentViews.getA().getValues());
    }
    final Double total = "SUM".equalsIgnoreCase(source.getAggregation())
            ? values.stream().mapToDouble(v -> v).sum()
            : null;
    return new TimeSeries(labels, values, total);
  }

  private InfluxProviderConfiguration.InfluxProviderSource getSource(DataResolution resolution) {
    return sources
            .stream()
            .filter(s -> s.getResolutions().contains(resolution))
            .findAny()
            .orElseThrow(() -> new IllegalStateException(
                    "No source available for data resolution " + resolution.name()));
  }

  @Override
  public Set<DataResolution> getDataResolutions() {
    return new HashSet<>(Arrays.asList(DataResolution.YEARLY, DataResolution.MONTHLY, DataResolution.DAILY));
  }

  private Tuple<TimeSeries, Double> queryResultToTimeSeries(
          final QueryResult results,
          final double previousTotal,
          final Instant periodStart) {
    if (results.hasError()) {
      throw new RuntimeException("Error while retrieving result from influx: " + results.getError());
    }
    final List<String> labels = new ArrayList<>();
    final List<Double> values = new ArrayList<>();
    double previousSum = previousTotal;
    for (final QueryResult.Result result : results.getResults()) {
      if (result.getSeries() == null || result.getSeries().isEmpty()) {
        labels.add(periodStart.toString());
        values.add(previousSum);
        continue;
      }
      labels.addAll(result
                            .getSeries()
                            .get(0)
                            .getValues()
                            .stream()
                            .map(l -> (String) l.get(0))
                            .collect(Collectors.toList()));
      for (List<Object> valueList : result.getSeries().get(0).getValues()) {
        final Double currentValue = reduceValueList(valueList);
        previousSum += currentValue;
        values.add(previousSum);
      }
    }
    return Tuple.tuple(new TimeSeries(labels, values), previousSum);
  }

  private double reduceValueList(List<Object> valueList) {
    Object v = valueList.get(1);
    if (v == null) {
      return 0.0;
    }
    return (Double) v;
  }
}