From 2e18d0b83e7504cf1847f99024ce1f87574fca00 Mon Sep 17 00:00:00 2001 From: marcsello Date: Sun, 12 Dec 2021 01:50:41 +0100 Subject: [PATCH] Added derivative --- birb_latency_collector/views/report_view.py | 139 +++++++++----------- 1 file changed, 64 insertions(+), 75 deletions(-) diff --git a/birb_latency_collector/views/report_view.py b/birb_latency_collector/views/report_view.py index 4f8a2f2..32d7fc2 100644 --- a/birb_latency_collector/views/report_view.py +++ b/birb_latency_collector/views/report_view.py @@ -5,6 +5,28 @@ from utils import json_required, influxdb_instance from influxdb_client import Point +class ReportBuilder: + + def __init__(self, tags: tuple): + self._tags = tags + self._report = {} + + def update(self, query_result, type_: str, aggregator: str): + for table in query_result: + for row in table.records: + key = ":".join(row.values[tag] for tag in self._tags) + if key not in self._report: + self._report[key] = {} + + if type_ not in self._report[key]: + self._report[key][type_] = {} + + self._report[key][type_][aggregator] = row.values['_value'] + + def get_report(self) -> dict: + return self._report + + class ReportView(FlaskView): # LINK @@ -30,45 +52,22 @@ class ReportView(FlaskView): @route("/link", methods=['GET']) def link_get(self): - response = {} + rep = ReportBuilder(('client', 'site')) - # Query latency + for type_ in ['latency', 'rate']: + for aggregator in [('mean', 'mean()'), ('derivative', 'derivative(unit: 5s)')]: + query = f"""from(bucket: "alma") + |> range(start: -2m, stop: now()) + |> filter(fn: (r) => r["_measurement"] == "link_{type_}") + |> {aggregator[1]} + |> yield()""" + rep.update( + influxdb_instance.query_api.query(query), + type_, + aggregator[0] + ) - query = """from(bucket: "alma") - |> range(start: -2m, stop: now()) - |> filter(fn: (r) => r["_measurement"] == "link_latency") - |> timedMovingAverage(every: 30s, period: 1m) - |> last() - |> yield()""" - latency_tables = influxdb_instance.query_api.query(query) - - for table in latency_tables: - for row in table.records: - key = row.values['client'] + ":" + row.values['site'] - if key in response: - response[key]['latency'] = row.values['_value'] - else: - response[key] = {'latency': row.values['_value']} - - # Query rate - - query = """from(bucket: "alma") - |> range(start: -2m, stop: now()) - |> filter(fn: (r) => r["_measurement"] == "link_rate") - |> timedMovingAverage(every: 30s, period: 1m) - |> last() - |> yield()""" - rate_tables = influxdb_instance.query_api.query(query) - - for table in rate_tables: - for row in table.records: - key = row.values['client'] + ":" + row.values['site'] - if key in response: - response[key]['rate'] = row.values['_value'] - else: - response[key] = {'rate': row.values['_value']} - - return jsonify(response) + return jsonify(rep.get_report()) # CLIENT @@ -92,27 +91,22 @@ class ReportView(FlaskView): @route("/client", methods=['GET']) def client_get(self): - response = {} + rep = ReportBuilder(('client',)) - # Query queue + for type_ in ['queue']: + for aggregator in [('mean', 'mean()'), ('derivative', 'derivative(unit: 5s)')]: + query = f"""from(bucket: "alma") + |> range(start: -2m, stop: now()) + |> filter(fn: (r) => r["_measurement"] == "client_{type_}") + |> {aggregator[1]} + |> yield()""" + rep.update( + influxdb_instance.query_api.query(query), + type_, + aggregator[0] + ) - query = """from(bucket: "alma") - |> range(start: -2m, stop: now()) - |> filter(fn: (r) => r["_measurement"] == "client_queue") - |> timedMovingAverage(every: 30s, period: 1m) - |> last() - |> yield()""" - queue_tables = influxdb_instance.query_api.query(query) - - for table in queue_tables: - for row in table.records: - key = row.values['client'] - if key in response: - response[key]['queue'] = row.values['_value'] - else: - response[key] = {'queue': row.values['_value']} - - return jsonify(response) + return jsonify(rep.get_report()) # SITE @@ -136,24 +130,19 @@ class ReportView(FlaskView): @route("/site", methods=['GET']) def site_get(self): - response = {} + rep = ReportBuilder(('site',)) - # Query queue + for type_ in ['queue']: + for aggregator in [('mean', 'mean()'), ('derivative', 'derivative(unit: 5s)')]: + query = f"""from(bucket: "alma") + |> range(start: -2m, stop: now()) + |> filter(fn: (r) => r["_measurement"] == "site_{type_}") + |> {aggregator[1]} + |> yield()""" + rep.update( + influxdb_instance.query_api.query(query), + type_, + aggregator[0] + ) - query = """from(bucket: "alma") - |> range(start: -2m, stop: now()) - |> filter(fn: (r) => r["_measurement"] == "site_queue") - |> timedMovingAverage(every: 30s, period: 1m) - |> last() - |> yield()""" - queue_tables = influxdb_instance.query_api.query(query) - - for table in queue_tables: - for row in table.records: - key = row.values['site'] - if key in response: - response[key]['queue'] = row.values['_value'] - else: - response[key] = {'queue': row.values['_value']} - - return jsonify(response) + return jsonify(rep.get_report())