diff --git a/requirements.txt b/requirements.txt
index 5273478..de3c84e 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -1,5 +1,6 @@
 requests~=2.26.0
 redis~=3.5.3
+pika~=1.2.0
 
 sentry_sdk~=1.3.1
 opentracing~=2.4.0
diff --git a/storage_service_feeder/config.py b/storage_service_feeder/config.py
new file mode 100644
index 0000000..4bc215e
--- /dev/null
+++ b/storage_service_feeder/config.py
@@ -0,0 +1,29 @@
+"""Environment-driven configuration for the storage service feeder."""
+import os
+import sys
+import logging
+
+
+class Config:
+    """Process-wide settings, read from the environment once at import time."""
+
+    # Required settings: a missing variable raises KeyError on import, failing fast at startup.
+    PIKA_URL = os.environ['PIKA_URL']
+    REDIS_URL = os.environ["REDIS_URL"]
+    PIKA_SAMPLE_CACHE_EXCHANGE = os.environ.get('PIKA_SAMPLE_CACHE_EXCHANGE', 'sample-cache')
+
+    STORAGE_SERVICE_URL = os.environ.get("STORAGE_SERVICE_URL", "http://storage-service/")
+    # Seconds to wait on the storage service (connect / response) before an upload is abandoned.
+    STORAGE_SERVICE_TIMEOUT = float(os.environ.get("STORAGE_SERVICE_TIMEOUT", "30"))
+
+    SENTRY_DSN = os.environ.get("SENTRY_DSN")
+
+    RELEASE_ID = os.environ.get('RELEASE_ID', 'test')
+    RELEASEMODE = os.environ.get('RELEASEMODE', 'dev')
+
+    # Debug logging is enabled by the --debug flag or by a truthy DEBUG variable.
+    LOG_LEVEL = logging.DEBUG if (
+        '--debug' in sys.argv
+    ) or (
+        os.environ.get('DEBUG', '0').lower() in ['yes', 'true', '1']
+    ) else logging.INFO
diff --git a/storage_service_feeder/feeder.py b/storage_service_feeder/feeder.py
new file mode 100644
index 0000000..656273b
--- /dev/null
+++ b/storage_service_feeder/feeder.py
@@ -0,0 +1,62 @@
+from requests_opentracing import SessionTracing
+import opentracing
+import json
+
+from urllib.parse import urljoin
+
+import redis
+
+from config import Config
+
+
+class SampleNotFoundError(LookupError):
+    """Raised when a tag has no sample data in the cache."""
+
+
+class Feeder:
+    """Moves cached samples from Redis into the storage service."""
+
+    def __init__(self):
+        self._session = SessionTracing(propagate=True)
+        self._redis_client = redis.from_url(Config.REDIS_URL)
+
+    def do_feed(self, tag: str, mimetype: str = 'audio/wave'):
+        """Upload the cached sample stored under ``tag``, then evict it from the cache.
+
+        :param tag: Redis key of the sample; also sent as the ``tag`` of the uploaded object.
+        :param mimetype: Content type declared for the uploaded sound file.
+        :raises SampleNotFoundError: if the cache holds no entry for ``tag``.
+        :raises requests.RequestException: if the upload fails, times out or is rejected.
+        """
+        with opentracing.tracer.start_active_span('feeder.getFromCache'):
+            sample_data = self._redis_client.get(tag)
+
+        if sample_data is None:
+            # Nothing to upload; signal it explicitly instead of failing on len(None) below.
+            raise SampleNotFoundError(f"No cached sample for tag {tag}")
+
+        with opentracing.tracer.start_active_span('feeder.uploadToStorageService'):
+            with opentracing.tracer.start_active_span('uploadToStorageService'):
+                files = {
+                    'description': (None, json.dumps({'tag': tag}), 'application/json'),
+                    'soundFile': (
+                        'wave.wav',  # <- this is not used by upstream
+                        sample_data,
+                        mimetype,
+                        {
+                            'Content-Length': len(sample_data)
+                        }
+                    )
+                }
+
+                r = self._session.post(
+                    urljoin(Config.STORAGE_SERVICE_URL, "object"),
+                    files=files,
+                    timeout=Config.STORAGE_SERVICE_TIMEOUT  # requests never times out by default
+                )
+
+                r.raise_for_status()
+
+        # Evict only after a successful upload so a failed attempt can be retried.
+        with opentracing.tracer.start_active_span('feeder.deleteFromCache'):
+            self._redis_client.delete(tag)
diff --git a/storage_service_feeder/main.py b/storage_service_feeder/main.py
index a53c62e..335adc7 100644
--- a/storage_service_feeder/main.py
+++ b/storage_service_feeder/main.py
@@ -1,5 +1,143 @@
+#!/usr/bin/env python3
+import time
+import uuid
+import jaeger_client
+import opentracing
+from opentracing.ext import tags
+from opentracing.propagation import Format
+
+import logging
+import sys
+import pika
+import json
+
+from sentry_sdk.integrations.logging import LoggingIntegration
+import sentry_sdk
+
+from config import Config
+
+from feeder import Feeder, SampleNotFoundError
+
+
+def message_callback_factory(feeder: Feeder):
+    """Build the pika ``on_message_callback`` that feeds samples through ``feeder``.
+
+    Every delivery is settled exactly once: malformed messages and messages whose
+    sample is missing are acked (dropped); any other failure is nacked with requeue.
+    """
+
+    def message_callback(channel, method, properties, body):
+        try:
+            msg = json.loads(body.decode('utf-8'))
+        except (UnicodeDecodeError, json.JSONDecodeError) as e:
+            logging.warning(f"Invalid message received: {e}")
+            channel.basic_ack(delivery_tag=method.delivery_tag)  # We don't want this to be requeued
+            return
+
+        logging.debug(f"Handling message: {msg}")
+
+        # Valid JSON is not necessarily an object: a list, string or number would
+        # otherwise raise (or be misread) on the membership test and lookups below.
+        if not isinstance(msg, dict) or 'tag' not in msg:
+            logging.warning("Invalid message received: No tag field")
+            channel.basic_ack(delivery_tag=method.delivery_tag)  # We don't want this to be requeued
+            return
+        tag = msg['tag']
+        mime_type = msg.get('mime_type', 'audio/wave')
+
+        try:
+            span_ctx = opentracing.tracer.extract(Format.TEXT_MAP, msg)
+        except opentracing.SpanContextCorruptedException as e:
+            # Broken trace headers must not take the consumer down; start a fresh trace instead.
+            logging.warning(f"Ignoring corrupted trace context: {e}")
+            span_ctx = None
+
+        span_tags = {
+            tags.SPAN_KIND: tags.SPAN_KIND_CONSUMER,
+            'sample_tag': tag
+        }
+
+        with opentracing.tracer.start_active_span(
+                'main.handleMessage', finish_on_close=True, child_of=span_ctx, tags=span_tags
+        ):
+            try:
+                feeder.do_feed(tag, mime_type)
+            except SampleNotFoundError as e:
+                # Requeueing would redeliver this message forever: the cache has nothing to upload.
+                logging.warning(f"Dropping message for sample {tag}: {e}; msg: {msg}")
+                channel.basic_ack(delivery_tag=method.delivery_tag)
+            except Exception as e:
+                # logging.exception records the message and the traceback as one error event
+                logging.exception(f"Something went wrong during handling sample {tag} run: {e}; msg: {msg}")
+                channel.basic_nack(delivery_tag=method.delivery_tag, requeue=True)
+            else:
+                # successful
+                channel.basic_ack(delivery_tag=method.delivery_tag)
+
+    return message_callback
+
+
 def main():
-    pass
+    """Configure logging, Sentry and tracing, then consume sample messages until interrupted."""
+    # setup logging
+
+    logging.basicConfig(
+        stream=sys.stdout,
+        format="%(asctime)s - %(name)s [%(levelname)s]: %(message)s",
+        level=Config.LOG_LEVEL
+    )
+
+    # setup observability stuffs
+
+    if Config.SENTRY_DSN:
+        sentry_logging = LoggingIntegration(
+            level=logging.DEBUG,  # Capture debug and above as breadcrumbs
+            event_level=logging.ERROR  # Send errors as events
+        )
+        sentry_sdk.init(
+            dsn=Config.SENTRY_DSN,
+            integrations=[sentry_logging],
+            traces_sample_rate=1.0,
+            send_default_pii=True,
+            release=Config.RELEASE_ID,
+            environment=Config.RELEASEMODE,
+            _experiments={"auto_enabling_integrations": True}
+        )
+
+    jaeger_client.Config(config={}, service_name='storage-service-feeder', validate=True).initialize_tracer()
+
+    # Start the memes
+    logging.info("Preparing feeder...")
+    feeder = Feeder()
+
+    logging.info("Connecting to MQ service...")
+    connection = pika.BlockingConnection(pika.connection.URLParameters(Config.PIKA_URL))
+    try:
+        channel = connection.channel()
+        channel.exchange_declare(exchange=Config.PIKA_SAMPLE_CACHE_EXCHANGE, exchange_type='direct')
+
+        queue_declare_result = channel.queue_declare(queue=str(uuid.uuid4()), exclusive=False)
+        queue_name = queue_declare_result.method.queue
+
+        channel.queue_bind(exchange=Config.PIKA_SAMPLE_CACHE_EXCHANGE, queue=queue_name)
+
+        channel.basic_qos(prefetch_count=1)
+
+        channel.basic_consume(queue=queue_name, on_message_callback=message_callback_factory(feeder), auto_ack=False)
+
+        logging.info("Connection complete! Listening to messages...")
+        try:
+            channel.start_consuming()
+        except KeyboardInterrupt:
+            logging.info("SIGINT Received! Stopping stuff...")
+            channel.stop_consuming()
+    finally:
+        # Runs on every exit path so the broker connection and the tracer are always shut down.
+        if connection.is_open:
+            connection.close()
+
+        time.sleep(2)  # give the tracer a moment to flush pending spans before closing it
+        opentracing.tracer.close()
 
 
 if __name__ == '__main__':