less prone to crashing

This commit is contained in:
Pünkösd Marcell 2021-07-28 14:10:59 +02:00
parent fbfb379e86
commit 87cfdecc2d
2 changed files with 56 additions and 24 deletions

View File

@ -40,18 +40,22 @@ if config.SENTRY_DSN:
def setup_rabbit(mqtt_: MQTT) -> None: def setup_rabbit(mqtt_: MQTT) -> None:
logging.info("Connecting to RabbitMQ...") logging.info("Connecting to RabbitMQ...")
credentials = pika.PlainCredentials(config.RABBIT_USERNAME, config.RABBIT_PASSWORD) credentials = pika.PlainCredentials(config.RABBIT_USERNAME, config.RABBIT_PASSWORD)
connection = pika.BlockingConnection(pika.ConnectionParameters(host=config.RABBIT_HOSTNAME, while True:
credentials=credentials, connection = pika.BlockingConnection(pika.ConnectionParameters(host=config.RABBIT_HOSTNAME,
heartbeat=30, credentials=credentials,
socket_timeout=45)) heartbeat=30,
channel = connection.channel() socket_timeout=45))
channel.exchange_declare(exchange=config.RABBIT_EXCHANGE, exchange_type='fanout') channel = connection.channel()
queue = channel.queue_declare(durable=True, auto_delete=True, queue=uuid.uuid4().urn.split(':')[2], channel.exchange_declare(exchange=config.RABBIT_EXCHANGE, exchange_type='fanout')
exclusive=True).method.queue queue = channel.queue_declare(durable=True, auto_delete=True, queue=uuid.uuid4().urn.split(':')[2],
channel.queue_bind(exchange=config.RABBIT_EXCHANGE, queue=queue) exclusive=True).method.queue
channel.basic_consume(queue=queue, on_message_callback=on_message_creator(mqtt_), auto_ack=True) channel.queue_bind(exchange=config.RABBIT_EXCHANGE, queue=queue)
logging.debug("Starting consumption...") channel.basic_consume(queue=queue, on_message_callback=on_message_creator(mqtt_), auto_ack=False)
channel.start_consuming() logging.debug("Starting consumption...")
try:
channel.start_consuming() # this automagically responds to heartbeats
except pika.exceptions.AMQPConnectionError as e:
logging.warning(f"AMQP Error happened: {e}; Reconnecting...")
def on_message_creator(mqtt_: MQTT): def on_message_creator(mqtt_: MQTT):
@ -59,27 +63,54 @@ def on_message_creator(mqtt_: MQTT):
This generator is used, so that the mqtt object can be injected just when the callback is registered This generator is used, so that the mqtt object can be injected just when the callback is registered
""" """
def on_message(channel, method_frame, header_frame, body): def on_message(
msg_json = json.loads(body) channel: pika.channel.Channel,
if 'probability' not in msg_json: method: pika.spec.Basic.Deliver,
logging.error("Malformed message from classifier") properties: pika.spec.BasicProperties,
body: bytes
):
try:
msg_json = json.loads(body)
except (json.JSONDecodeError, UnicodeDecodeError) as e:
logging.error(f"Malformed message from classifier: {e}")
channel.basic_ack(delivery_tag=method.delivery_tag)
return return
if 'class' not in msg_json: if ('probability' not in msg_json) or ('class' not in msg_json):
logging.error("Malformed message from classifier") logging.error("Malformed message from classifier: Missing fields")
channel.basic_ack(delivery_tag=method.delivery_tag)
return return
# TODO: strurnus should not be hardcoded here # TODO: strurnus should not be hardcoded here
if (msg_json['class'] == 'sturnus') and (msg_json['probability'] > config.TRIGGER_LEVEL): if (msg_json['class'] == 'sturnus') and (msg_json['probability'] > config.TRIGGER_LEVEL):
r = requests.get(f"http://{config.INPUT_HOSTNAME}/sample/{msg_json['tag']}") try:
r.raise_for_status() r = requests.get(
f"http://{config.INPUT_HOSTNAME}/sample/{msg_json['tag']}",
timeout=config.INPUT_TIMEOUT
)
except requests.exceptions.Timeout:
logging.error(f"Input-service timed out! (Timeout: {config.INPUT_TIMEOUT} sec)")
return # no ack
if r.status_code != 200:
logging.error(f"Input-service status code is not 200: {r.status_code}")
return # no ack
if 'device_id' not in r.json(): if 'device_id' not in r.json():
logging.error("Input-service response invalid") logging.error("Input-service response invalid")
return return # no ack
logging.info(f"Sending alert command to device {r.json()['device_id']}") logging.info(f"Sending alert command to device {r.json()['device_id']}...")
mqtt_.publish(subtopic=r.json()['device_id'], mqtt_.publish(
message=json.dumps({"command": "doAlert"})) subtopic=r.json()['device_id'],
message=json.dumps({"command": "doAlert"})
)
else:
logging.debug(f"Probability is either bellow trigger level, or not the target class. Nothing to do.")
# This concludes the job
channel.basic_ack(delivery_tag=method.delivery_tag)
return on_message return on_message

View File

@ -30,4 +30,5 @@ MQTT_PASSWORD = os.getenv("GUARD_MQTT_PASSWORD", "guard-service")
MQTT_TOPIC = os.getenv("GUARD_MQTT_TOPIC", "guard-service") MQTT_TOPIC = os.getenv("GUARD_MQTT_TOPIC", "guard-service")
INPUT_HOSTNAME = os.getenv("INPUT_SVC_HOSTNAME", "input-service") INPUT_HOSTNAME = os.getenv("INPUT_SVC_HOSTNAME", "input-service")
INPUT_TIMEOUT = int(os.environ.get("INPUT_SVC_TIMEOUT", 5))
TRIGGER_LEVEL = float(os.environ.get("TRIGGER_LEVEL", 0.51)) TRIGGER_LEVEL = float(os.environ.get("TRIGGER_LEVEL", 0.51))