diff --git a/benchmark.py b/benchmark.py index 89bed1f..b04f676 100644 --- a/benchmark.py +++ b/benchmark.py @@ -5,7 +5,7 @@ import os import sys import time import json -import datetime +from datetime import datetime import sentry_sdk from sentry_sdk.integrations.logging import LoggingIntegration import paho.mqtt.client @@ -26,10 +26,10 @@ RELEASEMODE = os.environ.get("RELEASEMODE", "dev") DEVICE_ID = os.environ.get("DEVICE_ID", "devraspi") -MQTT_USERNAME = "" -MQTT_PASSWORD = "" -MQTT_HOSTNAME = "" -MQTT_PORT = "" +MQTT_USERNAME = "birbnetes" +MQTT_PASSWORD = "de4d2182" +MQTT_HOSTNAME = "mqtt.kmlabz.com" +MQTT_PORT = 1883 STARTTIME = time.time() ENDTIME = time.time() @@ -55,19 +55,20 @@ if SENTRY_DSN: def mqtt_on_connect(client, userdata, flags, rc): - client.subscribe(f"command/benchmark-script") + global STARTTIME + client.subscribe(f"command/10") logging.info("Sending Message") files = { "file": ( os.path.basename(FILE), open(FILE, 'rb').read(), 'audio/wave', {'Content-length': os.path.getsize(FILE)}), - "description": (None, json.dumps({'date': datetime.now().isoformat(), 'device_id': '1'}), "application/json") + "description": (None, json.dumps({'date': datetime.now().isoformat(), 'device_id': '10'}), "application/json") } requests.post(URL, files=files) - nonlocal STARTTIME STARTTIME = time.time() def mqtt_on_command(client, userdata, message): + global ENDTIME try: msg = json.loads(message.payload.decode()) except (UnicodeError, json.JSONDecodeError) as e: @@ -75,7 +76,6 @@ def mqtt_on_command(client, userdata, message): return if msg.get("command") == 'doAlert': - nonlocal ENDTIME ENDTIME = time.time() elapsed = ENDTIME - STARTTIME logging.info(f"Elapsed time: {elapsed}") @@ -89,7 +89,7 @@ def main() -> None: logging.basicConfig(stream=sys.stdout, format="%(asctime)s - %(name)s [%(levelname)s]: %(message)s", level=logging.DEBUG if '--debug' in sys.argv else logging.INFO) - client = paho.mqtt.client.Client(client_id="benchmark-script") + client = paho.mqtt.client.Client(client_id="10") client.on_connect = mqtt_on_connect client.on_message = mqtt_on_command diff --git a/benchmark2.py b/benchmark2.py new file mode 100644 index 0000000..2ecfecf --- /dev/null +++ b/benchmark2.py @@ -0,0 +1,197 @@ +import time +import sys +import json +import requests +from datetime import datetime +from dataclasses import dataclass +import argparse +from multiprocessing import Process, Queue +from queue import Empty +import paho.mqtt.client + + +@dataclass(frozen=False, init=False) +class UploadRequest: + id: int + upload_started: float + upload_finished: float + upload_success: bool = False + + def __init__(self, id: int): + self.id = id + + +@dataclass(frozen=False, init=True) +class ReturnedRequest: + id: int + arrived: float + + +def sender_worker(input_queue, sent_queue, target_url, file_contents): + content_len = len(file_contents) + session = requests.Session() + try: + while True: + try: + job = input_queue.get_nowait() + except Empty: + return + + files = { + "file": ( + f"birbbenchmark2_testfile_{job.id}.wav", + file_contents, + 'audio/wave', + {'Content-length': content_len} + ), + "description": ( + None, + json.dumps({'date': datetime.now().isoformat(), 'device_id': str(job.id)}), + "application/json" + ) + } + + # SUPER HIGH PRECISION MEASUREMENT COMPARTMENT DEPARTMENT ADMIRAL GENERAL ALADEEN + job.upload_started = time.time() + r = session.post(target_url, files=files) + job.upload_finished = time.time() + # /'\./'\./'\./'\./'\./'\./'\./'\./'\./'\./'\./'\./'\./'\./'\./'\./'\./'\./'\./'\ + + if r.status_code == 200: + print('.', end='') + job.upload_success = True + else: + print('!', end='') + job.upload_success = False + + sys.stdout.flush() + sent_queue.put(job) + + except KeyboardInterrupt: + return + + +def mqtt_on_connect(client, userdata, flags, rc): + client.subscribe(f"command/#") + + +def mqtt_on_command(client, userdata, message): + msg = json.loads(message.payload.decode()) + if msg['command'] == 'doAlert': + _id = int(message.topic[len('command/'):]) + userdata.put(ReturnedRequest(id=_id, arrived=time.time())) + print(',', end='') + sys.stdout.flush() + + +def main(): + parser = argparse.ArgumentParser(description='Birbnetes Benchmarker 2') + parser.add_argument('--workers', type=int, required=False, default=1, help='Number of workers') + parser.add_argument('--requests', type=int, required=False, default=100, help='Number of requests') + parser.add_argument('--inflight-timeout', type=int, required=False, default=30, + help='Number of seconds to wait for MQTT messages to arrive after uploading finished') + + parser.add_argument('--file', type=str, required=False, default="oIbBz.wav", + help='Name of the sound file to upload') + + parser.add_argument('--target-url', type=str, required=False, default="https://birb.k8s.kmlabz.com/sample", + help='The target endpoint') + + parser.add_argument('--mqtt-username', type=str, required=False, default="birbnetes", + help="Username for the MQTT server") + parser.add_argument('--mqtt-port', type=int, required=False, default=1883, help="Port for the MQTT server") + parser.add_argument('--mqtt-host', type=str, required=False, default="mqtt.kmlabz.com", + help="Hostname for the MQTT server") + parser.add_argument('--mqtt-password', type=str, required=False, default=None, help="Username for the MQTT server") + + args = parser.parse_args() + + input_queue = Queue() + sent_queue = Queue() + returned_queue = Queue() + sender_workers = [] + + print(f"Preparing {args.requests} requests...") + for i in range(args.requests): + input_queue.put(UploadRequest(i + 1)) + + print(f"Preparing MQTT (host: {args.mqtt_host}:{args.mqtt_port}, login: {bool(args.mqtt_password)})") + client = paho.mqtt.client.Client(client_id="10") + client.on_connect = mqtt_on_connect + client.on_message = mqtt_on_command + client.user_data_set(returned_queue) + + if args.mqtt_password: + client.username_pw_set(args.mqtt_username, args.mqtt_password) + + client.connect(args.mqtt_host, args.mqtt_port, 60) + client.loop_start() # Puts mqtt stuff to a background thread + + print(f"Preparing {args.workers} workers...") + + with open(args.file, "rb") as f: + file_contents = f.read() + + for i in range(args.workers): + sender_workers.append( + Process(target=sender_worker, args=(input_queue, sent_queue, args.target_url, file_contents)) + ) + + print("Baszatás!") + for worker in sender_workers: + worker.start() + + try: + for worker in sender_workers: + worker.join() + except KeyboardInterrupt: + # Interrupt forwarded for workers, so we just wait for them to quit + for worker in sender_workers: + worker.join() + + # read out all send requests to an array for easier handling: + sent_requests = {} + while True: + try: + job = sent_queue.get_nowait() + except Empty: + break + sent_requests[job.id] = job + + total_successful_uploads = len([req for req in sent_requests.values() if req.upload_success]) + + # Wait for inflight messages for a little + for _ in range(args.inflight_timeout): + time.sleep(1) + if returned_queue.qsize() >= total_successful_uploads: + break + + client.loop_stop() + + # read out all recieved requests to an array for easier handling: + recieved_alerts = {} + while True: + try: + job = returned_queue.get_nowait() + except Empty: + break + recieved_alerts[job.id] = job + + print() + print("Eposzi kiértékelés idő...") + print("id Upload Dur Return Dur Total Dur") + for id_, req in sent_requests.items(): + alert = recieved_alerts.get(id_) # may return zero + + if not alert: + if req.upload_success: + print(id_, req.upload_finished - req.upload_started, '???', '???') + else: + print(id_, req.upload_finished - req.upload_started, '!', '???', '???') + else: + print(id_, req.upload_finished - req.upload_started, alert.arrived - req.upload_finished, + alert.arrived - req.upload_started) + + +if __name__ == '__main__': + main()