Changed the order of things
	
		
			
	
		
	
	
		
	
		
			All checks were successful
		
		
	
	
		
			
				
	
				continuous-integration/drone/push Build is passing
				
			
		
		
	
	
				
					
				
			
		
			All checks were successful
		
		
	
	continuous-integration/drone/push Build is passing
				
			also fixed spelling of amqp
This commit is contained in:
		
							
								
								
									
										10
									
								
								src/app.py
									
									
									
									
									
								
							
							
						
						
									
										10
									
								
								src/app.py
									
									
									
									
									
								
							@@ -11,12 +11,12 @@ from db import db
 | 
			
		||||
from marshm import ma
 | 
			
		||||
from influxus import influx_db
 | 
			
		||||
from resources import SampleResource, SampleParameterResource
 | 
			
		||||
from healthchecks import health_database_status, ampq_connection_status
 | 
			
		||||
from healthchecks import health_database_status, amqp_connection_status
 | 
			
		||||
 | 
			
		||||
import atexit
 | 
			
		||||
 | 
			
		||||
from apscheduler.schedulers.background import BackgroundScheduler
 | 
			
		||||
from magic_ampq import magic_ampq
 | 
			
		||||
from magic_amqp import magic_amqp
 | 
			
		||||
from error_handlers import register_all_error_handlers
 | 
			
		||||
 | 
			
		||||
"""
 | 
			
		||||
@@ -48,10 +48,10 @@ db.init_app(app)
 | 
			
		||||
ma.init_app(app)
 | 
			
		||||
 | 
			
		||||
# ampq magic stuff
 | 
			
		||||
magic_ampq.init_app(app)
 | 
			
		||||
magic_amqp.init_app(app)
 | 
			
		||||
 | 
			
		||||
ampq_loop_scheduler = BackgroundScheduler()
 | 
			
		||||
ampq_loop_scheduler.add_job(func=lambda: magic_ampq.loop(), trigger="interval", seconds=5)
 | 
			
		||||
ampq_loop_scheduler.add_job(func=lambda: magic_amqp.loop(), trigger="interval", seconds=5)
 | 
			
		||||
atexit.register(lambda: ampq_loop_scheduler.shutdown())
 | 
			
		||||
 | 
			
		||||
ampq_loop_scheduler.start()
 | 
			
		||||
@@ -71,7 +71,7 @@ api.add_resource(SampleResource, "/sample")
 | 
			
		||||
api.add_resource(SampleParameterResource, '/sample/<tag>')
 | 
			
		||||
 | 
			
		||||
health.add_check(health_database_status)
 | 
			
		||||
health.add_check(ampq_connection_status)
 | 
			
		||||
health.add_check(amqp_connection_status)
 | 
			
		||||
 | 
			
		||||
register_all_error_handlers(app)
 | 
			
		||||
 | 
			
		||||
 
 | 
			
		||||
@@ -1,7 +1,7 @@
 | 
			
		||||
#!/usr/bin/env python3
 | 
			
		||||
 | 
			
		||||
from db import db
 | 
			
		||||
from magic_ampq import magic_ampq
 | 
			
		||||
from magic_amqp import magic_amqp
 | 
			
		||||
 | 
			
		||||
"""
 | 
			
		||||
Healthchek functions
 | 
			
		||||
@@ -24,12 +24,12 @@ def health_database_status():
 | 
			
		||||
    return is_database_working, output
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def ampq_connection_status():
 | 
			
		||||
    if magic_ampq.is_healthy():
 | 
			
		||||
def amqp_connection_status():
 | 
			
		||||
    if magic_amqp.is_healthy():
 | 
			
		||||
        result = True
 | 
			
		||||
        text = "ampq connection is ok"
 | 
			
		||||
        text = "amqp connection is ok"
 | 
			
		||||
    else:
 | 
			
		||||
        result = False
 | 
			
		||||
        text = "ampq connection is unhealthy"
 | 
			
		||||
        text = "amqp connection is unhealthy"
 | 
			
		||||
 | 
			
		||||
    return result, text
 | 
			
		||||
 
 | 
			
		||||
@@ -6,7 +6,7 @@ import json
 | 
			
		||||
import time
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
class MagicAMPQ:
 | 
			
		||||
class MagicAMQP:
 | 
			
		||||
    """
 | 
			
		||||
    This is my pathetic attempt to make RabbitMQ connection in a Flask app reliable and performant.
 | 
			
		||||
    """
 | 
			
		||||
@@ -118,4 +118,4 @@ class MagicAMPQ:
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
# instance to be used in the flask app
 | 
			
		||||
magic_ampq = MagicAMPQ()
 | 
			
		||||
magic_amqp = MagicAMQP()
 | 
			
		||||
@@ -7,7 +7,7 @@ from xeger import Xeger
 | 
			
		||||
from flask_restful import Resource
 | 
			
		||||
from flask import request, current_app, abort
 | 
			
		||||
import requests
 | 
			
		||||
from magic_ampq import magic_ampq
 | 
			
		||||
from magic_amqp import magic_amqp
 | 
			
		||||
from db import db
 | 
			
		||||
from influxus import influx_db
 | 
			
		||||
from models import SampleMetadata
 | 
			
		||||
@@ -106,12 +106,16 @@ class SampleResource(Resource):
 | 
			
		||||
        if r.status_code not in [200, 201]:
 | 
			
		||||
            return abort(500, f"Failed to upload sample to storage service. Upstream status: {r.status_code}: {r.text}")
 | 
			
		||||
 | 
			
		||||
        db.session.commit()
 | 
			
		||||
 | 
			
		||||
        # Announce only after the data is successfully committed
 | 
			
		||||
        try:
 | 
			
		||||
            magic_ampq.publish({'tag': generated_tag})
 | 
			
		||||
            magic_amqp.publish({'tag': generated_tag})
 | 
			
		||||
        except Exception as e:
 | 
			
		||||
            current_app.logger.exception(e)
 | 
			
		||||
            return abort(500, f"AMPQ Publish error: {str(e)}")
 | 
			
		||||
            return abort(500, f"AMQP Publish error: {str(e)}")
 | 
			
		||||
 | 
			
		||||
        # metrics
 | 
			
		||||
        if current_app.config['ENABLE_INFLUXDB']:
 | 
			
		||||
            influx_db.write_points(
 | 
			
		||||
                [
 | 
			
		||||
@@ -128,10 +132,8 @@ class SampleResource(Resource):
 | 
			
		||||
                ]
 | 
			
		||||
            )
 | 
			
		||||
 | 
			
		||||
        db.session.commit()
 | 
			
		||||
        return {"tag": generated_tag}, 200
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
    def get(self):
 | 
			
		||||
        """
 | 
			
		||||
        Get all stored items
 | 
			
		||||
 
 | 
			
		||||
		Reference in New Issue
	
	Block a user