Compare commits

...

16 Commits

Author SHA1 Message Date
7fc5861468 upload to docker hub
All checks were successful
continuous-integration/drone/push Build is passing
2022-01-31 23:14:29 +01:00
703947e25f Fixed sample service path
All checks were successful
continuous-integration/drone/push Build is passing
2021-08-19 04:03:48 +02:00
de7a325398 updated k8s stuff
All checks were successful
continuous-integration/drone/push Build is passing
2021-08-19 03:55:36 +02:00
85fed2882e change input-svc path
All checks were successful
continuous-integration/drone/push Build is passing
2021-08-16 15:27:01 +02:00
90fa74ca91 Added tracing
All checks were successful
continuous-integration/drone/push Build is passing
2021-08-10 16:44:01 +02:00
c26aa1be43 some updates
All checks were successful
continuous-integration/drone/push Build is passing
2021-08-10 16:15:20 +02:00
e6afdd5bac Added nacking
All checks were successful
continuous-integration/drone/push Build is passing
2021-07-28 14:17:45 +02:00
c6b444bc6d Merge branch 'master' of ssh://git.kmlabz.com:2222/birbnetes/guard-service
All checks were successful
continuous-integration/drone/push Build is passing
2021-07-28 14:11:57 +02:00
87cfdecc2d less prone to crashing 2021-07-28 14:10:59 +02:00
22054cf4d0 remove legacy stuff
All checks were successful
continuous-integration/drone/push Build is passing
2021-07-28 13:37:26 +02:00
fbfb379e86 Added trigger level
Some checks failed
continuous-integration/drone/push Build is failing
2021-07-28 13:35:40 +02:00
5d83767d83 Updated decision making
All checks were successful
continuous-integration/drone/push Build is passing
2021-06-14 03:54:29 +02:00
1f7f977e75 Fixed potential crashes 2021-06-14 03:51:24 +02:00
a3d3b27817 increate timeouts
All checks were successful
continuous-integration/drone/push Build is passing
2020-10-23 20:50:20 +02:00
a97d081bff Merge branch 'master' of ssh://cloud.tormakristof.eu:2222/birbnetes/guard-service into master
All checks were successful
continuous-integration/drone/push Build is passing
2020-10-20 01:13:08 +02:00
a23b2e537f my hart is beating 2020-10-20 01:12:56 +02:00
6 changed files with 177 additions and 75 deletions

View File

@ -3,16 +3,6 @@ type: docker
name: default name: default
steps: steps:
- name: static_analysis
image: "python:3.8"
commands:
- pip3 install --cache-dir='./.pipcache' pylint bandit mccabe
- pip3 install --cache-dir='./.pipcache' -r requirements.txt
- find . -name "*.py" -exec python3 -m py_compile '{}' \;
- find . -name "*.py" -exec pylint '{}' + || if [ $? -eq 1 ]; then echo "you fail"; fi
- find . -name "*.py" -exec python3 -m mccabe --min 3 '{}' + || if [ $? -eq 1 ]; then echo "you fail"; fi
- bandit -r . + || if [ $? -eq 1 ]; then echo "you fail"; fi
- name: code-analysis - name: code-analysis
image: aosapps/drone-sonar-plugin image: aosapps/drone-sonar-plugin
settings: settings:
@ -34,13 +24,17 @@ steps:
- latest - latest
- ${DRONE_BUILD_NUMBER} - ${DRONE_BUILD_NUMBER}
- name: sentry - name: dockerhub
image: tormachris/drone-sentry image: plugins/docker
settings: settings:
sentry_project: ${DRONE_REPO_NAME} repo: birbnetes/${DRONE_REPO_NAME}
sentry_domain: sentry.kmlabz.com username:
sentry_token: from_secret: DOCKERHUB_USER
from_secret: SENTRY_TOKEN password:
from_secret: DOCKERHUB_PASSWORD
tags:
- latest
- ${DRONE_BUILD_NUMBER}
- name: ms-teams - name: ms-teams
image: kuperiu/drone-teams image: kuperiu/drone-teams

View File

@ -6,17 +6,17 @@ metadata:
app: guard-service app: guard-service
namespace: birbnetes namespace: birbnetes
data: data:
SENTRY_DSN: "https://80d27db8c74f4556a19a1bf0180b373f@sentry.kmlabz.com/23"
RELEASE_ID: birb-k8s
GUARD_CLIENT_ID: guard-b50d97f6-29f9-4de5-a96e-f9b69ca69f7f GUARD_CLIENT_ID: guard-b50d97f6-29f9-4de5-a96e-f9b69ca69f7f
GUARD_SERVICE_RELEASEMODE: release GUARD_MQTT_HOSTNAME: activemq
GUARD_MQTT_PASSWORD: de4d2182
GUARD_MQTT_PORT: "1883"
GUARD_MQTT_TOPIC: command
GUARD_MQTT_USERNAME: birbnetes
GUARD_RABBITMQ_EXCHANGE: output
GUARD_RABBITMQ_HOSTNAME: birb-rabbitmq GUARD_RABBITMQ_HOSTNAME: birb-rabbitmq
GUARD_RABBITMQ_EXCHANGE: "sample" GUARD_RABBITMQ_PASSWORD: ZgCiSiSO8t
GUARD_RABBITMQ_USERNAME: user GUARD_RABBITMQ_USERNAME: user
GUARD_RABBITMQ_PASSWORD: 1wZVQnP5vy GUARD_SERVICE_RELEASEMODE: release
GUARD_MQTT_HOSTNAME: guard-postgres SAMPLE_SVC_HOSTNAME: sample-service
GUARD_MQTT_PORT: 1883 RELEASE_ID: kmlabz-k8s
GUARD_MQTT_USERNAME: guard-service SENTRY_DSN: https://1d01460ec3094d5bb6c4d78c0a028b08@glitchtip.kmlabz.com/7
GUARD_MQTT_PASSWORD: guard-service-supersecret
GUARD_MQTT_TOPIC: guard-service
INPUT_SVC_HOSTNAME: input-service

View File

@ -24,5 +24,26 @@ spec:
envFrom: envFrom:
- configMapRef: - configMapRef:
name: guard-service name: guard-service
- name: jaeger-agent
image: jaegertracing/jaeger-agent:latest
imagePullPolicy: IfNotPresent
ports:
- containerPort: 5775
name: zk-compact-trft
protocol: UDP
- containerPort: 5778
name: config-rest
protocol: TCP
- containerPort: 6831
name: jg-compact-trft
protocol: UDP
- containerPort: 6832
name: jg-binary-trft
protocol: UDP
- containerPort: 14271
name: admin-http
protocol: TCP
args:
- --reporter.grpc.host-port=dns:///woolsey.tormakristof.eu:14250
imagePullSecrets: imagePullSecrets:
- name: regcred - name: regcred

View File

@ -2,3 +2,7 @@ sentry_sdk
pika pika
requests requests
paho-mqtt paho-mqtt
opentracing~=2.4.0
jaeger-client
requests-opentracing

View File

@ -8,6 +8,12 @@ import pika
import requests import requests
from sentry_sdk.integrations.logging import LoggingIntegration from sentry_sdk.integrations.logging import LoggingIntegration
import jaeger_client
import opentracing
from opentracing.ext import tags
from opentracing.propagation import Format
from requests_opentracing import SessionTracing
import config import config
import uuid import uuid
from mqtt_helper import MQTT from mqtt_helper import MQTT
@ -21,38 +27,26 @@ __copyright__ = "Copyright 2020, Birbnetes Team"
__module_name__ = "app" __module_name__ = "app"
__version__text__ = "1" __version__text__ = "1"
if config.SENTRY_DSN:
sentry_logging = LoggingIntegration(
level=logging.DEBUG, # Capture info and above as breadcrumbs
event_level=logging.ERROR # Send errors as events
)
sentry_sdk.init(
dsn=config.SENTRY_DSN,
send_default_pii=True,
integrations=[sentry_logging],
traces_sample_rate=1.0,
release=config.RELEASE_ID,
environment=config.RELEASEMODE,
_experiments={"auto_enabling_integrations": True}
)
def setup_rabbit(mqtt_: MQTT) -> None: def setup_rabbit(mqtt_: MQTT) -> None:
logging.info("Connecting to RabbitMQ") logging.info("Connecting to RabbitMQ...")
credentials = pika.PlainCredentials(config.RABBIT_USERNAME, config.RABBIT_PASSWORD) credentials = pika.PlainCredentials(config.RABBIT_USERNAME, config.RABBIT_PASSWORD)
connection = pika.BlockingConnection(pika.ConnectionParameters(host=config.RABBIT_HOSTNAME, while True:
credentials=credentials, connection = pika.BlockingConnection(pika.ConnectionParameters(host=config.RABBIT_HOSTNAME,
heartbeat=0, credentials=credentials,
socket_timeout=5)) heartbeat=30,
channel = connection.channel() socket_timeout=45))
channel.exchange_declare(exchange=config.RABBIT_EXCHANGE, channel = connection.channel()
exchange_type='fanout') channel.exchange_declare(exchange=config.RABBIT_EXCHANGE, exchange_type='fanout')
queue = channel.queue_declare(durable=True, auto_delete=True, queue=uuid.uuid4().urn.split(':')[2], queue = channel.queue_declare(durable=True, auto_delete=True, queue=uuid.uuid4().urn.split(':')[2],
exclusive=True).method.queue exclusive=True).method.queue
channel.queue_bind(exchange=config.RABBIT_EXCHANGE, queue=queue) channel.queue_bind(exchange=config.RABBIT_EXCHANGE, queue=queue)
channel.basic_consume(queue=queue, on_message_callback=on_message_creator(mqtt_), auto_ack=True) channel.basic_consume(queue=queue, on_message_callback=on_message_creator(mqtt_), auto_ack=False)
logging.debug("Starting consumption") logging.debug("Starting consumption...")
channel.start_consuming() try:
channel.start_consuming() # this automagically responds to heartbeats
except pika.exceptions.AMQPConnectionError as e:
logging.warning(f"AMQP Error happened: {e}; Reconnecting...")
def on_message_creator(mqtt_: MQTT): def on_message_creator(mqtt_: MQTT):
@ -60,28 +54,107 @@ def on_message_creator(mqtt_: MQTT):
This generator is used, so that the mqtt object can be injected just when the callback is registered This generator is used, so that the mqtt object can be injected just when the callback is registered
""" """
def on_message(channel, method_frame, header_frame, body): requests_session = SessionTracing(propagate=True)
msg_json = json.loads(body)
if 'probability' not in msg_json: def on_message(
logging.error("Malformed message from broker") channel: pika.channel.Channel,
if msg_json['probability'] > 0.5: method: pika.spec.Basic.Deliver,
r = requests.get(f"http://{config.INPUT_HOSTNAME}/sample/{msg_json['tag']}") properties: pika.spec.BasicProperties,
r.raise_for_status() body: bytes
if 'device_id' not in r.json(): ):
logging.error("Input-service response invalid") try:
logging.info(f"Sending alert command to device {r.json()['device_id']}") msg_json = json.loads(body)
mqtt_.publish(subtopic=r.json()['device_id'], except (json.JSONDecodeError, UnicodeDecodeError) as e:
message=json.dumps({"command": "doAlert"})) logging.error(f"Malformed message from classifier: {e}")
channel.basic_ack(delivery_tag=method.delivery_tag)
return
span_ctx = opentracing.tracer.extract(Format.TEXT_MAP, msg_json)
span_tags = {tags.SPAN_KIND: tags.SPAN_KIND_CONSUMER}
with opentracing.tracer.start_active_span(
'handleMessage', finish_on_close=True, child_of=span_ctx, tags=span_tags
) as scope:
if ('probability' not in msg_json) or ('class' not in msg_json):
logging.error("Malformed message from classifier: Missing fields")
channel.basic_ack(delivery_tag=method.delivery_tag)
return
# TODO: strurnus should not be hardcoded here
if (msg_json['class'] == 'sturnus') and (msg_json['probability'] > config.TRIGGER_LEVEL):
scope.span.log_kv({'event': 'decisionMade', 'alerting': True})
try:
r = requests_session.get(
f"http://{config.SAMPLE_SVC_HOSTNAME}/sample/{msg_json['tag']}",
timeout=config.INPUT_TIMEOUT
)
except requests.exceptions.Timeout:
logging.error(f"Input-service timed out! (Timeout: {config.INPUT_TIMEOUT} sec)")
channel.basic_nack(delivery_tag=method.delivery_tag, requeue=True)
return
if r.status_code != 200:
logging.error(f"Input-service status code is not 200: {r.status_code}")
channel.basic_nack(delivery_tag=method.delivery_tag, requeue=True)
return
if 'device_id' not in r.json():
logging.error("Input-service response invalid")
channel.basic_nack(delivery_tag=method.delivery_tag, requeue=True)
return
logging.info(f"Sending alert command to device {r.json()['device_id']}...")
with opentracing.tracer.start_active_span(
'publishAlert',
tags={
tags.SPAN_KIND: tags.SPAN_KIND_PRODUCER,
"device_id": r.json()['device_id']
}
):
mqtt_.publish(
subtopic=r.json()['device_id'],
message=json.dumps({"command": "doAlert"})
)
else:
scope.span.log_kv({'event': 'decisionMade', 'alerting': False})
logging.debug(f"Probability is either bellow trigger level, or not the target class. Nothing to do.")
# This concludes the job
channel.basic_ack(delivery_tag=method.delivery_tag)
return on_message return on_message
if __name__ == "__main__": def main():
logging.basicConfig(stream=sys.stdout, format="%(asctime)s - %(name)s [%(levelname)s]: %(message)s", logging.basicConfig(
level=logging.DEBUG if '--debug' in sys.argv else logging.INFO) stream=sys.stdout,
logging.info("Guard service starting") format="%(asctime)s - %(name)s [%(levelname)s]: %(message)s",
level=config.LOG_LEVEL
)
if config.SENTRY_DSN:
sentry_logging = LoggingIntegration(
level=logging.DEBUG, # Capture info and above as breadcrumbs
event_level=logging.ERROR # Send errors as events
)
sentry_sdk.init(
dsn=config.SENTRY_DSN,
send_default_pii=True,
integrations=[sentry_logging],
traces_sample_rate=0.0,
release=config.RELEASE_ID,
environment=config.RELEASEMODE,
_experiments={"auto_enabling_integrations": True}
)
jaeger_client.Config(config={}, service_name='guard-service', validate=True).initialize_tracer()
logging.info("Guard service starting...")
mqtt = MQTT() mqtt = MQTT()
mqtt.topic = config.MQTT_TOPIC mqtt.topic = config.MQTT_TOPIC
mqtt.connect() mqtt.connect()
mqtt.client.loop_start() # Start MQTT event loop on a different thread mqtt.client.loop_start() # Start MQTT event loop on a different thread
setup_rabbit(mqtt) setup_rabbit(mqtt)
if __name__ == "__main__":
main()

View File

@ -1,5 +1,7 @@
#!/usr/bin/env python3 #!/usr/bin/env python3
import os import os
import sys
import logging
""" """
@ -29,4 +31,12 @@ MQTT_USERNAME = os.getenv("GUARD_MQTT_USERNAME", "guard-service")
MQTT_PASSWORD = os.getenv("GUARD_MQTT_PASSWORD", "guard-service") MQTT_PASSWORD = os.getenv("GUARD_MQTT_PASSWORD", "guard-service")
MQTT_TOPIC = os.getenv("GUARD_MQTT_TOPIC", "guard-service") MQTT_TOPIC = os.getenv("GUARD_MQTT_TOPIC", "guard-service")
INPUT_HOSTNAME = os.getenv("INPUT_SVC_HOSTNAME", "input-service") SAMPLE_SVC_HOSTNAME = os.getenv("SAMPLE_SVC_HOSTNAME", "input-service")
INPUT_TIMEOUT = int(os.environ.get("INPUT_SVC_TIMEOUT", 5))
TRIGGER_LEVEL = float(os.environ.get("TRIGGER_LEVEL", 0.51))
LOG_LEVEL = logging.DEBUG if (
'--debug' in sys.argv
) or (
os.environ.get('DEBUG', '0').lower() in ['yes', 'true', '1']
) else logging.INFO