diff --git a/src/app.py b/src/app.py index 532cb79..5d9a346 100644 --- a/src/app.py +++ b/src/app.py @@ -17,6 +17,7 @@ import atexit from apscheduler.schedulers.background import BackgroundScheduler from magic_ampq import magic_ampq +from error_handlers import register_all_error_handlers """ Main Flask RESTful API @@ -72,5 +73,7 @@ api.add_resource(SampleParameterResource, '/sample/') health.add_check(health_database_status) health.add_check(ampq_connection_status) +register_all_error_handlers(app) + app.add_url_rule("/healthz", "healthcheck", view_func=lambda: health.run()) diff --git a/src/error_handlers.py b/src/error_handlers.py new file mode 100644 index 0000000..77b247e --- /dev/null +++ b/src/error_handlers.py @@ -0,0 +1,18 @@ +#!/usr/bin/env python3 + + +def get_standard_error_handler(code: int): + def error_handler(err): + return {"msg": str(err)}, code + + return error_handler + + +# function to register all handlers + + +def register_all_error_handlers(app): + error_codes_to_override = [404, 403, 401, 405, 400, 409, 422, 500] + + for code in error_codes_to_override: + app.register_error_handler(code, get_standard_error_handler(code)) \ No newline at end of file diff --git a/src/magic_ampq.py b/src/magic_ampq.py index bb625a6..fc5cb95 100644 --- a/src/magic_ampq.py +++ b/src/magic_ampq.py @@ -50,19 +50,33 @@ class MagicAMPQ: """ This method should be called periodically to keep up the connection """ + lock_start = time.time() with self._lock: + lock_acquire_time = time.time() - lock_start + if lock_acquire_time >= 1.5: + self.app.logger.warning(f"Loop: Lock acquire took {lock_acquire_time:5f} sec") + try: self._pika_connection.process_data_events(0) # We won't attempt retry if this fail - except pika.exceptions.AMQPConnectionError: + except pika.exceptions.AMQPConnectionError as e: + self.app.logger.warning(f"Connection error during process loop: {e} (attempting reconnect)") self._reconnect_ampq() + total_time = time.time() - lock_start + if total_time > 2: + self.app.logger.warning(f"Loop: Total loop took {total_time:5f} sec") + def publish(self, payload=None): """ Publish a simple json serialized message to the configured queue. If the connection is broken, then this call will block until the connection is restored """ + lock_start = time.time() with self._lock: + lock_acquire_time = time.time() - lock_start + if lock_acquire_time >= 0.3: + self.app.logger.warning(f"Publish: Lock acquire took {lock_acquire_time:5f} sec") tries = 0 while True: try: @@ -72,7 +86,8 @@ class MagicAMPQ: body=json.dumps(payload).encode('UTF-8') ) break # message sent successfully - except pika.exceptions.AMQPConnectionError: + except pika.exceptions.AMQPConnectionError as e: + self.app.logger.warning(f"Connection error during publish: {e} (attempting reconnect)") if tries > 30: raise # just give up @@ -81,7 +96,8 @@ class MagicAMPQ: try: self._reconnect_ampq() break - except pika.exceptions.AMQPConnectionError: + except pika.exceptions.AMQPConnectionError as e: + self.app.logger.warning(f"Connection error during reconnection: {e} (attempting reconnect)") tries += 1 if tries > 30: @@ -89,6 +105,9 @@ class MagicAMPQ: if tries > 10: time.sleep(2) + total_time = time.time() - lock_start + if total_time > 0.5: + self.app.logger.warning(f"Publish: Total publish took {total_time:5f} sec") def is_healthy(self) -> bool: with self._lock: diff --git a/src/resources.py b/src/resources.py index 6747232..0c45aeb 100644 --- a/src/resources.py +++ b/src/resources.py @@ -1,5 +1,6 @@ #!/usr/bin/env python3 import json +import time from datetime import datetime import tzlocal from xeger import Xeger @@ -92,10 +93,15 @@ class SampleResource(Resource): soundfile.content_type, {'Content-Length': soundfile_content_length})} + upload_started = time.time() r = requests.post( f"http://{current_app.config.get('STORAGE_HOSTNAME')}/object", files=files ) + upload_time = time.time() - upload_started + + if upload_time > 0.9: + current_app.logger.warning(f"Uploading to storage-service took {upload_time:5} sec") if r.status_code not in [200, 201]: return abort(500, f"Failed to upload sample to storage service. Upstream status: {r.status_code}: {r.text}") @@ -104,7 +110,7 @@ class SampleResource(Resource): magic_ampq.publish({'tag': generated_tag}) except Exception as e: current_app.logger.exception(e) - return abort(569, "AMPQ Publish error") + return abort(500, f"AMPQ Publish error: {str(e)}") if current_app.config['ENABLE_INFLUXDB']: influx_db.write_points(