Compare commits
	
		
			31 Commits
		
	
	
		
			influx
			...
			norbi-upda
		
	
	| Author | SHA1 | Date | |
|---|---|---|---|
| 947ac144b1 | |||
| 3cdacc6720 | |||
| 04bb2722ad | |||
| 2431812f09 | |||
| 10f57913f3 | |||
| 738eea1da3 | |||
| a118b79512 | |||
| 2c0e6ec7d7 | |||
| 7f987afa7a | |||
| ca548f0863 | |||
| cbaf2f2981 | |||
| 3d21e3a543 | |||
| 2030230258 | |||
| f90571afc3 | |||
| 459b3fa6df | |||
| 9c3f8c65fb | |||
| 00e9d02478 | |||
| 98234f0e8a | |||
| 4e3efb7295 | |||
| 9bfedf0090 | |||
| 3d423c71c6 | |||
| 414330b3d5 | |||
| 67c5d723ca | |||
| a844a13608 | |||
| ba69b9c2b1 | |||
| eb7f6498ab | |||
| 57b757cb41 | |||
| e64137ca56 | |||
| f15517af62 | |||
| 3c10a351ba | |||
| 30525ac967 | 
| @@ -16,4 +16,4 @@ COPY ./src . | ||||
|  | ||||
| EXPOSE 8080 | ||||
|  | ||||
| ENTRYPOINT ["gunicorn", "-b", "0.0.0.0:8080", "--workers", "1", "--threads", "1", "app:app"] | ||||
| ENTRYPOINT ["gunicorn", "-b", "0.0.0.0:8080", "app:app"] | ||||
|   | ||||
| @@ -6,16 +6,17 @@ metadata: | ||||
|     app: input-service | ||||
|   namespace: birbnetes | ||||
| data: | ||||
|   SENTRY_DSN: https://b181edf362e549f4967c6cd42629693d@sentry.kmlabz.com/3 | ||||
|   RELEASE_ID: luna-k8s | ||||
|   SENTRY_DSN: https://fce9e078e1494bf4b959b45d0b435386@sentry.kmlabz.com/2 | ||||
|   RELEASE_ID: kmlabz-k8s | ||||
|   INPUT_SERVICE_RELEASEMODE: release | ||||
|   INPUT_RABBITMQ_HOSTNAME: birb-rabbitmq | ||||
|   INPUT_RABBITMQ_EXCHANGE: "wave" | ||||
|   INPUT_RABBITMQ_QUEUE: wave-ready | ||||
|   INPUT_RABBITMQ_EXCHANGE_META: "sample-meta" | ||||
|   INPUT_RABBITMQ_EXCHANGE_CACHE: "sample-cache" | ||||
|   INPUT_RABBITMQ_USERNAME: user | ||||
|   INPUT_RABBITMQ_PASSWORD: 1wZVQnP5vy | ||||
|   INPUT_POSTGRES_HOSTNAME: input-postgres | ||||
|   INPUT_POSTGRES_USERNAME: input-service | ||||
|   INPUT_POSTGRES_PASSWORD: input-service-supersecret | ||||
|   INPUT_POSTGRES_DB: input-service | ||||
|   INPUT_STORAGE_HOSTNAME: storage-service | ||||
|   INPUT_RABBITMQ_PASSWORD: ZgCiSiSO8t | ||||
|   INFLUX_HOST: input-influx | ||||
|   INFLUX_PORT: "8086" | ||||
|   INFLUX_USERNAME: input-service | ||||
|   INFLUX_PASSWORD: input-service-supersecret | ||||
|   INFLUX_DB: input-service | ||||
|   CACHE_REDIS_URL: "redis://input-redis:6379/0" | ||||
|   | ||||
| @@ -19,12 +19,34 @@ spec: | ||||
|     spec: | ||||
|       containers: | ||||
|       - name: input-service | ||||
|         image: registry.kmlabz.com/birbnetesgit/input-service | ||||
|         image: registry.kmlabz.com/birbnetes/input-service | ||||
|         imagePullPolicy: Always | ||||
|         envFrom: | ||||
|           - configMapRef: | ||||
|               name: input-service | ||||
|         ports: | ||||
|         - containerPort: 8080 | ||||
|       - name: jaeger-agent | ||||
|         image: jaegertracing/jaeger-agent:latest | ||||
|         imagePullPolicy: IfNotPresent | ||||
|         ports: | ||||
|         - containerPort: 5775 | ||||
|           name: zk-compact-trft | ||||
|           protocol: UDP | ||||
|         - containerPort: 5778 | ||||
|           name: config-rest | ||||
|           protocol: TCP | ||||
|         - containerPort: 6831 | ||||
|           name: jg-compact-trft | ||||
|           protocol: UDP | ||||
|         - containerPort: 6832 | ||||
|           name: jg-binary-trft | ||||
|           protocol: UDP | ||||
|         - containerPort: 14271 | ||||
|           name: admin-http | ||||
|           protocol: TCP | ||||
|         args: | ||||
|           - --reporter.grpc.host-port=dns:///woolsey.tormakristof.eu:14250 | ||||
|           - --reporter.type=grpc | ||||
|       imagePullSecrets: | ||||
|       - name: regcred | ||||
|   | ||||
| @@ -13,4 +13,4 @@ spec: | ||||
|     protocol: TCP | ||||
|   selector: | ||||
|     app: input-service | ||||
|   type: ClusterIP | ||||
|   type: ClusterIP | ||||
|   | ||||
| @@ -1,18 +1,21 @@ | ||||
| sentry_sdk[flask] | ||||
| gunicorn | ||||
| Flask | ||||
| Flask-RESTful | ||||
| requests | ||||
| gunicorn~=20.1.0 | ||||
| Flask~=2.0.1 | ||||
| Flask-RESTful~=0.3.9 | ||||
| requests~=2.26.0 | ||||
| werkzeug | ||||
| sqlalchemy | ||||
| flask_sqlalchemy | ||||
| xeger | ||||
| pika | ||||
| psycopg2-binary | ||||
| marshmallow | ||||
| marshmallow-sqlalchemy | ||||
| xeger~=0.3.5 | ||||
| pika~=1.2.0 | ||||
| marshmallow~=3.13.0 | ||||
| flask-marshmallow | ||||
| py-healthcheck | ||||
| Flask-InfluxDB | ||||
| tzdata | ||||
| tzlocal | ||||
| tzlocal | ||||
| apscheduler~=3.7.0 | ||||
|  | ||||
| flask-redis~=0.4.0 | ||||
|  | ||||
| opentracing~=2.4.0 | ||||
| jaeger-client | ||||
| Flask-Opentracing | ||||
							
								
								
									
										97
									
								
								src/app.py
									
									
									
									
									
								
							
							
						
						
									
										97
									
								
								src/app.py
									
									
									
									
									
								
							| @@ -3,18 +3,27 @@ from flask import Flask | ||||
| from flask_restful import Api | ||||
| import sentry_sdk | ||||
| from sentry_sdk.integrations.flask import FlaskIntegration | ||||
| from sentry_sdk.integrations.sqlalchemy import SqlalchemyIntegration | ||||
| from healthcheck import HealthCheck | ||||
|  | ||||
| from config import * | ||||
| from db import db | ||||
| from redis_client import redis_client | ||||
| from config import Config | ||||
| from marshm import ma | ||||
| from influxus import influx_db | ||||
| from resources import SampleResource, SampleParameterResource | ||||
| from healthchecks import health_database_status | ||||
| from resources import SampleResource | ||||
| from healthchecks import amqp_connection_status | ||||
|  | ||||
| import atexit | ||||
|  | ||||
| from apscheduler.schedulers.background import BackgroundScheduler | ||||
| from magic_amqp import magic_amqp | ||||
| from error_handlers import register_all_error_handlers | ||||
|  | ||||
| import jaeger_client | ||||
| import opentracing | ||||
| from flask_opentracing import FlaskTracing | ||||
|  | ||||
| """ | ||||
| Main Flask RESTful APIm | ||||
| Main Flask RESTful API | ||||
| """ | ||||
|  | ||||
| __author__ = "@tormakris" | ||||
| @@ -22,44 +31,66 @@ __copyright__ = "Copyright 2020, Birbnetes Team" | ||||
| __module_name__ = "app" | ||||
| __version__text__ = "1" | ||||
|  | ||||
| if SENTRY_DSN: | ||||
| if Config.SENTRY_DSN: | ||||
|     sentry_sdk.init( | ||||
|         dsn=SENTRY_DSN, | ||||
|         integrations=[FlaskIntegration(), SqlalchemyIntegration()], | ||||
|         traces_sample_rate=1.0, | ||||
|         dsn=Config.SENTRY_DSN, | ||||
|         integrations=[FlaskIntegration()], | ||||
|         traces_sample_rate=0.0, | ||||
|         send_default_pii=True, | ||||
|         release=RELEASE_ID, | ||||
|         environment=RELEASEMODE, | ||||
|         release=Config.RELEASE_ID, | ||||
|         environment=Config.RELEASEMODE, | ||||
|         _experiments={"auto_enabling_integrations": True} | ||||
|     ) | ||||
|  | ||||
| app = Flask(__name__) | ||||
| app.config[ | ||||
|     'SQLALCHEMY_DATABASE_URI'] = f"postgresql://{POSTGRES_USERNAME}:{POSTGRES_PASSWORD}@{POSTGRES_HOSTNAME}:5432/{POSTGRES_DB}{POSTGRES_OPTS}" | ||||
| app.config['EXCHANGE_NAME'] = RABBITMQ_EXCHANGE | ||||
| app.config['FLASK_PIKA_PARAMS'] = {'host': RABBITMQ_HOST, | ||||
|                                    'username': RABBITMQ_USERNAME, | ||||
|                                    'password': RABBITMQ_PASSWORD, | ||||
|                                    'port': 5672, | ||||
|                                    'virtual_host': '/'} | ||||
| app.config['INFLUXDB_HOST'] = INFLUXDB_HOST | ||||
| app.config['INFLUXDB_PORT'] = INFLUXDB_PORT | ||||
| app.config['INFLUXDB_USER'] = INFLUXDB_USERNAME | ||||
| app.config['INFLUXDB_PASSWORD'] = INFLUXDB_PASSWORD | ||||
| app.config['INFLUXDB_DATABASE'] = INFLUXDB_DB | ||||
| app.config.from_object(Config) | ||||
|  | ||||
| api = Api(app) | ||||
| health = HealthCheck() | ||||
| db.init_app(app) | ||||
| ma.init_app(app) | ||||
| influx_db.init_app(app) | ||||
|  | ||||
| with app.app_context(): | ||||
|     # influx_db.database.create(INFLUXDB_DB) | ||||
|     db.create_all() | ||||
| redis_client.init_app(app) | ||||
|  | ||||
| api.add_resource(SampleResource, "/sample") | ||||
| api.add_resource(SampleParameterResource, '/sample/<tag>') | ||||
| # ampq magic stuff | ||||
| magic_amqp.init_app(app) | ||||
|  | ||||
| ampq_loop_scheduler = BackgroundScheduler() | ||||
| ampq_loop_scheduler.add_job(func=lambda: magic_amqp.loop(), trigger="interval", seconds=5) | ||||
| atexit.register(lambda: ampq_loop_scheduler.shutdown()) | ||||
|  | ||||
| ampq_loop_scheduler.start() | ||||
|  | ||||
| if Config.ENABLE_INFLUXDB: | ||||
|     influx_db.init_app(app) | ||||
|  | ||||
|  | ||||
| @app.before_first_request | ||||
| def init_db(): | ||||
|     if Config.ENABLE_INFLUXDB: | ||||
|         influx_db.database.create(Config.INFLUXDB_DATABASE) | ||||
|  | ||||
|  | ||||
| # Setup tracing | ||||
| def initialize_tracer(): | ||||
|     app.logger.info("Initializing jaeger...") | ||||
|     jaeger_cfg = jaeger_client.Config(config={}, service_name='input-service', validate=True) | ||||
|     tracer = jaeger_cfg.initialize_tracer() | ||||
|     return tracer | ||||
|  | ||||
|  | ||||
| tracing = FlaskTracing(initialize_tracer, True, app) | ||||
|  | ||||
| api.add_resource(SampleResource, "/input") | ||||
|  | ||||
| health.add_check(amqp_connection_status) | ||||
|  | ||||
| register_all_error_handlers(app) | ||||
|  | ||||
| health.add_check(health_database_status) | ||||
| app.add_url_rule("/healthz", "healthcheck", view_func=lambda: health.run()) | ||||
|  | ||||
| if __name__ != '__main__': | ||||
|     import logging | ||||
|  | ||||
|     gunicorn_logger = logging.getLogger('gunicorn.error') | ||||
|     app.logger.handlers = gunicorn_logger.handlers | ||||
|     app.logger.setLevel(gunicorn_logger.level) | ||||
|   | ||||
| @@ -1,42 +1,40 @@ | ||||
| #!/usr/bin/env python3 | ||||
| import os | ||||
|  | ||||
|  | ||||
| """ | ||||
| Main Flask RESTful API | ||||
| """ | ||||
|  | ||||
|  | ||||
| __author__ = "@tormakris" | ||||
| __copyright__ = "Copyright 2020, Birbnetes Team" | ||||
| __module_name__ = "app" | ||||
| __version__text__ = "1" | ||||
|  | ||||
|  | ||||
| PORT = os.environ.get("INPUT_SERVICE_PORT", 8080) | ||||
| DEBUG = os.environ.get("INPUT_SERVICE_DEBUG", True) | ||||
| class Config: | ||||
|     PORT = 8080 | ||||
|     DEBUG = os.environ.get("INPUT_SERVICE_DEBUG", "true").lower() in ["true", "yes", "1"] | ||||
|  | ||||
|     SENTRY_DSN = os.environ.get("SENTRY_DSN") | ||||
|     RELEASE_ID = os.environ.get("RELEASE_ID", "test") | ||||
|     RELEASEMODE = os.environ.get("INPUT_SERVICE_RELEASEMODE", "dev") | ||||
|  | ||||
| SENTRY_DSN = os.environ.get("SENTRY_DSN") | ||||
| RELEASE_ID = os.environ.get("RELEASE_ID", "test") | ||||
| RELEASEMODE = os.environ.get("INPUT_SERVICE_RELEASEMODE", "dev") | ||||
|     REDIS_URL = os.environ['CACHE_REDIS_URL'] | ||||
|  | ||||
| RABBITMQ_HOST = os.getenv("INPUT_RABBITMQ_HOSTNAME", "localhost") | ||||
| RABBITMQ_EXCHANGE = os.getenv("INPUT_RABBITMQ_EXCHANGE", "dev") | ||||
| RABBITMQ_QUEUE = os.getenv("INPUT_RABBITMQ_QUEUE", "wave-extract") | ||||
| RABBITMQ_USERNAME = os.getenv("INPUT_RABBITMQ_USERNAME", "rabbitmq") | ||||
| RABBITMQ_PASSWORD = os.getenv("INPUT_RABBITMQ_PASSWORD", "rabbitmq") | ||||
|     EXCHANGE_NAME_META = os.getenv("INPUT_RABBITMQ_EXCHANGE_META", "sample-meta") | ||||
|     EXCHANGE_NAME_CACHE = os.getenv("INPUT_RABBITMQ_EXCHANGE_CACHE", "sample-cache") | ||||
|  | ||||
| POSTGRES_HOSTNAME = os.getenv("INPUT_POSTGRES_HOSTNAME", "localhost") | ||||
| POSTGRES_USERNAME = os.getenv("INPUT_POSTGRES_USERNAME", "input-service") | ||||
| POSTGRES_PASSWORD = os.getenv("INPUT_POSTGRES_PASSWORD", "input-service") | ||||
| POSTGRES_DB = os.getenv("INPUT_POSTGRES_DB", "input-service") | ||||
| POSTGRES_OPTS = os.getenv("INPUT_POSTGRES_OPTS", "") | ||||
|     FLASK_PIKA_PARAMS = { | ||||
|         'host': os.getenv("INPUT_RABBITMQ_HOSTNAME", "localhost"), | ||||
|         'username': os.getenv("INPUT_RABBITMQ_USERNAME", "rabbitmq"), | ||||
|         'password': os.getenv("INPUT_RABBITMQ_PASSWORD", "rabbitmq"), | ||||
|         'port': int(os.getenv("INPUT_RABBITMQ_PORT", 5672)), | ||||
|         'virtual_host': '/' | ||||
|     } | ||||
|  | ||||
| STORAGE_HOSTNAME = os.getenv("INPUT_STORAGE_HOSTNAME", "localhost:8042") | ||||
|  | ||||
| INFLUXDB_HOST = os.getenv("INFLUX_HOST", "input-influx") | ||||
| INFLUXDB_PORT = os.getenv("INFLUX_PORT", "8086") | ||||
| INFLUXDB_USERNAME = os.getenv("INFLUX_USERNAME", "input-service") | ||||
| INFLUXDB_PASSWORD = os.getenv("INFLUX_PASSWORD", "input-service-supersecret") | ||||
| INFLUXDB_DB = os.getenv("INFLUX_DB", "input-service") | ||||
|     ENABLE_INFLUXDB = os.environ.get("INPUT_ENABLE_INFLUX", "true").lower() in ["true", "yes", "1"] | ||||
|     INFLUXDB_HOST = os.getenv("INFLUX_HOST", "input-influx") | ||||
|     INFLUXDB_PORT = os.getenv("INFLUX_PORT", "8086") | ||||
|     INFLUXDB_USER = os.getenv("INFLUX_USERNAME", "input-service") | ||||
|     INFLUXDB_PASSWORD = os.getenv("INFLUX_PASSWORD", "input-service-supersecret") | ||||
|     INFLUXDB_DATABASE = os.getenv("INFLUX_DB", "input-service") | ||||
|   | ||||
							
								
								
									
										13
									
								
								src/db.py
									
									
									
									
									
								
							
							
						
						
									
										13
									
								
								src/db.py
									
									
									
									
									
								
							| @@ -1,13 +0,0 @@ | ||||
| #!/usr/bin/env python3 | ||||
| from flask_sqlalchemy import SQLAlchemy | ||||
|  | ||||
| """ | ||||
| Database api | ||||
| """ | ||||
|  | ||||
| __author__ = '@tormakris' | ||||
| __copyright__ = "Copyright 2020, Birbnetes Team" | ||||
| __module_name__ = "db" | ||||
| __version__text__ = "1" | ||||
|  | ||||
| db = SQLAlchemy() | ||||
							
								
								
									
										18
									
								
								src/error_handlers.py
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										18
									
								
								src/error_handlers.py
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,18 @@ | ||||
| #!/usr/bin/env python3 | ||||
|  | ||||
|  | ||||
| def get_standard_error_handler(code: int): | ||||
|     def error_handler(err): | ||||
|         return {"msg": str(err)}, code | ||||
|  | ||||
|     return error_handler | ||||
|  | ||||
|  | ||||
| # function to register all handlers | ||||
|  | ||||
|  | ||||
| def register_all_error_handlers(app): | ||||
|     error_codes_to_override = [404, 403, 401, 405, 400, 409, 422, 500] | ||||
|  | ||||
|     for code in error_codes_to_override: | ||||
|         app.register_error_handler(code, get_standard_error_handler(code)) | ||||
| @@ -1,6 +1,5 @@ | ||||
| #!/usr/bin/env python3 | ||||
|  | ||||
| from db import db | ||||
| from magic_amqp import magic_amqp | ||||
|  | ||||
| """ | ||||
| Healthchek functions | ||||
| @@ -12,12 +11,12 @@ __module_name__ = "healthchecks" | ||||
| __version__text__ = "1" | ||||
|  | ||||
|  | ||||
| def health_database_status(): | ||||
|     is_database_working = True | ||||
|     output = 'database is ok' | ||||
|     try: | ||||
|         db.session.execute('SELECT 1') | ||||
|     except Exception as e: | ||||
|         output = str(e) | ||||
|         is_database_working = False | ||||
|     return is_database_working, output | ||||
| def amqp_connection_status(): | ||||
|     if magic_amqp.is_healthy(): | ||||
|         result = True | ||||
|         text = "amqp connection is ok" | ||||
|     else: | ||||
|         result = False | ||||
|         text = "amqp connection is unhealthy" | ||||
|  | ||||
|     return result, text | ||||
|   | ||||
							
								
								
									
										142
									
								
								src/magic_amqp.py
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										142
									
								
								src/magic_amqp.py
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,142 @@ | ||||
| from flask import Flask | ||||
| from threading import Lock | ||||
| import pika | ||||
| import pika.exceptions | ||||
| import json | ||||
| import time | ||||
|  | ||||
| import opentracing | ||||
| from opentracing.ext import tags | ||||
| from opentracing.propagation import Format | ||||
|  | ||||
|  | ||||
| class MagicAMQP: | ||||
|     """ | ||||
|     This is my pathetic attempt to make RabbitMQ connection in a Flask app reliable and performant. | ||||
|     """ | ||||
|  | ||||
|     def __init__(self, app: Flask = None): | ||||
|         self.app = app | ||||
|         if app: | ||||
|             self.init_app(app) | ||||
|  | ||||
|         self._lock = Lock() | ||||
|         self._credentials = None | ||||
|  | ||||
|     def init_app(self, app: Flask): | ||||
|         self.app = app | ||||
|         self.app.config.setdefault('FLASK_PIKA_PARAMS', {}) | ||||
|         self.app.config.setdefault('EXCHANGE_NAME_META', None) | ||||
|         self.app.config.setdefault('EXCHANGE_NAME_CACHE', None) | ||||
|  | ||||
|         self._credentials = pika.PlainCredentials( | ||||
|             app.config['FLASK_PIKA_PARAMS']['username'], | ||||
|             app.config['FLASK_PIKA_PARAMS']['password'] | ||||
|         ) | ||||
|  | ||||
|         self._reconnect_ampq() | ||||
|  | ||||
|     def _reconnect_ampq(self): | ||||
|         self._pika_connection = pika.BlockingConnection( | ||||
|             pika.ConnectionParameters( | ||||
|                 host=self.app.config['FLASK_PIKA_PARAMS']['host'], | ||||
|                 credentials=self._credentials, | ||||
|                 heartbeat=10, | ||||
|                 socket_timeout=5) | ||||
|         ) | ||||
|         self._pika_channel = self._pika_connection.channel() | ||||
|         self._pika_channel.exchange_declare( | ||||
|             exchange=self.app.config['EXCHANGE_NAME_META'], | ||||
|             exchange_type='direct' | ||||
|         ) | ||||
|         self._pika_channel.exchange_declare( | ||||
|             exchange=self.app.config['EXCHANGE_NAME_CACHE'], | ||||
|             exchange_type='direct' | ||||
|         ) | ||||
|  | ||||
|     def loop(self): | ||||
|         """ | ||||
|         This method should be called periodically to keep up the connection | ||||
|         """ | ||||
|         lock_start = time.time() | ||||
|         with self._lock: | ||||
|             lock_acquire_time = time.time() - lock_start | ||||
|             if lock_acquire_time >= 0.5: | ||||
|                 self.app.logger.warning(f"Loop: Lock acquire took {lock_acquire_time:5f} sec") | ||||
|  | ||||
|             try: | ||||
|                 self._pika_connection.process_data_events(0) | ||||
|                 # We won't attempt retry if this fail | ||||
|             except pika.exceptions.AMQPConnectionError as e: | ||||
|                 self.app.logger.warning(f"Connection error during process loop: {e} (attempting reconnect)") | ||||
|                 self._reconnect_ampq() | ||||
|  | ||||
|             total_time = time.time() - lock_start | ||||
|             if total_time > 1: | ||||
|                 self.app.logger.warning(f"Loop: Total loop took {total_time:5f} sec") | ||||
|  | ||||
|     def _publish(self, exchange: str, payload=None): | ||||
|         """ | ||||
|         Publish a simple json serialized message to the configured queue. | ||||
|         If the connection is broken, then this call will block until the connection is restored | ||||
|         """ | ||||
|         span_tags = {tags.SPAN_KIND: tags.SPAN_KIND_PRODUCER} | ||||
|         with opentracing.tracer.start_active_span('magic_amqp.publish', tags=span_tags) as scope: | ||||
|             opentracing.tracer.inject(scope.span.context, Format.TEXT_MAP, payload) | ||||
|             lock_start = time.time() | ||||
|             with self._lock: | ||||
|                 scope.span.log_kv({'event': 'lockAcquired'}) | ||||
|                 lock_acquire_time = time.time() - lock_start | ||||
|                 if lock_acquire_time >= 0.2: | ||||
|                     self.app.logger.warning(f"Publish: Lock acquire took {lock_acquire_time:5f} sec") | ||||
|                 tries = 0 | ||||
|                 while True: | ||||
|                     try: | ||||
|                         self._pika_channel.basic_publish( | ||||
|                             exchange=exchange, | ||||
|                             routing_key='feature', | ||||
|                             body=json.dumps(payload).encode('UTF-8') | ||||
|                         ) | ||||
|                         self.app.logger.debug(f"Published: {payload}") | ||||
|                         break  # message sent successfully | ||||
|                     except pika.exceptions.AMQPConnectionError as e: | ||||
|                         scope.span.log_kv({'event': 'connectionError', 'error': str(e)}) | ||||
|                         self.app.logger.warning(f"Connection error during publish: {e} (attempting reconnect)") | ||||
|  | ||||
|                         if tries > 30: | ||||
|                             raise  # just give up | ||||
|  | ||||
|                         while True: | ||||
|                             try: | ||||
|                                 self._reconnect_ampq() | ||||
|                                 break | ||||
|                             except pika.exceptions.AMQPConnectionError as e: | ||||
|                                 self.app.logger.warning( | ||||
|                                     f"Connection error during reconnection: {e} (attempting reconnect)") | ||||
|                                 tries += 1 | ||||
|  | ||||
|                                 if tries > 30: | ||||
|                                     raise  # just give up | ||||
|  | ||||
|                             if tries > 10: | ||||
|                                 time.sleep(2) | ||||
|                 total_time = time.time() - lock_start | ||||
|                 if total_time > 0.4: | ||||
|                     self.app.logger.warning(f"Publish: Total publish took {total_time:5f} sec") | ||||
|  | ||||
|     def publish_cache(self, payload=None): | ||||
|         return self._publish(self.app.config['EXCHANGE_NAME_CACHE'], payload) | ||||
|  | ||||
|     def publish_meta(self, payload=None): | ||||
|         return self._publish(self.app.config['EXCHANGE_NAME_META'], payload) | ||||
|  | ||||
|     def is_healthy(self) -> bool: | ||||
|         with self._lock: | ||||
|             if not self._pika_channel: | ||||
|                 return False | ||||
|  | ||||
|             return self._pika_channel.is_open and self._pika_connection.is_open | ||||
|  | ||||
|  | ||||
| # instance to be used in the flask app | ||||
| magic_amqp = MagicAMQP() | ||||
| @@ -1,25 +0,0 @@ | ||||
| #!/usr/bin/env python3 | ||||
| from db import db | ||||
| from sqlalchemy.sql import func | ||||
|  | ||||
| """ | ||||
| Flask Restful endpoints | ||||
| """ | ||||
|  | ||||
| __author__ = '@tormakris' | ||||
| __copyright__ = "Copyright 2020, Birbnetes Team" | ||||
| __module_name__ = "models" | ||||
| __version__text__ = "1" | ||||
|  | ||||
|  | ||||
| class SampleMetadata(db.Model): | ||||
|     """ | ||||
|     SQLAlchemy model of metadata entries | ||||
|     """ | ||||
|     id = db.Column(db.Integer, primary_key=True, auto_increment=True) | ||||
|     timestamp = db.Column(db.TIMESTAMP, nullable=False, server_default=func.now()) | ||||
|  | ||||
|     device_id = db.Column(db.Integer, nullable=False) | ||||
|     device_date = db.Column(db.DateTime, nullable=False) | ||||
|  | ||||
|     tag = db.Column(db.String(32), nullable=False, unique=True) | ||||
							
								
								
									
										4
									
								
								src/redis_client.py
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										4
									
								
								src/redis_client.py
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,4 @@ | ||||
| #!/usr/bin/env python3 | ||||
| from flask_redis import FlaskRedis | ||||
|  | ||||
| redis_client = FlaskRedis() | ||||
							
								
								
									
										196
									
								
								src/resources.py
									
									
									
									
									
								
							
							
						
						
									
										196
									
								
								src/resources.py
									
									
									
									
									
								
							| @@ -1,17 +1,15 @@ | ||||
| #!/usr/bin/env python3 | ||||
| import json | ||||
| import io | ||||
| from datetime import datetime | ||||
| import tzlocal | ||||
| from xeger import Xeger | ||||
| from flask_restful import Resource | ||||
| from flask import request, current_app | ||||
| import requests | ||||
| import pika | ||||
| from db import db | ||||
| from flask import request, current_app, abort | ||||
| from magic_amqp import magic_amqp | ||||
| from influxus import influx_db | ||||
| from models import SampleMetadata | ||||
| from schemas import SampleSchema, SampleMetadataSchema | ||||
| from config import * | ||||
| from schemas import SampleSchema | ||||
| from redis_client import redis_client | ||||
| import opentracing | ||||
|  | ||||
| """ | ||||
| Flask Restful endpoints | ||||
| @@ -30,129 +28,87 @@ class SampleResource(Resource): | ||||
|     """ | ||||
|  | ||||
|     sampleschema = SampleSchema(many=False) | ||||
|     samplemetadataschema = SampleMetadataSchema(many=True) | ||||
|  | ||||
|     def post(self): | ||||
|         """ | ||||
|         Post request send to the endpoint | ||||
|         :return: | ||||
|         """ | ||||
|         if 'file' not in request.files: | ||||
|             return {"err_msg": "no file found"}, 469 | ||||
|         else: | ||||
|             soundfile = request.files['file'] | ||||
|         with opentracing.tracer.start_active_span('parseAndValidate'): | ||||
|             if 'file' not in request.files: | ||||
|                 return abort(400, "no file found") | ||||
|             else: | ||||
|                 soundfile = request.files['file'] | ||||
|  | ||||
|         if 'description' not in request.form: | ||||
|             return {"err_msg": "no description found"}, 470 | ||||
|         else: | ||||
|             description = request.form.get("description") | ||||
|             if 'description' not in request.form: | ||||
|                 return abort(400, "no description found") | ||||
|             else: | ||||
|                 description_raw = request.form.get("description") | ||||
|  | ||||
|         if soundfile.content_type != 'audio/wave': | ||||
|             current_app.logger.info( | ||||
|                 f"Input file was not WAV.") | ||||
|             return {'err_msg': 'Input file not a wave file.'}, 415 | ||||
|             if soundfile.content_type != 'audio/wave': | ||||
|                 current_app.logger.info(f"Input file was not WAV.") | ||||
|                 return abort(415, 'Input file not a wave file.') | ||||
|             try: | ||||
|                 desc = self.sampleschema.loads(description_raw) | ||||
|             except Exception as e: | ||||
|                 current_app.logger.exception(e) | ||||
|                 return abort(417, 'Input JSON schema invalid') | ||||
|  | ||||
|         try: | ||||
|             desc = self.sampleschema.loads(description) | ||||
|         except Exception as e: | ||||
|             current_app.logger.exception(e) | ||||
|             return {'err_msg': 'Input JSON schema invalid'}, 417 | ||||
|         with opentracing.tracer.start_active_span('generateTag'): | ||||
|             xeger = Xeger(limit=30) | ||||
|             while True: | ||||
|                 generated_tag = xeger.xeger(r'^[a-zA-Z]+[0-9a-zA-Z_]*$')[:32] | ||||
|                 if len(generated_tag) > 2:  # Ensure minimum length | ||||
|                     break | ||||
|  | ||||
|         xeger = Xeger(limit=30) | ||||
|         while True: | ||||
|             generated_tag = xeger.xeger(r'^[a-zA-Z]+[0-9a-zA-Z_]*$')[:32] | ||||
|             if len(generated_tag) > 2:  # Ensure minimum length | ||||
|                 break | ||||
|  | ||||
|         # Handle mega-autismo-cliento | ||||
|         soundfile_content_length = soundfile.content_length | ||||
|         if soundfile_content_length <= 0:  # BRUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUH | ||||
|             current_app.logger.debug( | ||||
|                 "The uploader did not provide content-length for the sound file... Calculating manually..." | ||||
|             ) | ||||
|             # So, this is a seekable stream, so we just seek to the end | ||||
|             old_ptr = soundfile.tell() | ||||
|             soundfile.seek(0, 2) | ||||
|             # Check where is the end (= content length) | ||||
|             soundfile_content_length = soundfile.tell() | ||||
|             # Seek back to where the stream was | ||||
|             soundfile.seek(old_ptr, 0) | ||||
|  | ||||
|             # It's insane, that you can not set this field in curl | ||||
|  | ||||
|         record = SampleMetadata( | ||||
|             device_id=desc['device_id'], | ||||
|             device_date=desc['date'], | ||||
|             tag=generated_tag) | ||||
|         try: | ||||
|             db.session.add(record) | ||||
|             requests.post( | ||||
|                 f"http://{STORAGE_HOSTNAME}/object", | ||||
|                 files={ | ||||
|                     'description': (None, json.dumps({'tag': generated_tag}), 'application/json'), | ||||
|                     'soundFile': ( | ||||
|                         'wave.wav', | ||||
|                         soundfile, | ||||
|                         soundfile.content_type, | ||||
|                         {'Content-Length': soundfile_content_length})}).raise_for_status()  # Anyádat curl am | ||||
|             credentials = pika.PlainCredentials(current_app.config['FLASK_PIKA_PARAMS']['username'], | ||||
|                                                 current_app.config['FLASK_PIKA_PARAMS']['password']) | ||||
|             connection = pika.BlockingConnection( | ||||
|                 pika.ConnectionParameters(host=current_app.config['FLASK_PIKA_PARAMS']['host'], | ||||
|                                           credentials=credentials, | ||||
|                                           heartbeat=0, | ||||
|                                           socket_timeout=5)) | ||||
|             channel = connection.channel() | ||||
|             channel.exchange_declare(exchange=current_app.config['EXCHANGE_NAME'], | ||||
|                                      exchange_type='direct') | ||||
|             channel.basic_publish(exchange=current_app.config['EXCHANGE_NAME'], | ||||
|                                   routing_key='feature', | ||||
|                                   body=json.dumps({'tag': generated_tag}).encode('UTF-8')) | ||||
|             connection.close() | ||||
|             influx_db.write_points( | ||||
|                 [ | ||||
|         with opentracing.tracer.start_active_span('publishMetaMessage'): | ||||
|             try: | ||||
|                 magic_amqp.publish_meta( | ||||
|                     { | ||||
|                         'time': datetime.now(tz=tzlocal.get_localzone()), | ||||
|                         'measurement': 'cloudinput', | ||||
|                         'tags': { | ||||
|                             'device': desc['device_id'] | ||||
|                         }, | ||||
|                         'fields': { | ||||
|                             'bruh': 1.0 | ||||
|                         } | ||||
|                         'tag': generated_tag, | ||||
|                         'timestamp': datetime.now().isoformat(), | ||||
|                         'device_id': desc['device_id'], | ||||
|                         'device_date': desc['date'].isoformat() | ||||
|                     } | ||||
|                 ] | ||||
|             ) | ||||
|         except Exception as e: | ||||
|             current_app.logger.exception(e) | ||||
|             db.session.rollback() | ||||
|             return {"err_msg": str( | ||||
|                 e), "hint": "DB or downstream service error"}, 569 | ||||
|                 ) | ||||
|             except Exception as e: | ||||
|                 current_app.logger.exception(e) | ||||
|                 return abort(500, f"AMQP Publish error: {str(e)}") | ||||
|  | ||||
|         with opentracing.tracer.start_active_span('readSampleToMemory'): | ||||
|             buf = io.BytesIO() | ||||
|             soundfile.save(buf) | ||||
|  | ||||
|         with opentracing.tracer.start_active_span('putToCache'): | ||||
|             redis_client.set(generated_tag, buf.getbuffer())  # getbuffer is quick as it does not copy like getvalue | ||||
|  | ||||
|         # Announce only after the data is successfully committed | ||||
|         with opentracing.tracer.start_active_span('publishInCacheMessage'): | ||||
|             try: | ||||
|                 magic_amqp.publish_cache({ | ||||
|                     'tag': generated_tag, | ||||
|                     'mime_type': soundfile.mimetype | ||||
|                 }) | ||||
|             except Exception as e: | ||||
|                 current_app.logger.exception(e) | ||||
|                 return abort(500, f"AMQP Publish error: {str(e)}") | ||||
|  | ||||
|         # metrics | ||||
|         if current_app.config['ENABLE_INFLUXDB']: | ||||
|             with opentracing.tracer.start_active_span('influxdb.write_points'): | ||||
|                 influx_db.write_points( | ||||
|                     [ | ||||
|                         { | ||||
|                             'time': datetime.now(tz=tzlocal.get_localzone()), | ||||
|                             'measurement': 'cloudinput', | ||||
|                             'tags': { | ||||
|                                 'device': desc['device_id'] | ||||
|                             }, | ||||
|                             'fields': { | ||||
|                                 'bruh': 1.0 | ||||
|                             } | ||||
|                         } | ||||
|                     ] | ||||
|                 ) | ||||
|  | ||||
|         db.session.commit() | ||||
|         return {"tag": generated_tag}, 200 | ||||
|  | ||||
|     def get(self): | ||||
|         """ | ||||
|         Get all stored items | ||||
|         :return: | ||||
|         """ | ||||
|         samples = SampleMetadata.query.all() | ||||
|         return self.samplemetadataschema.dump(list(samples)), 200 | ||||
|  | ||||
|  | ||||
| class SampleParameterResource(Resource): | ||||
|     """ | ||||
|     Sample endpoint with parameters | ||||
|     """ | ||||
|  | ||||
|     samplemetadataschema = SampleMetadataSchema(many=False) | ||||
|  | ||||
|     def get(self, tag: str): | ||||
|         """ | ||||
|         Get a specific item | ||||
|         :param tag: | ||||
|         :return: | ||||
|         """ | ||||
|         sample = SampleMetadata.query.filter_by(tag=tag).first_or_404() | ||||
|         return self.samplemetadataschema.dump(sample), 200 | ||||
|   | ||||
| @@ -1,7 +1,4 @@ | ||||
| #!/usr/bin/env python3 | ||||
| from flask_marshmallow.sqla import auto_field | ||||
|  | ||||
| from models import SampleMetadata | ||||
| from marshm import ma | ||||
| from marshmallow import fields | ||||
|  | ||||
| @@ -26,13 +23,3 @@ class SampleSchema(ma.Schema): | ||||
|  | ||||
|     date = fields.DateTime(required=True) | ||||
|     device_id = fields.Integer(required=True) | ||||
|  | ||||
|  | ||||
| class SampleMetadataSchema(ma.SQLAlchemyAutoSchema): | ||||
|     """ | ||||
|     Marshmallow schema generated | ||||
|     """ | ||||
|     class Meta: | ||||
|         model = SampleMetadata | ||||
|         exclude = ('timestamp', 'id', 'device_date',) | ||||
|     date = auto_field("device_date", dump_only=False) | ||||
|   | ||||
		Reference in New Issue
	
	Block a user