2020-04-13 17:15:41 +02:00
|
|
|
#!/usr/bin/env python
|
|
|
|
|
|
|
|
"""
|
|
|
|
Main entrypoint
|
|
|
|
"""
|
|
|
|
|
|
|
|
__author__ = "@tormakris"
|
|
|
|
__copyright__ = "Copyright 2020, Birbnetes Team"
|
|
|
|
__module_name__ = "app"
|
|
|
|
__version__text__ = "1"
|
|
|
|
|
|
|
|
from datetime import datetime
|
|
|
|
import json
|
|
|
|
import asyncio
|
|
|
|
from aio_pika import connect, IncomingMessage, ExchangeType
|
|
|
|
import sentry_sdk
|
|
|
|
from aioinflux import InfluxDBClient
|
|
|
|
import config
|
2020-06-20 20:20:36 +02:00
|
|
|
import requests
|
2020-06-20 22:14:34 +02:00
|
|
|
import tzlocal
|
2020-04-13 17:15:41 +02:00
|
|
|
|
|
|
|
# Error reporting: wire up Sentry only when a DSN was supplied via config.
if config.SENTRY_DSN:
    sentry_sdk.init(
        dsn=config.SENTRY_DSN,
        send_default_pii=True,  # include user-identifying data in events
        release=config.RELEASE_ID,
        environment=config.RELEASEMODE)
|
|
|
|
|
|
|
|
|
|
|
|
async def on_message(message: IncomingMessage):
    """
    Handle one classification-result message from RabbitMQ.

    Decodes the JSON payload, resolves the originating device via the
    input-service HTTP API, then writes the prediction values as a single
    point into InfluxDB. Defined ``async`` so the InfluxDB writes can be
    awaited on the event loop.
    """
    payload = json.loads(message.body)

    # Timestamp the measurement now, with the host's local timezone attached.
    timestamp = datetime.now(tz=tzlocal.get_localzone())

    # Look up which device produced the sample referenced by this message.
    sample_info = requests.get(f"http://input-service/sample/{payload['tag']}")
    sample_info.raise_for_status()

    # NOTE(review): iterating 'all_predictions' directly assumes it yields
    # (name, value) pairs (i.e. a list of two-item sequences). If the
    # producer sends a JSON object instead, ``.items()`` would be required
    # — confirm against the message schema.
    fields = {name: float(score) for name, score in payload['all_predictions']}
    fields['_classification_duration'] = float(payload['classification_duration'])

    point = {'time': timestamp,
             'measurement': 'sturnus',
             'tags': {'device': sample_info.json()['device_id']},
             'fields': fields}

    async with InfluxDBClient(username=config.INFLUXDB_USERNAME,
                              password=config.INFLUXDB_PASSWORD,
                              host=config.INFLUXDB_HOST,
                              port=config.INFLUXDB_PORT,
                              ssl=False,
                              database=config.INFLUXDB_DB) as client:
        # Ensure the target database exists before writing the point.
        await client.create_database(config.INFLUXDB_DB)
        await client.write(point)
|
|
|
|
|
|
|
|
|
|
|
|
async def main(loop_):
    """
    Connect to RabbitMQ and start consuming classification results.

    Declares a fanout exchange plus an exclusive server-named queue bound
    to it, then registers :func:`on_message` as the consumer callback.

    :param loop_: event loop the AMQP connection should run on
    """
    # Open the AMQP connection on the event loop we were handed.
    connection = await connect(config.RABBITMQ_HOST, loop=loop_)

    channel = await connection.channel()

    # Fanout exchange: every bound queue receives a copy of every message.
    exchange = await channel.declare_exchange(config.RABBITMQ_EXCHANGE,
                                              ExchangeType.FANOUT)

    # Exclusive, server-named queue — removed when this consumer disconnects.
    queue = await channel.declare_queue(exclusive=True)
    await queue.bind(exchange)

    # no_ack=True: messages are not acknowledged, so there is no redelivery
    # if processing fails.
    await queue.consume(on_message, no_ack=True)
|
|
|
|
|
|
|
|
|
|
|
|
if __name__ == "__main__":
    event_loop = asyncio.get_event_loop()

    # Schedule the consumer setup on the loop...
    event_loop.create_task(main(event_loop))

    # ...then block forever so the registered message callbacks keep
    # running as data arrives.
    event_loop.run_forever()
|