#!/usr/bin/env python """ Main entrypoint """ __author__ = "@tormakris" __copyright__ = "Copyright 2020, Birbnetes Team" __module_name__ = "app" __version__text__ = "1" from datetime import datetime import json import asyncio from aio_pika import connect, IncomingMessage, ExchangeType import sentry_sdk from aioinflux import InfluxDBClient import config import requests import tzlocal if config.SENTRY_DSN: sentry_sdk.init( dsn=config.SENTRY_DSN, send_default_pii=True, release=config.RELEASE_ID, environment=config.RELEASEMODE ) async def on_message(message: IncomingMessage): """ on_message doesn't necessarily have to be defined as async. Here it is to show that it's possible. """ msg_json = json.loads(message.body) now = datetime.now(tz=tzlocal.get_localzone()) r = requests.get(f"http://input-service/input/{msg_json['tag']}") r.raise_for_status() fields = { k: float(v) for k, v in msg_json['all_predictions'].items() } fields['_classification_duration'] = float(msg_json['classification_duration']) fields['_legacy_result'] = 1.0 if msg_json['class'] == 'sturnus' else 0.0 point = { 'time': now, 'measurement': 'sturnus', 'tags': { 'device': r.json()['device_id'] }, 'fields': fields } async with InfluxDBClient(username=config.INFLUXDB_USERNAME, password=config.INFLUXDB_PASSWORD, host=config.INFLUXDB_HOST, port=config.INFLUXDB_PORT, ssl=False, database=config.INFLUXDB_DB) as client: await client.create_database(config.INFLUXDB_DB) await client.write(point) async def main(loop_): # Perform connection connection = await connect( config.RABBITMQ_HOST, loop=loop_ ) # Creating a channel channel = await connection.channel() # Declare exchange exchange = await channel.declare_exchange( config.RABBITMQ_EXCHANGE, ExchangeType.FANOUT ) # Declaring queue queue = await channel.declare_queue(exclusive=True) # Bind queue to exchange await queue.bind(exchange) # Start listening the queue with name 'hello' await queue.consume(on_message, no_ack=True) if __name__ == "__main__": loop = asyncio.get_event_loop() loop.create_task(main(loop)) # we enter a never-ending loop that waits for data and # runs callbacks whenever necessary. loop.run_forever()