#!/usr/bin/env python """ Main entrypoint """ __author__ = "@tormakris" __copyright__ = "Copyright 2020, Birbnetes Team" __module_name__ = "app" __version__text__ = "1" from datetime import datetime import json import asyncio from aio_pika import connect, IncomingMessage, ExchangeType import sentry_sdk from aioinflux import InfluxDBClient import config if config.SENTRY_DSN: sentry_sdk.init( dsn=config.SENTRY_DSN, send_default_pii=True, release=config.RELEASE_ID, environment=config.RELEASEMODE ) async def on_message(message: IncomingMessage): """ on_message doesn't necessarily have to be defined as async. Here it is to show that it's possible. """ msg_json = json.loads(message.body) now = datetime.now() point = { 'time': now, 'measurement': 'sturnus', 'tags': {'device': 'dummy'}, 'fields': {'probability': msg_json['probability']} } async with InfluxDBClient(username=config.INFLUXDB_USERNAME, password=config.INFLUXDB_PASSWORD, host=config.INFLUXDB_HOST, port=config.INFLUXDB_PORT, ssl=False, database=config.INFLUXDB_DB) as client: await client.create_database(config.INFLUXDB_DB) await client.write(point) async def main(loop_): # Perform connection connection = await connect( config.RABBITMQ_HOST, loop=loop_ ) # Creating a channel channel = await connection.channel() # Declare exchange exchange = await channel.declare_exchange( config.RABBITMQ_EXCHANGE, ExchangeType.FANOUT ) # Declaring queue queue = await channel.declare_queue(exclusive=True) # Bind queue to exchange await queue.bind(exchange) # Start listening the queue with name 'hello' await queue.consume(on_message, no_ack=True) if __name__ == "__main__": loop = asyncio.get_event_loop() loop.create_task(main(loop)) # we enter a never-ending loop that waits for data and # runs callbacks whenever necessary. loop.run_forever()