diff --git a/requirements.txt b/requirements.txt index c429a12..2008c7b 100644 --- a/requirements.txt +++ b/requirements.txt @@ -14,6 +14,8 @@ opentracing~=2.4.0 cython +apscheduler + six deprecation diff --git a/svm_prefilter_service/config.py b/svm_prefilter_service/config.py index 8dce7b8..b3ff757 100644 --- a/svm_prefilter_service/config.py +++ b/svm_prefilter_service/config.py @@ -11,3 +11,7 @@ class Config: INPUT_SERVICE_URL = os.environ.get("INPUT_SERVICE_URL", "http://input-service/input") DROPALL = os.environ.get("DROPALL", "no").lower() in ['yes', 'true', '1'] + + REPORT_ALIAS = os.environ.get("REPORTER_ALIAS", os.environ['HOSTNAME']) + REPORT_URL = os.environ.get("REPORT_URL", "") + REPORT_INTERVAL = int(os.environ.get("REPORT_INTERVAL", 15)) diff --git a/svm_prefilter_service/mule.py b/svm_prefilter_service/mule.py index 7c634c4..509e433 100644 --- a/svm_prefilter_service/mule.py +++ b/svm_prefilter_service/mule.py @@ -15,6 +15,8 @@ from pyAudioAnalysis import audioBasicIO from pyAudioAnalysis import MidTermFeatures import numpy +from apscheduler.schedulers.background import BackgroundScheduler + if Config.SENTRY_DSN: sentry_sdk.init( dsn=Config.SENTRY_DSN, @@ -119,12 +121,37 @@ def lapatolas(q: Queue): q.put(message) +def reporter(q: Queue): + report = { + "site": Config.REPORT_ALIAS, + "measurements": { + "queue": q.qsize() + } + } + + print("Reporting queue length of", report) + + r = requests.post(Config.REPORT_URL, json=report) + + r.raise_for_status() + if r.status_code != 201: + print(Config.REPORT_URL, "Wrong response:", r.status_code) + + def main(): memer = ModelMemer() requeue = Queue() Thread(target=lapatolas, args=(requeue,), daemon=True).start() + scheduler = None + if Config.REPORT_URL: + scheduler = BackgroundScheduler() + scheduler.add_job(lambda: reporter(requeue), trigger='interval', seconds=Config.REPORT_INTERVAL) + scheduler.start() + + Thread(target=reporter, args=(requeue,), daemon=True).start() + while True: message = requeue.get(block=True) task = pickle.loads(message) @@ -149,6 +176,9 @@ def main(): finally: os.remove(audio_file_path) +# if scheduler: +# scheduler.stop() + if __name__ == '__main__': main()