2020-04-13 17:15:41 +02:00
#!/usr/bin/env python
"""
Main entrypoint
"""
__author__ = " @tormakris "
__copyright__ = " Copyright 2020, Birbnetes Team "
__module_name__ = " app "
__version__text__ = " 1 "
from datetime import datetime
import json
import asyncio
from aio_pika import connect , IncomingMessage , ExchangeType
import sentry_sdk
from aioinflux import InfluxDBClient
import config
2020-06-20 20:20:36 +02:00
import requests
2020-04-13 17:15:41 +02:00
# Error reporting is opt-in: Sentry is only initialised when a DSN is configured.
if config.SENTRY_DSN:
    sentry_sdk.init(**{
        'dsn': config.SENTRY_DSN,
        'send_default_pii': True,
        # Release and environment are taken from the service configuration.
        'release': config.RELEASE_ID,
        'environment': config.RELEASEMODE,
    })
def _fetch_device_id(tag, timeout=10):
    """
    Blocking lookup of the device id that belongs to a sample tag.

    Meant to be run in an executor thread, never directly on the event loop.

    :param tag: sample tag taken from the incoming message
    :param timeout: seconds to wait for input-service before giving up
    :return: the ``device_id`` value of the JSON response
    :raises requests.exceptions.RequestException: on connection errors,
        timeouts and non-2xx responses
    :raises KeyError: if the response has no ``device_id`` key
    """
    response = requests.get(f"http://input-service/sample/{tag}", timeout=timeout)
    response.raise_for_status()
    return response.json()['device_id']


async def on_message(message: IncomingMessage):
    """
    Handle one message delivered by the queue consumer.

    The message body is decoded as JSON, the device id for its ``tag`` is
    looked up at input-service, and a ``sturnus`` point holding the
    ``probability`` is written to InfluxDB.

    :param message: incoming message; its body must be a JSON object with
        ``tag`` and ``probability`` keys
    :raises KeyError: if ``tag`` or ``probability`` is missing
    :raises requests.exceptions.RequestException: if the input-service
        lookup fails or times out
    """
    msg_json = json.loads(message.body)
    # NOTE(review): naive local time - confirm how the InfluxDB client
    # interprets datetimes without tzinfo.
    now = datetime.now()

    # requests is synchronous; running it in the default executor keeps the
    # event loop free to handle other messages while the HTTP call is pending.
    loop = asyncio.get_event_loop()
    device_id = await loop.run_in_executor(None, _fetch_device_id, msg_json['tag'])

    point = {
        'time': now,
        'measurement': 'sturnus',
        'tags': {
            'device': device_id,
        },
        'fields': {
            # Whole-number values come back from the JSON round trip as ints;
            # force float so InfluxDB does not fail on the field.
            'probability': float(msg_json['probability']),
        },
    }

    async with InfluxDBClient(username=config.INFLUXDB_USERNAME,
                              password=config.INFLUXDB_PASSWORD,
                              host=config.INFLUXDB_HOST,
                              port=config.INFLUXDB_PORT,
                              ssl=False,
                              database=config.INFLUXDB_DB) as client:
        # NOTE(review): issued for every message - presumably harmless when
        # the database already exists; confirm.
        await client.create_database(config.INFLUXDB_DB)
        await client.write(point)
async def main(loop_):
    """
    Connect to RabbitMQ and start consuming from the configured exchange.

    :param loop_: event loop handed over to ``connect``
    """
    broker = await connect(config.RABBITMQ_HOST, loop=loop_)
    amqp_channel = await broker.channel()

    # Fanout exchange named by the service configuration.
    fanout = await amqp_channel.declare_exchange(
        config.RABBITMQ_EXCHANGE,
        ExchangeType.FANOUT,
    )

    # Exclusive queue declared without an explicit name, bound to the exchange.
    inbox = await amqp_channel.declare_queue(exclusive=True)
    await inbox.bind(fanout)

    # Every delivered message is passed to on_message; with no_ack=True the
    # handler does not acknowledge messages itself.
    await inbox.consume(on_message, no_ack=True)
if __name__ == "__main__":
    loop = asyncio.get_event_loop()
    # Run the broker setup to completion first: if connecting or declaring
    # fails, the exception propagates and the process exits instead of
    # idling forever with an unretrieved task exception.
    loop.run_until_complete(main(loop))
    # The consumer is registered; keep the loop alive so that callbacks
    # run whenever messages arrive.
    loop.run_forever()