remove cancer
continuous-integration/drone/push Build is passing Details

This commit is contained in:
Torma Kristóf 2020-10-03 12:31:19 +02:00
parent ccfd09f239
commit 099d6adffa
6 changed files with 274 additions and 6 deletions

View File

@ -12,4 +12,3 @@ psycopg2-binary
marshmallow
marshmallow-sqlalchemy
flask-marshmallow
flask-pika

View File

@ -4,6 +4,8 @@ from flask import Flask
from flask_restful import Api
import sentry_sdk
from sentry_sdk.integrations.flask import FlaskIntegration
from sentry_sdk.integrations.sqlalchemy import SqlalchemyIntegration
from config import *
from db import db
@ -12,7 +14,7 @@ from fpika import fpika
from resources import SampleResource, SampleParameterResource
"""
Main Flask RESTful API
Main Flask RESTful APIm
"""
__author__ = "@tormakris"
@ -23,7 +25,7 @@ __version__text__ = "1"
if SENTRY_DSN:
sentry_sdk.init(
dsn=SENTRY_DSN,
integrations=[FlaskIntegration()],
integrations=[FlaskIntegration(), SqlalchemyIntegration()],
traces_sample_rate=1.0,
send_default_pii=True,
release=RELEASE_ID,

View File

@ -9,6 +9,6 @@ __copyright__ = "Copyright 2020, Birbnetes Team"
__module_name__ = "fpika"
__version__text__ = "1"
from flask_pika import Pika as Fpika
from fpika_fork import Pika as Fpika
fpika = Fpika()

268
src/fpika_fork.py Normal file
View File

@ -0,0 +1,268 @@
import datetime
import pika
import warnings
from pika import connection
# python-3 compatibility
try:
from Queue import Queue
except ImportError as e:
from queue import Queue
try:
xrange
except NameError as e:
xrange = range
__all__ = ['Pika']
class Pika(object):
def __init__(self, app=None):
"""
Create the Flask Pika extension.
"""
self.app = app
if app is not None:
self.init_app(app)
def init_app(self, app):
"""
Initialize the Flask Pika extension
"""
pika_params = app.config['FLASK_PIKA_PARAMS']
pool_params = app.config.get('FLASK_PIKA_POOL_PARAMS', None)
self.debug = app.debug
self.logger = app.logger
self.pool_size = 1
self.pool_recycle = -1
self.pool_queue = Queue()
self.channel_recycle_times = {}
# fix create credentials if needed
if isinstance(pika_params, connection.Parameters):
self._pika_connection_params = pika_params
else:
if 'credentials' not in pika_params:
pika_params['credentials'] = pika.PlainCredentials(pika_params['username'], pika_params['password'])
del pika_params['username']
del pika_params['password']
self._pika_connection_params = pika.ConnectionParameters(**pika_params)
self.__DEBUG("Connection params are %s" % self._pika_connection_params)
# setup pooling if requested
if pool_params is not None:
self.pool_size = pool_params['pool_size']
self.pool_recycle = pool_params['pool_recycle']
for i in xrange(self.pool_size):
channel = PrePopulationChannel()
self.__set_recycle_for_channel(channel, -1)
self.pool_queue.put(channel)
self.__DEBUG("Pool params are %s" % pool_params)
def __create_channel(self):
"""
Create a connection and a channel based on pika params
"""
pika_connection = pika.BlockingConnection(self._pika_connection_params)
channel = pika_connection.channel()
self.__DEBUG("Created AMQP Connection and Channel %s" % channel)
# add support context manager
def close():
self.return_channel(channel)
ch = ProxyContextManager(instance=channel, close_callback=close)
self.__set_recycle_for_channel(ch)
return ch
def __destroy_channel(self, channel):
"""
Destroy a channel by closing it's underlying connection
"""
self.__remove_recycle_time_for_channel(channel)
try:
channel.connection.close()
self.__DEBUG("Destroyed AMQP Connection and Channel %s" % channel)
except Exception as e:
self.__WARN("Failed to destroy channel cleanly %s" % e)
def __set_recycle_for_channel(self, channel, recycle_time=None):
"""
Set the next recycle time for a channel
"""
if recycle_time is None:
recycle_time = (unix_time_millis_now() + (self.pool_recycle * 1000))
self.channel_recycle_times[hash(channel)] = recycle_time
def __remove_recycle_time_for_channel(self, channel):
"""
Remove the recycle time for a given channel if it exists
"""
channel_hash = hash(channel)
if channel_hash in self.channel_recycle_times:
del self.channel_recycle_times[channel_hash]
def __should_recycle_channel(self, channel):
"""
Determine if a channel should be recycled based on it's recycle time
"""
recycle_time = self.channel_recycle_times[hash(channel)]
return recycle_time < unix_time_millis_now()
def channel(self):
"""
Get a channel
If pooling is setup, this will block until a channel is available
If pooling is not setup, a new channel will be created
"""
# if using pooling
if self.pool_recycle > -1:
# get channel from pool or block until channel is available
ch = self.pool_queue.get()
self.__DEBUG("Got Pika channel from pool %s" % ch)
# recycle channel if needed or extend recycle time
if self.__should_recycle_channel(ch):
old_channel = ch
self.__destroy_channel(ch)
ch = self.__create_channel()
self.__DEBUG(
"Pika channel is too old, recycling channel %s and replacing it with %s" % (old_channel, ch))
else:
self.__set_recycle_for_channel(ch)
# make sure our channel is still open
while ch is None or not ch.is_open:
old_channel = ch
self.__destroy_channel(ch)
ch = self.__create_channel()
self.__WARN("Pika channel not open, replacing channel %s with %s" % (old_channel, ch))
# if not using pooling
else:
# create a new channel
ch = self.__create_channel()
return ch
def return_channel(self, channel):
"""
Return a channel
If pooling is setup, will return the channel to the channel pool
**unless** the channel is closed, then channel is passed to return_broken_channel
If pooling is not setup, will destroy the channel
"""
# if using pooling
if self.pool_recycle > -1:
self.__DEBUG("Returning Pika channel to pool %s" % channel)
if channel.is_open:
self.pool_queue.put(channel)
else:
self.return_broken_channel(channel)
# if not using pooling then just destroy the channel
else:
self.__destroy_channel(channel)
def return_broken_channel(self, channel):
"""
Return a broken channel
If pooling is setup, will destroy the broken channel and replace it in the channel pool with a new channel
If pooling is not setup, will destroy the channel
"""
# if using pooling
if self.pool_recycle > -1:
self.__WARN("Pika channel returned in broken state, replacing %s" % channel)
self.__destroy_channel(channel)
self.pool_queue.put(self.__create_channel())
# if not using pooling then just destroy the channel
else:
self.__WARN("Pika channel returned in broken state %s" % channel)
self.__destroy_channel(channel)
def __DEBUG(self, msg):
"""
Log a message at debug level if app in debug mode
"""
if self.debug:
self.logger.debug(msg)
def __WARN(self, msg):
"""
Log a message at warning level
"""
self.logger.warn(msg)
class PrePopulationChannel(object):
def __init__(self):
self._connection = PrePopulationConnection()
@property
def connection(self):
return self._connection
class PrePopulationConnection(object):
def __init__(self):
pass
def close(self):
pass
def unix_time(dt):
"""
Return unix time in microseconds
"""
epoch = datetime.datetime.utcfromtimestamp(0)
delta = dt - epoch
return int((delta.microseconds + (delta.seconds + delta.days * 24 * 3600) * 10 ** 6) / 10 ** 6)
def unix_time_millis(dt):
"""
Return unix time in milliseconds
"""
return round(unix_time(dt) * 1000.0)
def unix_time_millis_now():
"""
Return current unix time in milliseconds
"""
return unix_time_millis(datetime.datetime.utcnow())
class ProxyContextManager(object):
"""
working as proxy object or as context manager for object
"""
def __init__(self, instance, close_callback=None):
self.instance = instance
self.close_callback = close_callback
def __getattr__(self, key):
try:
return object.__getattribute__(self, key)
except AttributeError:
return getattr(self.instance, key)
def __enter__(self):
return self.instance
def __exit__(self, exc_type, exc_value, exc_traceback):
if self.close_callback:
self.close_callback()
else:
self.instance.close()

View File

@ -1,5 +1,4 @@
#!/usr/bin/env python3
import logging
import json
from xeger import Xeger
from flask_restful import Resource

View File

@ -32,4 +32,4 @@ class SampleMetadataSchema(ma.SQLAlchemyAutoSchema):
"""
class Meta:
model = SampleMetadata
exclude = ('timestamp', 'id',)