#!/usr/bin/env python3
"""HTTP endpoints that store link/client/site measurements in InfluxDB and
report aggregated values for the last two minutes."""

from flask import request, current_app, Response, jsonify
from flask_classful import FlaskView, route
from influxdb_client import Point
from influxdb_client.client.write_api import SYNCHRONOUS

from utils import json_required, influxdb_instance

# Bucket every endpoint in this module writes to and reads from.
_BUCKET = "alma"

# (report key, Flux pipeline stage) pairs applied to every measurement type.
_AGGREGATORS = (
    ('mean', 'mean()'),
    ('derivative', 'derivative(unit: 5s)'),
)


class ReportBuilder:
    """Accumulate query results into a nested ``dict``.

    The report has the shape ``{key: {type: {aggregator: value}}}`` where
    ``key`` is the ``":"``-joined values of the configured tags of a row.
    """

    def __init__(self, tags: tuple):
        """Create an empty report.

        :param tags: names of the row values that are joined to form the
            top-level report key, in the order they should appear.
        """
        self._tags = tags
        self._report = {}

    def update(self, query_result, type_: str, aggregator: str):
        """Merge every record of ``query_result`` into the report.

        :param query_result: iterable of tables, each exposing ``records``
            whose items expose a ``values`` mapping.
        :param type_: second-level report key (e.g. ``"latency"``).
        :param aggregator: third-level report key (e.g. ``"mean"``).

        When several records share the same key, type and aggregator, the
        last one seen wins.
        """
        for table in query_result:
            for row in table.records:
                key = ":".join(row.values[tag] for tag in self._tags)
                per_type = self._report.setdefault(key, {})
                per_type.setdefault(type_, {})[aggregator] = row.values['_value']

    def get_report(self) -> dict:
        """Return the accumulated report (the internal dict, not a copy)."""
        return self._report


def _store_measurements(prefix: str, tags: tuple, types: tuple) -> Response:
    """Write the measurements of the current JSON request as points.

    One point named ``"{prefix}_{type}"`` is written for each entry of
    ``types`` present in the payload's ``measurements`` object; each point
    carries the payload values named in ``tags`` as tags and the measurement
    as its ``value`` field.

    :returns: ``201`` once the points are written (or when the payload holds
        none of the known types), ``400`` when the payload lacks
        ``measurements`` or a required tag, or is not shaped like an object.
    """
    data = request.json
    points = []
    try:
        measurements = data['measurements']
        for type_ in types:
            if type_ not in measurements:
                continue
            point = Point(f"{prefix}_{type_}")
            for tag in tags:
                point = point.tag(tag, data[tag])
            points.append(point.field("value", measurements[type_]))
    except (KeyError, TypeError):
        # Malformed payload is a client error, not a server crash.
        return Response(status=400)

    if points:
        # The default write API batches in the background and only logs
        # failures; a synchronous, explicitly closed writer makes a failed
        # write surface as an error instead of a misleading 201.
        # NOTE(review): assumes `influxdb_instance.connection` is an
        # influxdb_client.InfluxDBClient - confirm against utils.
        with influxdb_instance.connection.write_api(
                write_options=SYNCHRONOUS) as write_api:
            write_api.write(
                _BUCKET, current_app.config['INFLUXDB_V2_ORG'], points
            )
    return Response(status=201)


def _report_response(prefix: str, tags: tuple, types: tuple) -> Response:
    """Query every aggregator for every type and return the JSON report.

    Each query covers the last two minutes of the ``"{prefix}_{type}"``
    measurement and pipes it through one of the ``_AGGREGATORS`` stages.
    """
    rep = ReportBuilder(tags)
    for type_ in types:
        for name, flux_stage in _AGGREGATORS:
            query = (
                f'from(bucket: "{_BUCKET}")'
                ' |> range(start: -2m, stop: now())'
                f' |> filter(fn: (r) => r["_measurement"] == "{prefix}_{type_}")'
                f' |> {flux_stage}'
                ' |> yield()'
            )
            rep.update(influxdb_instance.query_api.query(query), type_, name)
    return jsonify(rep.get_report())


class ReportView(FlaskView):
    """Routes for submitting measurements and reading aggregated reports."""

    # LINK
    @route("/link", methods=['POST'])
    @json_required
    def link_post(self):
        """Store ``latency``/``rate`` of a link, tagged by client and site."""
        return _store_measurements("link", ('client', 'site'), ('latency', 'rate'))

    @route("/link", methods=['GET'])
    def link_get(self):
        """Report link ``latency``/``rate`` keyed by ``"client:site"``."""
        return _report_response("link", ('client', 'site'), ('latency', 'rate'))

    # CLIENT
    @route("/client", methods=['POST'])
    @json_required
    def client_post(self):
        """Store the ``queue`` measurement of a client, tagged by client."""
        return _store_measurements("client", ('client',), ('queue',))

    @route("/client", methods=['GET'])
    def client_get(self):
        """Report client ``queue`` values keyed by client."""
        return _report_response("client", ('client',), ('queue',))

    # SITE
    @route("/site", methods=['POST'])
    @json_required
    def site_post(self):
        """Store the ``queue`` measurement of a site, tagged by site."""
        return _store_measurements("site", ('site',), ('queue',))

    @route("/site", methods=['GET'])
    def site_get(self):
        """Report site ``queue`` values keyed by site."""
        return _report_response("site", ('site',), ('queue',))