"""Birbnetes Benchmarker 2.

Uploads a sound file to the target endpoint from several worker processes,
listens on MQTT for the ``doAlert`` commands that come back, and prints the
upload, return and total duration of every request.
"""
import argparse
import json
import sys
import time
from dataclasses import dataclass
from datetime import datetime
from multiprocessing import Process, Queue
from queue import Empty
from queue import Queue as ThreadQueue

import paho.mqtt.client
import requests

# Seconds between two polls of the result queue while the workers are running.
_WORKER_POLL_INTERVAL = 0.1


@dataclass
class UploadRequest:
    """One upload job; the timing fields are filled in by the worker."""

    id: int
    # time.time() stamps taken right around the HTTP POST; 0.0 until the job
    # has actually been processed, so repr()/== work on a fresh job as well.
    upload_started: float = 0.0
    upload_finished: float = 0.0
    upload_success: bool = False


@dataclass
class ReturnedRequest:
    """A ``doAlert`` command received over MQTT for the upload with this id."""

    id: int
    arrived: float  # time.time() stamp taken when the message was handled


def sender_worker(input_queue, sent_queue, target_url, file_contents):
    """Upload *file_contents* once for every job found in *input_queue*.

    Runs until *input_queue* is empty or the process receives a
    KeyboardInterrupt. Each processed job gets its timing and success fields
    set and is then put on *sent_queue*.

    Args:
        input_queue: queue of ``UploadRequest`` objects to process.
        sent_queue: queue receiving the processed ``UploadRequest`` objects.
        target_url: URL the multipart form is POSTed to.
        file_contents: raw bytes of the sound file to upload.
    """
    content_len = len(file_contents)

    try:
        # The context manager closes the pooled connections on every exit path.
        with requests.Session() as session:
            while True:
                try:
                    job = input_queue.get_nowait()
                except Empty:
                    return

                files = {
                    "file": (
                        f"birbbenchmark2_testfile_{job.id}.wav",
                        file_contents,
                        'audio/wave',
                        {'Content-length': content_len},
                    ),
                    "description": (
                        None,
                        json.dumps({'date': datetime.now().isoformat(), 'device_id': str(job.id)}),
                        "application/json",
                    ),
                }

                # Keep the timed region as small as possible: only the POST.
                job.upload_started = time.time()
                try:
                    response = session.post(target_url, files=files)
                except requests.RequestException:
                    # A transport error is a failed upload, not a reason to
                    # kill the worker and lose the job.
                    response = None
                job.upload_finished = time.time()

                job.upload_success = response is not None and response.status_code == 200
                print('.' if job.upload_success else '!', end='')
                sys.stdout.flush()

                sent_queue.put(job)
    except KeyboardInterrupt:
        return


def mqtt_on_connect(client, userdata, flags, rc):
    """Subscribe to every command topic once the MQTT connection is up."""
    client.subscribe("command/#")


def mqtt_on_command(client, userdata, message):
    """Record the arrival of a ``doAlert`` command in the *userdata* queue.

    The request id is taken from the topic (``command/<id>``). Messages that
    are not valid JSON, carry no ``command`` key, or whose topic does not end
    in an integer are ignored, so unrelated traffic under ``command/#`` cannot
    raise inside the MQTT callback.
    """
    try:
        msg = json.loads(message.payload.decode())
        if msg['command'] != 'doAlert':
            return
        request_id = int(message.topic[len('command/'):])
    except (ValueError, KeyError, TypeError):
        # ValueError also covers JSONDecodeError and UnicodeDecodeError.
        return

    userdata.put(ReturnedRequest(id=request_id, arrived=time.time()))
    print(',', end='')
    sys.stdout.flush()


def _build_parser():
    """Create the command line parser of the benchmarker."""
    parser = argparse.ArgumentParser(description='Birbnetes Benchmarker 2')
    parser.add_argument('--workers', type=int, default=1, help='Number of workers')
    parser.add_argument('--requests', type=int, default=100, help='Number of requests')
    parser.add_argument('--inflight-timeout', type=int, default=30,
                        help='Number of seconds to wait for MQTT messages to arrive after uploading finished')
    parser.add_argument('--file', type=str, default="oIbBz.wav",
                        help='Name of the sound file to upload')
    parser.add_argument('--target-url', type=str, default="https://birb.k8s.kmlabz.com/sample",
                        help='The target endpoint')
    parser.add_argument('--mqtt-username', type=str, default="birbnetes",
                        help="Username for the MQTT server")
    parser.add_argument('--mqtt-port', type=int, default=1883,
                        help="Port for the MQTT server")
    parser.add_argument('--mqtt-host', type=str, default="mqtt.kmlabz.com",
                        help="Hostname for the MQTT server")
    parser.add_argument('--mqtt-password', type=str, default=None,
                        help="Password for the MQTT server")
    return parser


def _drain_into(source_queue, target):
    """Move every currently readable item of *source_queue* into *target*, keyed by id."""
    while True:
        try:
            item = source_queue.get_nowait()
        except Empty:
            return
        target[item.id] = item


def _wait_for_workers(workers, sent_queue, sent_requests):
    """Collect processed jobs into *sent_requests* until all *workers* exited.

    A process that has put items on a multiprocessing queue does not terminate
    until those items are flushed to the underlying pipe, so joining the
    workers before reading the queue can deadlock once the pipe is full.
    The queue is therefore consumed while waiting.
    """
    while any(worker.is_alive() for worker in workers):
        _drain_into(sent_queue, sent_requests)
        time.sleep(_WORKER_POLL_INTERVAL)

    for worker in workers:
        worker.join()

    # Pick up whatever was flushed between the last poll and the exit.
    _drain_into(sent_queue, sent_requests)


def main():
    """Run the benchmark described by the command line and print the results."""
    args = _build_parser().parse_args()

    input_queue = Queue()
    sent_queue = Queue()
    # Only touched by the MQTT background thread and the main thread of this
    # process, so a thread queue is enough; unlike multiprocessing.Queue its
    # qsize() is available on every platform.
    returned_queue = ThreadQueue()

    print(f"Preparing {args.requests} requests...")
    for i in range(args.requests):
        input_queue.put(UploadRequest(i + 1))

    print(f"Preparing MQTT (host: {args.mqtt_host}:{args.mqtt_port}, login: {bool(args.mqtt_password)})")
    client = paho.mqtt.client.Client(client_id="10")
    client.on_connect = mqtt_on_connect
    client.on_message = mqtt_on_command
    client.user_data_set(returned_queue)

    if args.mqtt_password:
        client.username_pw_set(args.mqtt_username, args.mqtt_password)

    client.connect(args.mqtt_host, args.mqtt_port, 60)
    client.loop_start()  # Puts mqtt stuff to a background thread

    print(f"Preparing {args.workers} workers...")
    with open(args.file, "rb") as f:
        file_contents = f.read()

    sender_workers = [
        Process(target=sender_worker, args=(input_queue, sent_queue, args.target_url, file_contents))
        for _ in range(args.workers)
    ]

    print("Baszatás!")
    for worker in sender_workers:
        worker.start()

    # Processed jobs by request id.
    sent_requests = {}
    try:
        _wait_for_workers(sender_workers, sent_queue, sent_requests)
    except KeyboardInterrupt:
        # Interrupt forwarded for workers, so we just wait for them to quit
        _wait_for_workers(sender_workers, sent_queue, sent_requests)

    total_successful_uploads = sum(1 for req in sent_requests.values() if req.upload_success)

    # Wait for inflight messages for a little
    for _ in range(args.inflight_timeout):
        time.sleep(1)
        if returned_queue.qsize() >= total_successful_uploads:
            break

    client.loop_stop()

    # Received alerts by request id.
    received_alerts = {}
    _drain_into(returned_queue, received_alerts)

    print()
    print("Eposzi kiértékelés idő...")
    print("id Upload Dur Return Dur Total Dur")
    for id_, req in sent_requests.items():
        upload_duration = req.upload_finished - req.upload_started
        alert = received_alerts.get(id_)  # None when no alert arrived for this id
        if alert is None:
            if req.upload_success:
                print(id_, upload_duration, '???', '???')
            else:
                print(id_, upload_duration, '!', '???', '???')
        else:
            print(id_, upload_duration,
                  alert.arrived - req.upload_finished,
                  alert.arrived - req.upload_started)


if __name__ == '__main__':
    main()