Added more error handling and reporting
	
		
			
	
		
	
	
		
	
		
			All checks were successful
		
		
	
	
		
			
				
	
				continuous-integration/drone/push Build is passing
				
			
		
		
	
	
				
					
				
			
		
			All checks were successful
		
		
	
	continuous-integration/drone/push Build is passing
				
			This commit is contained in:
		@@ -17,6 +17,7 @@ import atexit
 | 
				
			|||||||
 | 
					
 | 
				
			||||||
from apscheduler.schedulers.background import BackgroundScheduler
 | 
					from apscheduler.schedulers.background import BackgroundScheduler
 | 
				
			||||||
from magic_ampq import magic_ampq
 | 
					from magic_ampq import magic_ampq
 | 
				
			||||||
 | 
					from error_handlers import register_all_error_handlers
 | 
				
			||||||
 | 
					
 | 
				
			||||||
"""
 | 
					"""
 | 
				
			||||||
Main Flask RESTful API
 | 
					Main Flask RESTful API
 | 
				
			||||||
@@ -72,5 +73,7 @@ api.add_resource(SampleParameterResource, '/sample/<tag>')
 | 
				
			|||||||
health.add_check(health_database_status)
 | 
					health.add_check(health_database_status)
 | 
				
			||||||
health.add_check(ampq_connection_status)
 | 
					health.add_check(ampq_connection_status)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					register_all_error_handlers(app)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
app.add_url_rule("/healthz", "healthcheck", view_func=lambda: health.run())
 | 
					app.add_url_rule("/healthz", "healthcheck", view_func=lambda: health.run())
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 
 | 
				
			|||||||
							
								
								
									
										18
									
								
								src/error_handlers.py
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										18
									
								
								src/error_handlers.py
									
									
									
									
									
										Normal file
									
								
							@@ -0,0 +1,18 @@
 | 
				
			|||||||
 | 
					#!/usr/bin/env python3
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					def get_standard_error_handler(code: int):
 | 
				
			||||||
 | 
					    def error_handler(err):
 | 
				
			||||||
 | 
					        return {"msg": str(err)}, code
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    return error_handler
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					# function to register all handlers
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					def register_all_error_handlers(app):
 | 
				
			||||||
 | 
					    error_codes_to_override = [404, 403, 401, 405, 400, 409, 422, 500]
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    for code in error_codes_to_override:
 | 
				
			||||||
 | 
					        app.register_error_handler(code, get_standard_error_handler(code))
 | 
				
			||||||
@@ -50,19 +50,33 @@ class MagicAMPQ:
 | 
				
			|||||||
        """
 | 
					        """
 | 
				
			||||||
        This method should be called periodically to keep up the connection
 | 
					        This method should be called periodically to keep up the connection
 | 
				
			||||||
        """
 | 
					        """
 | 
				
			||||||
 | 
					        lock_start = time.time()
 | 
				
			||||||
        with self._lock:
 | 
					        with self._lock:
 | 
				
			||||||
 | 
					            lock_acquire_time = time.time() - lock_start
 | 
				
			||||||
 | 
					            if lock_acquire_time >= 1.5:
 | 
				
			||||||
 | 
					                self.app.logger.warning(f"Loop: Lock acquire took {lock_acquire_time:5f} sec")
 | 
				
			||||||
 | 
					
 | 
				
			||||||
            try:
 | 
					            try:
 | 
				
			||||||
                self._pika_connection.process_data_events(0)
 | 
					                self._pika_connection.process_data_events(0)
 | 
				
			||||||
                # We won't attempt retry if this fail
 | 
					                # We won't attempt retry if this fail
 | 
				
			||||||
            except pika.exceptions.AMQPConnectionError:
 | 
					            except pika.exceptions.AMQPConnectionError as e:
 | 
				
			||||||
 | 
					                self.app.logger.warning(f"Connection error during process loop: {e} (attempting reconnect)")
 | 
				
			||||||
                self._reconnect_ampq()
 | 
					                self._reconnect_ampq()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					            total_time = time.time() - lock_start
 | 
				
			||||||
 | 
					            if total_time > 2:
 | 
				
			||||||
 | 
					                self.app.logger.warning(f"Loop: Total loop took {total_time:5f} sec")
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    def publish(self, payload=None):
 | 
					    def publish(self, payload=None):
 | 
				
			||||||
        """
 | 
					        """
 | 
				
			||||||
        Publish a simple json serialized message to the configured queue.
 | 
					        Publish a simple json serialized message to the configured queue.
 | 
				
			||||||
        If the connection is broken, then this call will block until the connection is restored
 | 
					        If the connection is broken, then this call will block until the connection is restored
 | 
				
			||||||
        """
 | 
					        """
 | 
				
			||||||
 | 
					        lock_start = time.time()
 | 
				
			||||||
        with self._lock:
 | 
					        with self._lock:
 | 
				
			||||||
 | 
					            lock_acquire_time = time.time() - lock_start
 | 
				
			||||||
 | 
					            if lock_acquire_time >= 0.3:
 | 
				
			||||||
 | 
					                self.app.logger.warning(f"Publish: Lock acquire took {lock_acquire_time:5f} sec")
 | 
				
			||||||
            tries = 0
 | 
					            tries = 0
 | 
				
			||||||
            while True:
 | 
					            while True:
 | 
				
			||||||
                try:
 | 
					                try:
 | 
				
			||||||
@@ -72,7 +86,8 @@ class MagicAMPQ:
 | 
				
			|||||||
                        body=json.dumps(payload).encode('UTF-8')
 | 
					                        body=json.dumps(payload).encode('UTF-8')
 | 
				
			||||||
                    )
 | 
					                    )
 | 
				
			||||||
                    break  # message sent successfully
 | 
					                    break  # message sent successfully
 | 
				
			||||||
                except pika.exceptions.AMQPConnectionError:
 | 
					                except pika.exceptions.AMQPConnectionError as e:
 | 
				
			||||||
 | 
					                    self.app.logger.warning(f"Connection error during publish: {e} (attempting reconnect)")
 | 
				
			||||||
 | 
					
 | 
				
			||||||
                    if tries > 30:
 | 
					                    if tries > 30:
 | 
				
			||||||
                        raise  # just give up
 | 
					                        raise  # just give up
 | 
				
			||||||
@@ -81,7 +96,8 @@ class MagicAMPQ:
 | 
				
			|||||||
                        try:
 | 
					                        try:
 | 
				
			||||||
                            self._reconnect_ampq()
 | 
					                            self._reconnect_ampq()
 | 
				
			||||||
                            break
 | 
					                            break
 | 
				
			||||||
                        except pika.exceptions.AMQPConnectionError:
 | 
					                        except pika.exceptions.AMQPConnectionError as e:
 | 
				
			||||||
 | 
					                            self.app.logger.warning(f"Connection error during reconnection: {e} (attempting reconnect)")
 | 
				
			||||||
                            tries += 1
 | 
					                            tries += 1
 | 
				
			||||||
 | 
					
 | 
				
			||||||
                            if tries > 30:
 | 
					                            if tries > 30:
 | 
				
			||||||
@@ -89,6 +105,9 @@ class MagicAMPQ:
 | 
				
			|||||||
 | 
					
 | 
				
			||||||
                        if tries > 10:
 | 
					                        if tries > 10:
 | 
				
			||||||
                            time.sleep(2)
 | 
					                            time.sleep(2)
 | 
				
			||||||
 | 
					            total_time = time.time() - lock_start
 | 
				
			||||||
 | 
					            if total_time > 0.5:
 | 
				
			||||||
 | 
					                self.app.logger.warning(f"Publish: Total publish took {total_time:5f} sec")
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    def is_healthy(self) -> bool:
 | 
					    def is_healthy(self) -> bool:
 | 
				
			||||||
        with self._lock:
 | 
					        with self._lock:
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -1,5 +1,6 @@
 | 
				
			|||||||
#!/usr/bin/env python3
 | 
					#!/usr/bin/env python3
 | 
				
			||||||
import json
 | 
					import json
 | 
				
			||||||
 | 
					import time
 | 
				
			||||||
from datetime import datetime
 | 
					from datetime import datetime
 | 
				
			||||||
import tzlocal
 | 
					import tzlocal
 | 
				
			||||||
from xeger import Xeger
 | 
					from xeger import Xeger
 | 
				
			||||||
@@ -92,10 +93,15 @@ class SampleResource(Resource):
 | 
				
			|||||||
                soundfile.content_type,
 | 
					                soundfile.content_type,
 | 
				
			||||||
                {'Content-Length': soundfile_content_length})}
 | 
					                {'Content-Length': soundfile_content_length})}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        upload_started = time.time()
 | 
				
			||||||
        r = requests.post(
 | 
					        r = requests.post(
 | 
				
			||||||
            f"http://{current_app.config.get('STORAGE_HOSTNAME')}/object",
 | 
					            f"http://{current_app.config.get('STORAGE_HOSTNAME')}/object",
 | 
				
			||||||
            files=files
 | 
					            files=files
 | 
				
			||||||
        )
 | 
					        )
 | 
				
			||||||
 | 
					        upload_time = time.time() - upload_started
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        if upload_time > 0.9:
 | 
				
			||||||
 | 
					            current_app.logger.warning(f"Uploading to storage-service took {upload_time:5} sec")
 | 
				
			||||||
 | 
					
 | 
				
			||||||
        if r.status_code not in [200, 201]:
 | 
					        if r.status_code not in [200, 201]:
 | 
				
			||||||
            return abort(500, f"Failed to upload sample to storage service. Upstream status: {r.status_code}: {r.text}")
 | 
					            return abort(500, f"Failed to upload sample to storage service. Upstream status: {r.status_code}: {r.text}")
 | 
				
			||||||
@@ -104,7 +110,7 @@ class SampleResource(Resource):
 | 
				
			|||||||
            magic_ampq.publish({'tag': generated_tag})
 | 
					            magic_ampq.publish({'tag': generated_tag})
 | 
				
			||||||
        except Exception as e:
 | 
					        except Exception as e:
 | 
				
			||||||
            current_app.logger.exception(e)
 | 
					            current_app.logger.exception(e)
 | 
				
			||||||
            return abort(569, "AMPQ Publish error")
 | 
					            return abort(500, f"AMPQ Publish error: {str(e)}")
 | 
				
			||||||
 | 
					
 | 
				
			||||||
        if current_app.config['ENABLE_INFLUXDB']:
 | 
					        if current_app.config['ENABLE_INFLUXDB']:
 | 
				
			||||||
            influx_db.write_points(
 | 
					            influx_db.write_points(
 | 
				
			||||||
 
 | 
				
			|||||||
		Reference in New Issue
	
	Block a user