This commit is contained in:
		
							
								
								
									
										22
									
								
								storage_service_feeder/config.py
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										22
									
								
								storage_service_feeder/config.py
									
									
									
									
									
										Normal file
									
								
							@@ -0,0 +1,22 @@
 | 
			
		||||
import os
 | 
			
		||||
import sys
 | 
			
		||||
import logging
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
class Config:
 | 
			
		||||
    PIKA_URL = os.environ['PIKA_URL']
 | 
			
		||||
    REDIS_URL = os.environ["REDIS_URL"]
 | 
			
		||||
    PIKA_SAMPLE_CACHE_EXCHANGE = os.environ.get('PIKA_SAMPLE_CACHE_EXCHANGE', 'sample-cache')
 | 
			
		||||
 | 
			
		||||
    STORAGE_SERVICE_URL = os.environ.get("STORAGE_SERVICE_URL", "http://storage-service/")
 | 
			
		||||
 | 
			
		||||
    SENTRY_DSN = os.environ.get("SENTRY_DSN")
 | 
			
		||||
 | 
			
		||||
    RELEASE_ID = os.environ.get('RELEASE_ID', 'test')
 | 
			
		||||
    RELEASEMODE = os.environ.get('RELEASEMODE', 'dev')
 | 
			
		||||
 | 
			
		||||
    LOG_LEVEL = logging.DEBUG if (
 | 
			
		||||
                                         '--debug' in sys.argv
 | 
			
		||||
                                 ) or (
 | 
			
		||||
                                         os.environ.get('DEBUG', '0').lower() in ['yes', 'true', '1']
 | 
			
		||||
                                 ) else logging.INFO
 | 
			
		||||
							
								
								
									
										44
									
								
								storage_service_feeder/feeder.py
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										44
									
								
								storage_service_feeder/feeder.py
									
									
									
									
									
										Normal file
									
								
							@@ -0,0 +1,44 @@
 | 
			
		||||
from requests_opentracing import SessionTracing
 | 
			
		||||
import opentracing
 | 
			
		||||
import json
 | 
			
		||||
 | 
			
		||||
from urllib.parse import urljoin
 | 
			
		||||
 | 
			
		||||
import redis
 | 
			
		||||
 | 
			
		||||
from config import Config
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
class Feeder:
 | 
			
		||||
 | 
			
		||||
    def __init__(self):
 | 
			
		||||
        self._session = SessionTracing(propagate=True)
 | 
			
		||||
        self._redis_client = redis.from_url(Config.REDIS_URL)
 | 
			
		||||
 | 
			
		||||
    def do_feed(self, tag: str, mimetype: str = 'audio/wave'):
 | 
			
		||||
        with opentracing.tracer.start_active_span('feeder.getFromCache'):
 | 
			
		||||
            sample_data = self._redis_client.get(tag)
 | 
			
		||||
 | 
			
		||||
        with opentracing.tracer.start_active_span('feeder.uploadToStorageService'):
 | 
			
		||||
            with opentracing.tracer.start_active_span('uploadToStorageService'):
 | 
			
		||||
                files = {
 | 
			
		||||
                    'description': (None, json.dumps({'tag': tag}), 'application/json'),
 | 
			
		||||
                    'soundFile': (
 | 
			
		||||
                        'wave.wav',  # <- this is not used by upstream
 | 
			
		||||
                        sample_data,
 | 
			
		||||
                        mimetype,
 | 
			
		||||
                        {
 | 
			
		||||
                            'Content-Length': len(sample_data)
 | 
			
		||||
                        }
 | 
			
		||||
                    )
 | 
			
		||||
                }
 | 
			
		||||
 | 
			
		||||
                r = self._session.post(
 | 
			
		||||
                    urljoin(Config.STORAGE_SERVICE_URL, "object"),
 | 
			
		||||
                    files=files
 | 
			
		||||
                )
 | 
			
		||||
 | 
			
		||||
                r.raise_for_status()
 | 
			
		||||
 | 
			
		||||
        with opentracing.tracer.start_active_span('feeder.deleteFromCache'):
 | 
			
		||||
            self._redis_client.delete(tag)
 | 
			
		||||
@@ -1,5 +1,119 @@
 | 
			
		||||
#!/usr/bin/env python3
 | 
			
		||||
import time
 | 
			
		||||
import uuid
 | 
			
		||||
import jaeger_client
 | 
			
		||||
import opentracing
 | 
			
		||||
from opentracing.ext import tags
 | 
			
		||||
from opentracing.propagation import Format
 | 
			
		||||
 | 
			
		||||
import logging
 | 
			
		||||
import sys
 | 
			
		||||
import pika
 | 
			
		||||
import json
 | 
			
		||||
 | 
			
		||||
from sentry_sdk.integrations.logging import LoggingIntegration
 | 
			
		||||
import sentry_sdk
 | 
			
		||||
 | 
			
		||||
from config import Config
 | 
			
		||||
 | 
			
		||||
from feeder import Feeder
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def message_callback_factory(feeder: Feeder):
 | 
			
		||||
    def message_callback(channel, method, properties, body):
 | 
			
		||||
        try:
 | 
			
		||||
            msg = json.loads(body.decode('utf-8'))
 | 
			
		||||
        except (UnicodeDecodeError, json.JSONDecodeError) as e:
 | 
			
		||||
            logging.warning(f"Invalid message received: {e}")
 | 
			
		||||
            channel.basic_ack(delivery_tag=method.delivery_tag)  # We don't want this to be requeue
 | 
			
		||||
            return
 | 
			
		||||
 | 
			
		||||
        logging.debug(f"Handling message: {msg}")
 | 
			
		||||
 | 
			
		||||
        if 'tag' not in msg:
 | 
			
		||||
            logging.warning(f"Invalid message received: No tag field")
 | 
			
		||||
            channel.basic_ack(delivery_tag=method.delivery_tag)  # We don't want this to be requeue
 | 
			
		||||
            return
 | 
			
		||||
        tag = msg['tag']
 | 
			
		||||
        mime_type = msg.get('mime_type', 'audio/wave')
 | 
			
		||||
 | 
			
		||||
        span_ctx = opentracing.tracer.extract(Format.TEXT_MAP, msg)
 | 
			
		||||
        span_tags = {
 | 
			
		||||
            tags.SPAN_KIND: tags.SPAN_KIND_CONSUMER,
 | 
			
		||||
            'sample_tag': tag
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
        with opentracing.tracer.start_active_span(
 | 
			
		||||
                'main.handleMessage', finish_on_close=True, child_of=span_ctx, tags=span_tags
 | 
			
		||||
        ) as scope:
 | 
			
		||||
            try:
 | 
			
		||||
                feeder.do_feed(tag, mime_type)
 | 
			
		||||
            except Exception as e:
 | 
			
		||||
                logging.error(f"Something went wrong during handling sample {tag} run: {e}; msg: {msg}")
 | 
			
		||||
                logging.exception(e)
 | 
			
		||||
                channel.basic_nack(delivery_tag=method.delivery_tag, requeue=True)
 | 
			
		||||
            else:
 | 
			
		||||
                # successful
 | 
			
		||||
                channel.basic_ack(delivery_tag=method.delivery_tag)
 | 
			
		||||
 | 
			
		||||
    return message_callback
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def main():
 | 
			
		||||
    pass
 | 
			
		||||
    # setup logging
 | 
			
		||||
 | 
			
		||||
    logging.basicConfig(
 | 
			
		||||
        stream=sys.stdout,
 | 
			
		||||
        format="%(asctime)s - %(name)s [%(levelname)s]: %(message)s",
 | 
			
		||||
        level=Config.LOG_LEVEL
 | 
			
		||||
    )
 | 
			
		||||
 | 
			
		||||
    # setup observability stuffs
 | 
			
		||||
 | 
			
		||||
    if Config.SENTRY_DSN:
 | 
			
		||||
        sentry_logging = LoggingIntegration(
 | 
			
		||||
            level=logging.DEBUG,  # Capture info and above as breadcrumbs
 | 
			
		||||
            event_level=logging.ERROR  # Send errors as events
 | 
			
		||||
        )
 | 
			
		||||
        sentry_sdk.init(
 | 
			
		||||
            dsn=Config.SENTRY_DSN,
 | 
			
		||||
            integrations=[sentry_logging],
 | 
			
		||||
            traces_sample_rate=1.0,
 | 
			
		||||
            send_default_pii=True,
 | 
			
		||||
            release=Config.RELEASE_ID,
 | 
			
		||||
            environment=Config.RELEASEMODE,
 | 
			
		||||
            _experiments={"auto_enabling_integrations": True}
 | 
			
		||||
        )
 | 
			
		||||
 | 
			
		||||
    jaeger_client.Config(config={}, service_name='storage-service-feeder', validate=True).initialize_tracer()
 | 
			
		||||
 | 
			
		||||
    # Start the memes
 | 
			
		||||
    logging.info("Preparing feeder...")
 | 
			
		||||
    feeder = Feeder()
 | 
			
		||||
 | 
			
		||||
    logging.info("Connecting to MQ service...")
 | 
			
		||||
    connection = pika.BlockingConnection(pika.connection.URLParameters(Config.PIKA_URL))
 | 
			
		||||
    channel = connection.channel()
 | 
			
		||||
    channel.exchange_declare(exchange=Config.PIKA_SAMPLE_CACHE_EXCHANGE, exchange_type='direct')
 | 
			
		||||
 | 
			
		||||
    queue_declare_result = channel.queue_declare(queue=str(uuid.uuid4()), exclusive=False)
 | 
			
		||||
    queue_name = queue_declare_result.method.queue
 | 
			
		||||
 | 
			
		||||
    channel.queue_bind(exchange=Config.PIKA_SAMPLE_CACHE_EXCHANGE, queue=queue_name)
 | 
			
		||||
 | 
			
		||||
    channel.basic_qos(prefetch_count=1)
 | 
			
		||||
 | 
			
		||||
    channel.basic_consume(queue=queue_name, on_message_callback=message_callback_factory(feeder), auto_ack=False)
 | 
			
		||||
 | 
			
		||||
    logging.info("Connection complete! Listening to messages...")
 | 
			
		||||
    try:
 | 
			
		||||
        channel.start_consuming()
 | 
			
		||||
    except KeyboardInterrupt:
 | 
			
		||||
        logging.info("SIGINT Received! Stopping stuff...")
 | 
			
		||||
        channel.stop_consuming()
 | 
			
		||||
 | 
			
		||||
    time.sleep(2)  # lol
 | 
			
		||||
    opentracing.tracer.close()
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
if __name__ == '__main__':
 | 
			
		||||
 
 | 
			
		||||
		Reference in New Issue
	
	Block a user