diff --git a/requirements.txt b/requirements.txt index f600499..bbe842c 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,4 +1,8 @@ sentry_sdk pika requests -paho-mqtt \ No newline at end of file +paho-mqtt + +opentracing~=2.4.0 +jaeger-client +requests-opentracing \ No newline at end of file diff --git a/src/app.py b/src/app.py index c292881..e9a1a21 100644 --- a/src/app.py +++ b/src/app.py @@ -8,6 +8,12 @@ import pika import requests from sentry_sdk.integrations.logging import LoggingIntegration +import jaeger_client +import opentracing +from opentracing.ext import tags +from opentracing.propagation import Format +from requests_opentracing import SessionTracing + import config import uuid from mqtt_helper import MQTT @@ -48,6 +54,8 @@ def on_message_creator(mqtt_: MQTT): This generator is used, so that the mqtt object can be injected just when the callback is registered """ + requests_session = SessionTracing(propagate=True) + def on_message( channel: pika.channel.Channel, method: pika.spec.Basic.Deliver, @@ -61,41 +69,57 @@ def on_message_creator(mqtt_: MQTT): channel.basic_ack(delivery_tag=method.delivery_tag) return - if ('probability' not in msg_json) or ('class' not in msg_json): - logging.error("Malformed message from classifier: Missing fields") - channel.basic_ack(delivery_tag=method.delivery_tag) - return + span_ctx = opentracing.tracer.extract(Format.TEXT_MAP, msg_json) + span_tags = {tags.SPAN_KIND: tags.SPAN_KIND_CONSUMER} - # TODO: strurnus should not be hardcoded here - if (msg_json['class'] == 'sturnus') and (msg_json['probability'] > config.TRIGGER_LEVEL): - try: - r = requests.get( - f"http://{config.INPUT_HOSTNAME}/sample/{msg_json['tag']}", - timeout=config.INPUT_TIMEOUT - ) - except requests.exceptions.Timeout: - logging.error(f"Input-service timed out! (Timeout: {config.INPUT_TIMEOUT} sec)") - channel.basic_nack(delivery_tag=method.delivery_tag, requeue=True) + with opentracing.tracer.start_active_span( + 'handleMessage', finish_on_close=True, child_of=span_ctx, tags=span_tags + ) as scope: + + if ('probability' not in msg_json) or ('class' not in msg_json): + logging.error("Malformed message from classifier: Missing fields") + channel.basic_ack(delivery_tag=method.delivery_tag) return - if r.status_code != 200: - logging.error(f"Input-service status code is not 200: {r.status_code}") - channel.basic_nack(delivery_tag=method.delivery_tag, requeue=True) - return + # TODO: strurnus should not be hardcoded here + if (msg_json['class'] == 'sturnus') and (msg_json['probability'] > config.TRIGGER_LEVEL): + scope.span.log_kv({'event': 'decisionMade', 'alerting': True}) + try: + r = requests_session.get( + f"http://{config.INPUT_HOSTNAME}/sample/{msg_json['tag']}", + timeout=config.INPUT_TIMEOUT + ) + except requests.exceptions.Timeout: + logging.error(f"Input-service timed out! (Timeout: {config.INPUT_TIMEOUT} sec)") + channel.basic_nack(delivery_tag=method.delivery_tag, requeue=True) + return - if 'device_id' not in r.json(): - logging.error("Input-service response invalid") - channel.basic_nack(delivery_tag=method.delivery_tag, requeue=True) - return + if r.status_code != 200: + logging.error(f"Input-service status code is not 200: {r.status_code}") + channel.basic_nack(delivery_tag=method.delivery_tag, requeue=True) + return - logging.info(f"Sending alert command to device {r.json()['device_id']}...") - mqtt_.publish( - subtopic=r.json()['device_id'], - message=json.dumps({"command": "doAlert"}) - ) + if 'device_id' not in r.json(): + logging.error("Input-service response invalid") + channel.basic_nack(delivery_tag=method.delivery_tag, requeue=True) + return - else: - logging.debug(f"Probability is either bellow trigger level, or not the target class. Nothing to do.") + logging.info(f"Sending alert command to device {r.json()['device_id']}...") + with opentracing.tracer.start_active_span( + 'publishAlert', + tags={ + tags.SPAN_KIND: tags.SPAN_KIND_PRODUCER, + "device_id": r.json()['device_id'] + } + ): + mqtt_.publish( + subtopic=r.json()['device_id'], + message=json.dumps({"command": "doAlert"}) + ) + + else: + scope.span.log_kv({'event': 'decisionMade', 'alerting': False}) + logging.debug(f"Probability is either bellow trigger level, or not the target class. Nothing to do.") # This concludes the job channel.basic_ack(delivery_tag=method.delivery_tag) @@ -123,6 +147,7 @@ def main(): environment=config.RELEASEMODE, _experiments={"auto_enabling_integrations": True} ) + jaeger_client.Config(config={}, service_name='guard-service', validate=True).initialize_tracer() logging.info("Guard service starting...") mqtt = MQTT() mqtt.topic = config.MQTT_TOPIC