This commit is contained in:
78
src/app.py
Normal file
78
src/app.py
Normal file
@ -0,0 +1,78 @@
|
||||
#!/usr/bin/env python
|
||||
|
||||
"""
|
||||
Main entrypoint
|
||||
"""
|
||||
|
||||
__author__ = "@tormakris"
|
||||
__copyright__ = "Copyright 2020, Birbnetes Team"
|
||||
__module_name__ = "app"
|
||||
__version__text__ = "1"
|
||||
|
||||
from datetime import datetime
|
||||
import json
|
||||
import asyncio
|
||||
from aio_pika import connect, IncomingMessage, ExchangeType
|
||||
import sentry_sdk
|
||||
from aioinflux import InfluxDBClient
|
||||
import config
|
||||
|
||||
if config.SENTRY_DSN:
|
||||
sentry_sdk.init(
|
||||
dsn=config.SENTRY_DSN,
|
||||
send_default_pii=True,
|
||||
release=config.RELEASE_ID,
|
||||
environment=config.RELEASEMODE
|
||||
)
|
||||
|
||||
|
||||
async def on_message(message: IncomingMessage):
|
||||
"""
|
||||
on_message doesn't necessarily have to be defined as async.
|
||||
Here it is to show that it's possible.
|
||||
"""
|
||||
msg_json = json.loads(message.body)
|
||||
now = datetime.now()
|
||||
point = {
|
||||
'time': now,
|
||||
'measurement': 'sturnus',
|
||||
'tags': {'device': 'dummy'},
|
||||
'fields': {'probability': msg_json['probability']}
|
||||
}
|
||||
async with InfluxDBClient(username=config.INFLUXDB_USERNAME, password=config.INFLUXDB_PASSWORD,
|
||||
host=config.INFLUXDB_HOST, port=config.INFLUXDB_PORT) as client:
|
||||
await client.create_database(db=config.INFLUXDB_DB)
|
||||
await client.write(point)
|
||||
|
||||
|
||||
async def main(loop_):
|
||||
# Perform connection
|
||||
connection = await connect(
|
||||
config.RABBITMQ_HOST, loop=loop_
|
||||
)
|
||||
|
||||
# Creating a channel
|
||||
channel = await connection.channel()
|
||||
|
||||
# Declare exchange
|
||||
exchange = await channel.declare_exchange(
|
||||
config.RABBITMQ_EXCHANGE, ExchangeType.FANOUT
|
||||
)
|
||||
|
||||
# Declaring queue
|
||||
queue = await channel.declare_queue(exclusive=True)
|
||||
|
||||
# Bind queue to exchange
|
||||
await queue.bind(exchange)
|
||||
|
||||
# Start listening the queue with name 'hello'
|
||||
await queue.consume(on_message, no_ack=True)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
loop = asyncio.get_event_loop()
|
||||
loop.create_task(main(loop))
|
||||
|
||||
# we enter a never-ending loop that waits for data and
|
||||
# runs callbacks whenever necessary.
|
||||
loop.run_forever()
|
25
src/config.py
Normal file
25
src/config.py
Normal file
@ -0,0 +1,25 @@
|
||||
#!/usr/bin/env python
|
||||
|
||||
"""
|
||||
Configuration
|
||||
"""
|
||||
|
||||
import os
|
||||
|
||||
__author__ = "@tormakris"
|
||||
__copyright__ = "Copyright 2020, Birbnetes Team"
|
||||
__module_name__ = "config"
|
||||
__version__text__ = "1"
|
||||
|
||||
SENTRY_DSN = os.environ.get("SENTRY_DSN")
|
||||
RELEASE_ID = os.environ.get("RELEASE_ID", "test")
|
||||
RELEASEMODE = os.environ.get("RELEASEMODE", "dev")
|
||||
|
||||
RABBITMQ_HOST = os.getenv("RABBITMQ_HOST", "amqp://guest:guest@localhost/")
|
||||
RABBITMQ_EXCHANGE = os.getenv("RABBITMQ_EXCHANGE", "output")
|
||||
|
||||
INFLUXDB_HOST = os.getenv("INFLUX_HOST", "output-service")
|
||||
INFLUXDB_PORT = os.getenv("INFLUX_PORT", "8086")
|
||||
INFLUXDB_USERNAME = os.getenv("INFLUX_USERNAME", "output-service")
|
||||
INFLUXDB_PASSWORD = os.getenv("INFLUX_PASSWORD", "output-service-supersecret")
|
||||
INFLUXDB_DB = os.getenv("INFLUX_DB", "output-service")
|
Reference in New Issue
Block a user