From 099d6adffa4e3b883d69cae09d93ac13359efbef Mon Sep 17 00:00:00 2001 From: Torma Date: Sat, 3 Oct 2020 12:31:19 +0200 Subject: [PATCH] remove cancer --- requirements.txt | 1 - src/app.py | 6 +- src/fpika.py | 2 +- src/fpika_fork.py | 268 ++++++++++++++++++++++++++++++++++++++++++++++ src/resources.py | 1 - src/schemas.py | 2 +- 6 files changed, 274 insertions(+), 6 deletions(-) create mode 100644 src/fpika_fork.py diff --git a/requirements.txt b/requirements.txt index 7ce7b88..9448263 100644 --- a/requirements.txt +++ b/requirements.txt @@ -12,4 +12,3 @@ psycopg2-binary marshmallow marshmallow-sqlalchemy flask-marshmallow -flask-pika diff --git a/src/app.py b/src/app.py index 4cca4ed..ca4b3c4 100644 --- a/src/app.py +++ b/src/app.py @@ -4,6 +4,8 @@ from flask import Flask from flask_restful import Api import sentry_sdk from sentry_sdk.integrations.flask import FlaskIntegration +from sentry_sdk.integrations.sqlalchemy import SqlalchemyIntegration + from config import * from db import db @@ -12,7 +14,7 @@ from fpika import fpika from resources import SampleResource, SampleParameterResource """ -Main Flask RESTful API +Main Flask RESTful APIm """ __author__ = "@tormakris" @@ -23,7 +25,7 @@ __version__text__ = "1" if SENTRY_DSN: sentry_sdk.init( dsn=SENTRY_DSN, - integrations=[FlaskIntegration()], + integrations=[FlaskIntegration(), SqlalchemyIntegration()], traces_sample_rate=1.0, send_default_pii=True, release=RELEASE_ID, diff --git a/src/fpika.py b/src/fpika.py index 23b7175..fce69d5 100644 --- a/src/fpika.py +++ b/src/fpika.py @@ -9,6 +9,6 @@ __copyright__ = "Copyright 2020, Birbnetes Team" __module_name__ = "fpika" __version__text__ = "1" -from flask_pika import Pika as Fpika +from fpika_fork import Pika as Fpika fpika = Fpika() diff --git a/src/fpika_fork.py b/src/fpika_fork.py new file mode 100644 index 0000000..51cedaa --- /dev/null +++ b/src/fpika_fork.py @@ -0,0 +1,268 @@ +import datetime +import pika +import warnings +from pika import connection + +# python-3 compatibility +try: + from Queue import Queue +except ImportError as e: + from queue import Queue + +try: + xrange +except NameError as e: + xrange = range + +__all__ = ['Pika'] + + +class Pika(object): + + def __init__(self, app=None): + """ + Create the Flask Pika extension. + """ + self.app = app + if app is not None: + self.init_app(app) + + def init_app(self, app): + """ + Initialize the Flask Pika extension + """ + pika_params = app.config['FLASK_PIKA_PARAMS'] + pool_params = app.config.get('FLASK_PIKA_POOL_PARAMS', None) + + self.debug = app.debug + self.logger = app.logger + self.pool_size = 1 + self.pool_recycle = -1 + self.pool_queue = Queue() + self.channel_recycle_times = {} + + # fix create credentials if needed + if isinstance(pika_params, connection.Parameters): + self._pika_connection_params = pika_params + else: + if 'credentials' not in pika_params: + pika_params['credentials'] = pika.PlainCredentials(pika_params['username'], pika_params['password']) + del pika_params['username'] + del pika_params['password'] + self._pika_connection_params = pika.ConnectionParameters(**pika_params) + + self.__DEBUG("Connection params are %s" % self._pika_connection_params) + + # setup pooling if requested + if pool_params is not None: + self.pool_size = pool_params['pool_size'] + self.pool_recycle = pool_params['pool_recycle'] + for i in xrange(self.pool_size): + channel = PrePopulationChannel() + self.__set_recycle_for_channel(channel, -1) + self.pool_queue.put(channel) + self.__DEBUG("Pool params are %s" % pool_params) + + def __create_channel(self): + """ + Create a connection and a channel based on pika params + """ + pika_connection = pika.BlockingConnection(self._pika_connection_params) + channel = pika_connection.channel() + self.__DEBUG("Created AMQP Connection and Channel %s" % channel) + + # add support context manager + def close(): + self.return_channel(channel) + + ch = ProxyContextManager(instance=channel, close_callback=close) + + self.__set_recycle_for_channel(ch) + return ch + + def __destroy_channel(self, channel): + """ + Destroy a channel by closing it's underlying connection + """ + self.__remove_recycle_time_for_channel(channel) + try: + channel.connection.close() + self.__DEBUG("Destroyed AMQP Connection and Channel %s" % channel) + except Exception as e: + self.__WARN("Failed to destroy channel cleanly %s" % e) + + def __set_recycle_for_channel(self, channel, recycle_time=None): + """ + Set the next recycle time for a channel + """ + if recycle_time is None: + recycle_time = (unix_time_millis_now() + (self.pool_recycle * 1000)) + + self.channel_recycle_times[hash(channel)] = recycle_time + + def __remove_recycle_time_for_channel(self, channel): + """ + Remove the recycle time for a given channel if it exists + """ + channel_hash = hash(channel) + if channel_hash in self.channel_recycle_times: + del self.channel_recycle_times[channel_hash] + + def __should_recycle_channel(self, channel): + """ + Determine if a channel should be recycled based on it's recycle time + """ + recycle_time = self.channel_recycle_times[hash(channel)] + return recycle_time < unix_time_millis_now() + + def channel(self): + """ + Get a channel + If pooling is setup, this will block until a channel is available + If pooling is not setup, a new channel will be created + """ + # if using pooling + if self.pool_recycle > -1: + # get channel from pool or block until channel is available + ch = self.pool_queue.get() + self.__DEBUG("Got Pika channel from pool %s" % ch) + + # recycle channel if needed or extend recycle time + if self.__should_recycle_channel(ch): + old_channel = ch + self.__destroy_channel(ch) + ch = self.__create_channel() + self.__DEBUG( + "Pika channel is too old, recycling channel %s and replacing it with %s" % (old_channel, ch)) + else: + self.__set_recycle_for_channel(ch) + + # make sure our channel is still open + while ch is None or not ch.is_open: + old_channel = ch + self.__destroy_channel(ch) + ch = self.__create_channel() + self.__WARN("Pika channel not open, replacing channel %s with %s" % (old_channel, ch)) + + # if not using pooling + else: + # create a new channel + ch = self.__create_channel() + + return ch + + def return_channel(self, channel): + """ + Return a channel + If pooling is setup, will return the channel to the channel pool + **unless** the channel is closed, then channel is passed to return_broken_channel + If pooling is not setup, will destroy the channel + """ + # if using pooling + if self.pool_recycle > -1: + self.__DEBUG("Returning Pika channel to pool %s" % channel) + if channel.is_open: + self.pool_queue.put(channel) + else: + self.return_broken_channel(channel) + + # if not using pooling then just destroy the channel + else: + self.__destroy_channel(channel) + + def return_broken_channel(self, channel): + """ + Return a broken channel + If pooling is setup, will destroy the broken channel and replace it in the channel pool with a new channel + If pooling is not setup, will destroy the channel + """ + # if using pooling + if self.pool_recycle > -1: + self.__WARN("Pika channel returned in broken state, replacing %s" % channel) + self.__destroy_channel(channel) + self.pool_queue.put(self.__create_channel()) + + # if not using pooling then just destroy the channel + else: + self.__WARN("Pika channel returned in broken state %s" % channel) + self.__destroy_channel(channel) + + def __DEBUG(self, msg): + """ + Log a message at debug level if app in debug mode + """ + if self.debug: + self.logger.debug(msg) + + def __WARN(self, msg): + """ + Log a message at warning level + """ + self.logger.warn(msg) + + +class PrePopulationChannel(object): + + def __init__(self): + self._connection = PrePopulationConnection() + + @property + def connection(self): + return self._connection + + +class PrePopulationConnection(object): + + def __init__(self): + pass + + def close(self): + pass + + +def unix_time(dt): + """ + Return unix time in microseconds + """ + epoch = datetime.datetime.utcfromtimestamp(0) + delta = dt - epoch + return int((delta.microseconds + (delta.seconds + delta.days * 24 * 3600) * 10 ** 6) / 10 ** 6) + + +def unix_time_millis(dt): + """ + Return unix time in milliseconds + """ + return round(unix_time(dt) * 1000.0) + + +def unix_time_millis_now(): + """ + Return current unix time in milliseconds + """ + return unix_time_millis(datetime.datetime.utcnow()) + + +class ProxyContextManager(object): + """ + working as proxy object or as context manager for object + """ + + def __init__(self, instance, close_callback=None): + self.instance = instance + self.close_callback = close_callback + + def __getattr__(self, key): + try: + return object.__getattribute__(self, key) + except AttributeError: + return getattr(self.instance, key) + + def __enter__(self): + return self.instance + + def __exit__(self, exc_type, exc_value, exc_traceback): + if self.close_callback: + self.close_callback() + else: + self.instance.close() diff --git a/src/resources.py b/src/resources.py index 62b6afb..8ce0fe4 100644 --- a/src/resources.py +++ b/src/resources.py @@ -1,5 +1,4 @@ #!/usr/bin/env python3 -import logging import json from xeger import Xeger from flask_restful import Resource diff --git a/src/schemas.py b/src/schemas.py index 5e0ca9e..459a957 100644 --- a/src/schemas.py +++ b/src/schemas.py @@ -32,4 +32,4 @@ class SampleMetadataSchema(ma.SQLAlchemyAutoSchema): """ class Meta: model = SampleMetadata - + exclude = ('timestamp', 'id',)