storage-service/storage_service/utils/healthchecks.py

34 lines
858 B
Python
Raw Normal View History

2020-11-19 01:49:40 +01:00
#!/usr/bin/env python3
2021-08-19 03:25:30 +02:00
from utils import storage, magic_amqp
2021-08-19 03:12:09 +02:00
from healthcheck import HealthCheck
from flask import Flask
2020-11-19 01:49:40 +01:00
def health_database_status():
is_database_working = True
output = 'storage is ok'
try:
storage.connection.list_buckets()
except Exception as e:
output = str(e)
is_database_working = False
return is_database_working, output
2021-08-19 03:12:09 +02:00
2021-08-19 03:25:30 +02:00
def amqp_connection_status():
if magic_amqp.is_healthy():
result = True
text = "amqp connection is ok"
else:
result = False
text = "amqp connection is unhealthy"
return result, text
2021-08-19 03:12:09 +02:00
def register_health_checks(app: Flask):
health = HealthCheck()
health.add_check(health_database_status)
2021-08-19 03:25:30 +02:00
health.add_check(amqp_connection_status)
2021-08-19 03:12:09 +02:00
app.add_url_rule("/healthz", "healthcheck", view_func=lambda: health.run())