diff --git a/requirements.txt b/requirements.txt index 7bcb000..af37789 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,3 +1,4 @@ aio-pika sentry-sdk -aioinflux \ No newline at end of file +aioinflux +requests \ No newline at end of file diff --git a/src/app.py b/src/app.py index dd3f174..a9bb21e 100644 --- a/src/app.py +++ b/src/app.py @@ -16,6 +16,7 @@ from aio_pika import connect, IncomingMessage, ExchangeType import sentry_sdk from aioinflux import InfluxDBClient import config +import requests if config.SENTRY_DSN: sentry_sdk.init( @@ -33,12 +34,23 @@ async def on_message(message: IncomingMessage): """ msg_json = json.loads(message.body) now = datetime.now() + + r = requests.get(f"http://input-service/sample/{msg_json['tag']}") + r.raise_for_status() + point = { 'time': now, 'measurement': 'sturnus', - 'tags': {'device': 'dummy'}, - 'fields': {'probability': msg_json['probability']} + 'tags': { + 'device': r.json()['device_id'] + }, + 'fields': { + 'probability': float(msg_json['probability']) # This is required because otherwise of the json serialization and de-serialization round values would interpret as ints and influxdb would fail + } } + + + async with InfluxDBClient(username=config.INFLUXDB_USERNAME, password=config.INFLUXDB_PASSWORD, host=config.INFLUXDB_HOST, port=config.INFLUXDB_PORT, ssl=False, database=config.INFLUXDB_DB) as client: