#!/usr/bin/env python
"""
Main entrypoint

Consumes messages from a RabbitMQ fanout exchange, looks up the device that
belongs to each message's sample tag through the input service, and writes
the reported probability to InfluxDB as a point of the 'sturnus' measurement.
"""

__author__ = "@tormakris"
__copyright__ = "Copyright 2020, Birbnetes Team"
__module_name__ = "app"
__version__text__ = "1"

from datetime import datetime
import json
import asyncio

from aio_pika import connect, IncomingMessage, ExchangeType
import sentry_sdk
from aioinflux import InfluxDBClient
import requests
import tzlocal

import config

# Upper bound, in seconds, for the input-service lookup. Without a timeout
# `requests` waits indefinitely on an unresponsive peer.
INPUT_SERVICE_TIMEOUT = 10

if config.SENTRY_DSN:
    sentry_sdk.init(
        dsn=config.SENTRY_DSN,
        send_default_pii=True,
        release=config.RELEASE_ID,
        environment=config.RELEASEMODE
    )


def _fetch_device_id(tag):
    """
    Return the 'device_id' the input service reports for the sample *tag*.

    This is a blocking call; run it in an executor when called from a
    coroutine.

    :param tag: sample tag taken from the incoming message
    :raises requests.HTTPError: if the input service answers with an error status
    :raises requests.Timeout: if no answer arrives within INPUT_SERVICE_TIMEOUT
    """
    response = requests.get(
        f"http://input-service/sample/{tag}",
        timeout=INPUT_SERVICE_TIMEOUT
    )
    response.raise_for_status()
    return response.json()['device_id']


async def on_message(message: IncomingMessage):
    """
    Handle one message from the queue and store it in InfluxDB.

    The message body must be a JSON object with a 'tag' and a 'probability'
    key. The tag is resolved to a device id through the input service and the
    probability is written as a field of a 'sturnus' point tagged with that
    device.

    :param message: message delivered by aio_pika
    :raises KeyError: if 'tag' or 'probability' is missing from the body
    :raises requests.RequestException: if the input-service lookup fails
    """
    msg_json = json.loads(message.body)
    now = datetime.now(tz=tzlocal.get_localzone())

    # `requests` is synchronous: calling it directly here would freeze the
    # event loop (and with it every other delivery) for the whole HTTP
    # round-trip, so the lookup is pushed to the default thread pool.
    # Inside a coroutine get_event_loop() returns the running loop.
    device_id = await asyncio.get_event_loop().run_in_executor(
        None, _fetch_device_id, msg_json['tag']
    )

    point = {
        'time': now,
        'measurement': 'sturnus',
        'tags': {
            'device': device_id
        },
        'fields': {
            # The explicit float() is required: after the JSON round trip a
            # whole-number value arrives as an int, and InfluxDB would fail
            # on a field that is sometimes int and sometimes float.
            'probability': float(msg_json['probability'])
        }
    }

    async with InfluxDBClient(username=config.INFLUXDB_USERNAME,
                              password=config.INFLUXDB_PASSWORD,
                              host=config.INFLUXDB_HOST,
                              port=config.INFLUXDB_PORT,
                              ssl=False,
                              database=config.INFLUXDB_DB) as client:
        # NOTE(review): the database is (re)created for every message;
        # presumably this is idempotent on the server - confirm, and consider
        # doing it once at startup.
        await client.create_database(config.INFLUXDB_DB)
        await client.write(point)


async def main(loop_):
    """
    Connect to RabbitMQ and start consuming the configured fanout exchange.

    Returns as soon as the consumer is registered; messages are delivered to
    on_message for as long as the event loop keeps running.

    :param loop_: event loop the AMQP connection is attached to
    """
    # Perform connection
    connection = await connect(
        config.RABBITMQ_HOST, loop=loop_
    )

    # Creating a channel
    channel = await connection.channel()

    # Declare exchange
    exchange = await channel.declare_exchange(
        config.RABBITMQ_EXCHANGE, ExchangeType.FANOUT
    )

    # Declare an unnamed queue that is exclusive to this connection
    queue = await channel.declare_queue(exclusive=True)

    # Bind queue to exchange
    await queue.bind(exchange)

    # Start consuming; with no_ack=True deliveries are not acknowledged by
    # this consumer.
    await queue.consume(on_message, no_ack=True)


if __name__ == "__main__":
    # Create the loop explicitly: asyncio.get_event_loop() without a running
    # loop is deprecated and raises on recent Python versions.
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)

    # Run the setup to completion first so that a failure to connect or to
    # declare the exchange/queue terminates the process instead of being
    # parked on a forgotten task while the loop idles forever.
    loop.run_until_complete(main(loop))

    # we enter a never-ending loop that waits for data and
    # runs callbacks whenever necessary.
    loop.run_forever()