connect on demand
All checks were successful
continuous-integration/drone/push Build is passing

This commit is contained in:
Torma Kristóf 2020-10-19 23:26:59 +02:00
parent 483c97e980
commit dadb6508b3
Signed by: tormakris
GPG Key ID: DC83C4F2C41B1047
4 changed files with 15 additions and 301 deletions

View File

@ -10,7 +10,6 @@ from sentry_sdk.integrations.sqlalchemy import SqlalchemyIntegration
from config import * from config import *
from db import db from db import db
from marshm import ma from marshm import ma
from fpika import fpika
from resources import SampleResource, SampleParameterResource from resources import SampleResource, SampleParameterResource
""" """
@ -42,12 +41,9 @@ app.config['FLASK_PIKA_PARAMS'] = {'host': RABBITMQ_HOST,
'password': RABBITMQ_PASSWORD, 'password': RABBITMQ_PASSWORD,
'port': 5672, 'port': 5672,
'virtual_host': '/'} 'virtual_host': '/'}
app.config['FLASK_PIKA_POOL_PARAMS'] = {'pool_size': 4,
'pool_recycle': 10}
api = Api(app) api = Api(app)
db.init_app(app) db.init_app(app)
ma.init_app(app) ma.init_app(app)
fpika.init_app(app)
with app.app_context(): with app.app_context():
db.create_all() db.create_all()
@ -67,16 +63,6 @@ api.add_resource(SampleResource, "/sample")
api.add_resource(SampleParameterResource, '/sample/<tag>') api.add_resource(SampleParameterResource, '/sample/<tag>')
@app.before_first_request
def before_first_request():
ch = fpika.channel()
ch.exchange_declare(exchange=RABBITMQ_EXCHANGE,
exchange_type='fanout',
durable=False,
auto_delete=False)
fpika.return_channel(ch)
if __name__ == "__main__": if __name__ == "__main__":
app.run( app.run(
debug=bool(DEBUG), debug=bool(DEBUG),

View File

@ -1,14 +0,0 @@
#!/usr/bin/env python3
"""
Flask-Pika API
"""
__author__ = '@tormakris'
__copyright__ = "Copyright 2020, Birbnetes Team"
__module_name__ = "fpika"
__version__text__ = "1"
from fpika_fork import Pika as Fpika
fpika = Fpika()

View File

@ -1,268 +0,0 @@
import datetime
import pika
import warnings
from pika import connection
# python-3 compatibility
try:
from Queue import Queue
except ImportError as e:
from queue import Queue
try:
xrange
except NameError as e:
xrange = range
__all__ = ['Pika']
class Pika(object):
def __init__(self, app=None):
"""
Create the Flask Pika extension.
"""
self.app = app
if app is not None:
self.init_app(app)
def init_app(self, app):
"""
Initialize the Flask Pika extension
"""
pika_params = app.config['FLASK_PIKA_PARAMS']
pool_params = app.config.get('FLASK_PIKA_POOL_PARAMS', None)
self.debug = app.debug
self.logger = app.logger
self.pool_size = 1
self.pool_recycle = -1
self.pool_queue = Queue()
self.channel_recycle_times = {}
# fix create credentials if needed
if isinstance(pika_params, connection.Parameters):
self._pika_connection_params = pika_params
else:
if 'credentials' not in pika_params:
pika_params['credentials'] = pika.PlainCredentials(pika_params['username'], pika_params['password'])
del pika_params['username']
del pika_params['password']
self._pika_connection_params = pika.ConnectionParameters(**pika_params)
self.__DEBUG("Connection params are %s" % self._pika_connection_params)
# setup pooling if requested
if pool_params is not None:
self.pool_size = pool_params['pool_size']
self.pool_recycle = pool_params['pool_recycle']
for i in xrange(self.pool_size):
channel = PrePopulationChannel()
self.__set_recycle_for_channel(channel, -1)
self.pool_queue.put(channel)
self.__DEBUG("Pool params are %s" % pool_params)
def __create_channel(self):
"""
Create a connection and a channel based on pika params
"""
pika_connection = pika.BlockingConnection(self._pika_connection_params)
channel = pika_connection.channel()
self.__DEBUG("Created AMQP Connection and Channel %s" % channel)
# add support context manager
def close():
self.return_channel(channel)
ch = ProxyContextManager(instance=channel, close_callback=close)
self.__set_recycle_for_channel(ch)
return ch
def __destroy_channel(self, channel):
"""
Destroy a channel by closing it's underlying connection
"""
self.__remove_recycle_time_for_channel(channel)
try:
channel.connection.close()
self.__DEBUG("Destroyed AMQP Connection and Channel %s" % channel)
except Exception as e:
self.__WARN("Failed to destroy channel cleanly %s" % e)
def __set_recycle_for_channel(self, channel, recycle_time=None):
"""
Set the next recycle time for a channel
"""
if recycle_time is None:
recycle_time = (unix_time_millis_now() + (self.pool_recycle * 1000))
self.channel_recycle_times[hash(channel)] = recycle_time
def __remove_recycle_time_for_channel(self, channel):
"""
Remove the recycle time for a given channel if it exists
"""
channel_hash = hash(channel)
if channel_hash in self.channel_recycle_times:
del self.channel_recycle_times[channel_hash]
def __should_recycle_channel(self, channel):
"""
Determine if a channel should be recycled based on it's recycle time
"""
recycle_time = self.channel_recycle_times[hash(channel)]
return recycle_time < unix_time_millis_now()
def channel(self):
"""
Get a channel
If pooling is setup, this will block until a channel is available
If pooling is not setup, a new channel will be created
"""
# if using pooling
if self.pool_recycle > -1:
# get channel from pool or block until channel is available
ch = self.pool_queue.get()
self.__DEBUG("Got Pika channel from pool %s" % ch)
# recycle channel if needed or extend recycle time
if self.__should_recycle_channel(ch):
old_channel = ch
self.__destroy_channel(ch)
ch = self.__create_channel()
self.__DEBUG(
"Pika channel is too old, recycling channel %s and replacing it with %s" % (old_channel, ch))
else:
self.__set_recycle_for_channel(ch)
# make sure our channel is still open
while ch is None or not ch.is_open:
old_channel = ch
self.__destroy_channel(ch)
ch = self.__create_channel()
self.__WARN("Pika channel not open, replacing channel %s with %s" % (old_channel, ch))
# if not using pooling
else:
# create a new channel
ch = self.__create_channel()
return ch
def return_channel(self, channel):
"""
Return a channel
If pooling is setup, will return the channel to the channel pool
**unless** the channel is closed, then channel is passed to return_broken_channel
If pooling is not setup, will destroy the channel
"""
# if using pooling
if self.pool_recycle > -1:
self.__DEBUG("Returning Pika channel to pool %s" % channel)
if channel.is_open:
self.pool_queue.put(channel)
else:
self.return_broken_channel(channel)
# if not using pooling then just destroy the channel
else:
self.__destroy_channel(channel)
def return_broken_channel(self, channel):
"""
Return a broken channel
If pooling is setup, will destroy the broken channel and replace it in the channel pool with a new channel
If pooling is not setup, will destroy the channel
"""
# if using pooling
if self.pool_recycle > -1:
self.__WARN("Pika channel returned in broken state, replacing %s" % channel)
self.__destroy_channel(channel)
self.pool_queue.put(self.__create_channel())
# if not using pooling then just destroy the channel
else:
self.__WARN("Pika channel returned in broken state %s" % channel)
self.__destroy_channel(channel)
def __DEBUG(self, msg):
"""
Log a message at debug level if app in debug mode
"""
if self.debug:
self.logger.debug(msg)
def __WARN(self, msg):
"""
Log a message at warning level
"""
self.logger.warn(msg)
class PrePopulationChannel(object):
def __init__(self):
self._connection = PrePopulationConnection()
@property
def connection(self):
return self._connection
class PrePopulationConnection(object):
def __init__(self):
pass
def close(self):
pass
def unix_time(dt):
"""
Return unix time in microseconds
"""
epoch = datetime.datetime.utcfromtimestamp(0)
delta = dt - epoch
return int((delta.microseconds + (delta.seconds + delta.days * 24 * 3600) * 10 ** 6) / 10 ** 6)
def unix_time_millis(dt):
"""
Return unix time in milliseconds
"""
return round(unix_time(dt) * 1000.0)
def unix_time_millis_now():
"""
Return current unix time in milliseconds
"""
return unix_time_millis(datetime.datetime.utcnow())
class ProxyContextManager(object):
"""
working as proxy object or as context manager for object
"""
def __init__(self, instance, close_callback=None):
self.instance = instance
self.close_callback = close_callback
def __getattr__(self, key):
try:
return object.__getattribute__(self, key)
except AttributeError:
return getattr(self.instance, key)
def __enter__(self):
return self.instance
def __exit__(self, exc_type, exc_value, exc_traceback):
if self.close_callback:
self.close_callback()
else:
self.instance.close()

View File

@ -8,7 +8,7 @@ from db import db
from models import SampleMetadata from models import SampleMetadata
from schemas import SampleSchema, SampleMetadataSchema from schemas import SampleSchema, SampleMetadataSchema
from config import * from config import *
from fpika import fpika import pika
""" """
Flask Restful endpoints Flask Restful endpoints
@ -76,10 +76,20 @@ class SampleResource(Resource):
soundfile, soundfile,
soundfile.content_type, soundfile.content_type,
{'Content-Length': soundfile.content_length})}).raise_for_status() {'Content-Length': soundfile.content_length})}).raise_for_status()
ch = fpika.channel() credentials = pika.PlainCredentials(current_app.config['FLASK_PIKA_PARAMS']['username'],
ch.basic_publish(exchange=RABBITMQ_EXCHANGE, routing_key='feature', current_app.config['FLASK_PIKA_PARAMS']['password'])
body=json.dumps({'tag': generated_tag}).encode('UTF-8')) connection = pika.BlockingConnection(
fpika.return_channel(ch) pika.ConnectionParameters(host=current_app.config['FLASK_PIKA_PARAMS']['host'],
credentials=credentials,
heartbeat=0,
socket_timeout=5))
channel = connection.channel()
channel.exchange_declare(exchange=current_app.config['FLASK_PIKA_PARAMS']['EXCHANGE_NAME'],
exchange_type='fanout')
channel.basic_publish(exchange=current_app.config['FLASK_PIKA_PARAMS']['EXCHANGE_NAME'],
routing_key='feature',
body=json.dumps({'tag': generated_tag}).encode('UTF-8'))
connection.close()
except Exception as e: except Exception as e:
current_app.logger.exception(e) current_app.logger.exception(e)
db.session.rollback() db.session.rollback()