This commit is contained in:
		| @@ -1,4 +1,8 @@ | |||||||
| sentry_sdk | sentry_sdk | ||||||
| pika | pika | ||||||
| requests | requests | ||||||
| paho-mqtt | paho-mqtt | ||||||
|  |  | ||||||
|  | opentracing~=2.4.0 | ||||||
|  | jaeger-client | ||||||
|  | requests-opentracing | ||||||
							
								
								
									
										83
									
								
								src/app.py
									
									
									
									
									
								
							
							
						
						
									
										83
									
								
								src/app.py
									
									
									
									
									
								
							| @@ -8,6 +8,12 @@ import pika | |||||||
| import requests | import requests | ||||||
| from sentry_sdk.integrations.logging import LoggingIntegration | from sentry_sdk.integrations.logging import LoggingIntegration | ||||||
|  |  | ||||||
|  | import jaeger_client | ||||||
|  | import opentracing | ||||||
|  | from opentracing.ext import tags | ||||||
|  | from opentracing.propagation import Format | ||||||
|  | from requests_opentracing import SessionTracing | ||||||
|  |  | ||||||
| import config | import config | ||||||
| import uuid | import uuid | ||||||
| from mqtt_helper import MQTT | from mqtt_helper import MQTT | ||||||
| @@ -48,6 +54,8 @@ def on_message_creator(mqtt_: MQTT): | |||||||
|     This generator is used, so that the mqtt object can be injected just when the callback is registered |     This generator is used, so that the mqtt object can be injected just when the callback is registered | ||||||
|     """ |     """ | ||||||
|  |  | ||||||
|  |     requests_session = SessionTracing(propagate=True) | ||||||
|  |  | ||||||
|     def on_message( |     def on_message( | ||||||
|             channel: pika.channel.Channel, |             channel: pika.channel.Channel, | ||||||
|             method: pika.spec.Basic.Deliver, |             method: pika.spec.Basic.Deliver, | ||||||
| @@ -61,41 +69,57 @@ def on_message_creator(mqtt_: MQTT): | |||||||
|             channel.basic_ack(delivery_tag=method.delivery_tag) |             channel.basic_ack(delivery_tag=method.delivery_tag) | ||||||
|             return |             return | ||||||
|  |  | ||||||
|         if ('probability' not in msg_json) or ('class' not in msg_json): |         span_ctx = opentracing.tracer.extract(Format.TEXT_MAP, msg_json) | ||||||
|             logging.error("Malformed message from classifier: Missing fields") |         span_tags = {tags.SPAN_KIND: tags.SPAN_KIND_CONSUMER} | ||||||
|             channel.basic_ack(delivery_tag=method.delivery_tag) |  | ||||||
|             return |  | ||||||
|  |  | ||||||
|         # TODO: strurnus should not be hardcoded here |         with opentracing.tracer.start_active_span( | ||||||
|         if (msg_json['class'] == 'sturnus') and (msg_json['probability'] > config.TRIGGER_LEVEL): |                 'handleMessage', finish_on_close=True, child_of=span_ctx, tags=span_tags | ||||||
|             try: |         ) as scope: | ||||||
|                 r = requests.get( |  | ||||||
|                     f"http://{config.INPUT_HOSTNAME}/sample/{msg_json['tag']}", |             if ('probability' not in msg_json) or ('class' not in msg_json): | ||||||
|                     timeout=config.INPUT_TIMEOUT |                 logging.error("Malformed message from classifier: Missing fields") | ||||||
|                 ) |                 channel.basic_ack(delivery_tag=method.delivery_tag) | ||||||
|             except requests.exceptions.Timeout: |  | ||||||
|                 logging.error(f"Input-service timed out! (Timeout: {config.INPUT_TIMEOUT} sec)") |  | ||||||
|                 channel.basic_nack(delivery_tag=method.delivery_tag, requeue=True) |  | ||||||
|                 return |                 return | ||||||
|  |  | ||||||
|             if r.status_code != 200: |             # TODO: strurnus should not be hardcoded here | ||||||
|                 logging.error(f"Input-service status code is not 200: {r.status_code}") |             if (msg_json['class'] == 'sturnus') and (msg_json['probability'] > config.TRIGGER_LEVEL): | ||||||
|                 channel.basic_nack(delivery_tag=method.delivery_tag, requeue=True) |                 scope.span.log_kv({'event': 'decisionMade', 'alerting': True}) | ||||||
|                 return |                 try: | ||||||
|  |                     r = requests_session.get( | ||||||
|  |                         f"http://{config.INPUT_HOSTNAME}/sample/{msg_json['tag']}", | ||||||
|  |                         timeout=config.INPUT_TIMEOUT | ||||||
|  |                     ) | ||||||
|  |                 except requests.exceptions.Timeout: | ||||||
|  |                     logging.error(f"Input-service timed out! (Timeout: {config.INPUT_TIMEOUT} sec)") | ||||||
|  |                     channel.basic_nack(delivery_tag=method.delivery_tag, requeue=True) | ||||||
|  |                     return | ||||||
|  |  | ||||||
|             if 'device_id' not in r.json(): |                 if r.status_code != 200: | ||||||
|                 logging.error("Input-service response invalid") |                     logging.error(f"Input-service status code is not 200: {r.status_code}") | ||||||
|                 channel.basic_nack(delivery_tag=method.delivery_tag, requeue=True) |                     channel.basic_nack(delivery_tag=method.delivery_tag, requeue=True) | ||||||
|                 return |                     return | ||||||
|  |  | ||||||
|             logging.info(f"Sending alert command to device {r.json()['device_id']}...") |                 if 'device_id' not in r.json(): | ||||||
|             mqtt_.publish( |                     logging.error("Input-service response invalid") | ||||||
|                 subtopic=r.json()['device_id'], |                     channel.basic_nack(delivery_tag=method.delivery_tag, requeue=True) | ||||||
|                 message=json.dumps({"command": "doAlert"}) |                     return | ||||||
|             ) |  | ||||||
|  |  | ||||||
|         else: |                 logging.info(f"Sending alert command to device {r.json()['device_id']}...") | ||||||
|             logging.debug(f"Probability is either bellow trigger level, or not the target class. Nothing to do.") |                 with opentracing.tracer.start_active_span( | ||||||
|  |                         'publishAlert', | ||||||
|  |                         tags={ | ||||||
|  |                             tags.SPAN_KIND: tags.SPAN_KIND_PRODUCER, | ||||||
|  |                             "device_id": r.json()['device_id'] | ||||||
|  |                         } | ||||||
|  |                 ): | ||||||
|  |                     mqtt_.publish( | ||||||
|  |                         subtopic=r.json()['device_id'], | ||||||
|  |                         message=json.dumps({"command": "doAlert"}) | ||||||
|  |                     ) | ||||||
|  |  | ||||||
|  |             else: | ||||||
|  |                 scope.span.log_kv({'event': 'decisionMade', 'alerting': False}) | ||||||
|  |                 logging.debug(f"Probability is either bellow trigger level, or not the target class. Nothing to do.") | ||||||
|  |  | ||||||
|         # This concludes the job |         # This concludes the job | ||||||
|         channel.basic_ack(delivery_tag=method.delivery_tag) |         channel.basic_ack(delivery_tag=method.delivery_tag) | ||||||
| @@ -123,6 +147,7 @@ def main(): | |||||||
|             environment=config.RELEASEMODE, |             environment=config.RELEASEMODE, | ||||||
|             _experiments={"auto_enabling_integrations": True} |             _experiments={"auto_enabling_integrations": True} | ||||||
|         ) |         ) | ||||||
|  |     jaeger_client.Config(config={}, service_name='guard-service', validate=True).initialize_tracer() | ||||||
|     logging.info("Guard service starting...") |     logging.info("Guard service starting...") | ||||||
|     mqtt = MQTT() |     mqtt = MQTT() | ||||||
|     mqtt.topic = config.MQTT_TOPIC |     mqtt.topic = config.MQTT_TOPIC | ||||||
|   | |||||||
		Reference in New Issue
	
	Block a user