"""Birbnetes Benchmarker 2.

Uploads a sound file to an HTTP endpoint from several worker processes and
listens on MQTT for the matching ``command/<id>`` messages, then prints
per-request timings.
"""
import argparse
import json
import os
import signal
import sys
import textwrap
import time
from dataclasses import dataclass
from datetime import datetime
from multiprocessing import Process, Queue
from queue import Empty
from threading import Thread, Timer, current_thread
from typing import Optional

import paho.mqtt.client
import requests


@dataclass(frozen=False, init=False)
class UploadRequest:
    """One upload job together with the timestamps measured for it.

    Only ``id`` is set on construction; the remaining fields keep their
    defaults until a worker or the result collector fills them in.
    """
    id: int
    # time.time() taken right before the HTTP POST is issued
    upload_started: Optional[float] = None
    # time.time() taken right after the HTTP response arrived (None on timeout)
    upload_finished: Optional[float] = None
    # HTTP status code of the upload (None when the request timed out)
    upload_status_code: Optional[int] = None
    # time.time() at which the matching MQTT message was received
    alert_arrived: Optional[float] = None

    def __init__(self, _id: int):
        self.id = _id


@dataclass(frozen=False, init=True)
class ReturnedRequest:
    """An MQTT message received for the job ``id`` at time ``arrived``."""
    id: int
    arrived: float


def print_progress_meme(char: str):
    """Print a progress character without a newline and flush stdout."""
    print(char, end='')
    sys.stdout.flush()


def sender_worker(input_queue: Queue, sent_queue: Queue, target_url: str, file_contents: bytes,
                  request_timeout: float):
    """Worker process body: upload jobs from ``input_queue`` until it is empty.

    Every job taken from ``input_queue`` is POSTed to ``target_url`` as a
    multipart upload and then put on ``sent_queue`` with its timing fields
    filled in. The worker returns when ``input_queue`` is empty or when it
    receives a KeyboardInterrupt (SIGINT).

    :param input_queue: queue of ``UploadRequest`` jobs to send
    :param sent_queue: queue receiving the processed jobs
    :param target_url: URL the file is POSTed to
    :param file_contents: raw bytes of the file to upload
    :param request_timeout: timeout passed to the HTTP request, in seconds
    """
    content_len = len(file_contents)
    try:
        # The context manager closes the session's connections on every exit path
        with requests.Session() as session:
            while True:
                try:
                    job = input_queue.get_nowait()
                except Empty:
                    return

                files = {
                    "file": (
                        f"birbbenchmark2_testfile_{job.id}.wav",
                        file_contents,
                        'audio/wave',
                        {'Content-length': content_len}
                    ),
                    "description": (
                        None,
                        json.dumps({'date': datetime.now().isoformat(), 'device_id': str(job.id)}),
                        "application/json"
                    )
                }

                # --- timed section: keep as little work as possible in here ---
                job.upload_started = time.time()
                try:
                    r = session.post(target_url, files=files, timeout=request_timeout)
                    job.upload_finished = time.time()
                    if r.status_code == 200:
                        print_progress_meme('.')
                    else:
                        print_progress_meme('!')
                    job.upload_status_code = r.status_code
                except requests.Timeout:
                    print_progress_meme('?')
                # --- end of timed section ---

                # status code and upload_finished stay None if the request timed out
                sent_queue.put(job)
    except KeyboardInterrupt:
        return


def mqtt_on_connect(client, userdata, flags, rc):
    """MQTT connect callback: subscribe to every topic below ``command/``."""
    client.subscribe("command/#")


def mqtt_on_command(client, userdata, message):
    """MQTT message callback: record the arrival time of a command message.

    The job id is parsed from the topic suffix after ``command/``; ``int()``
    raises ValueError if that suffix is not a number. The payload is not
    inspected. ``userdata`` must be the queue that receives the
    ``ReturnedRequest`` records (set with ``user_data_set`` in run_benchmark).
    """
    _id = int(message.topic[8:])  # len('command/') = 8
    userdata.put(ReturnedRequest(id=_id, arrived=time.time()))
    print_progress_meme(',')


def infinite_job_generator(input_queue: Queue, target_ready_jobs: int):
    """Thread body: keep ``input_queue`` topped up with new jobs.

    Every 20 ms the queue is refilled up to a target size. Whenever the queue
    is found less than half full (except on the very first pass, when it is
    necessarily empty) the target is doubled. The loop ends once the
    ``active`` attribute of the running thread object is falsy; the creator
    of the thread is expected to set that attribute.

    :param input_queue: queue to put ``UploadRequest`` jobs on
    :param target_ready_jobs: initial number of jobs to keep ready
    """
    i = 0
    target_ready_jobs_local = target_ready_jobs
    first = True
    while True:
        jobs_ready = input_queue.qsize()
        if jobs_ready < target_ready_jobs_local:
            if not first:
                if jobs_ready < (target_ready_jobs_local / 2):
                    target_ready_jobs_local *= 2
            else:
                # In the first run there are exactly zero jobs ready, so don't grow yet
                first = False

            jobs_to_generate = target_ready_jobs_local - jobs_ready

            for _ in range(jobs_to_generate):
                i += 1
                input_queue.put(UploadRequest(i))

        time.sleep(0.02)
        if not current_thread().active:
            return


def greacefully_stop_all_workers(list_of_worker_pids: list):
    """Send SIGINT to every worker pid, then print the '#' progress char.

    Workers that have already exited are skipped, so one finished worker
    cannot prevent the remaining ones from being signalled.
    """
    for pid in list_of_worker_pids:
        try:
            os.kill(pid, signal.SIGINT)
        except ProcessLookupError:
            # This worker is already gone; nothing to stop
            continue
    print_progress_meme('#')


def _collect_sent_jobs(sent_queue: Queue, results: dict, timeout: float = 0.0):
    """Move finished jobs from ``sent_queue`` into ``results`` (keyed by job id).

    Returns as soon as no job shows up within ``timeout`` seconds
    (immediately when ``timeout`` is 0).
    """
    while True:
        try:
            job = sent_queue.get(timeout=timeout) if timeout else sent_queue.get_nowait()
        except Empty:
            return
        results[job.id] = job


def _wait_for_workers(workers: list, sent_queue: Queue, results: dict,
                      max_wait: Optional[float] = None) -> bool:
    """Wait until all workers exited, draining ``sent_queue`` meanwhile.

    A process that put items on a multiprocessing queue does not terminate
    until those items are flushed to the pipe, so the queue has to be read
    while waiting instead of only after a plain ``join()``.

    :param max_wait: give up after this many seconds (None = wait forever)
    :return: True if every worker exited, False if ``max_wait`` expired first
    """
    deadline = None if max_wait is None else time.monotonic() + max_wait
    while any(worker.is_alive() for worker in workers):
        if deadline is not None and time.monotonic() >= deadline:
            return False
        _collect_sent_jobs(sent_queue, results, timeout=0.1)
    return True


def run_benchmark(
        num_requests: int,
        num_workers: int,
        sound_file_path: str,
        target_url: str,
        timeout: float,
        inflight_timeout: float,
        request_timeout: float,
        mqtt_client: paho.mqtt.client.Client):
    """Run the upload benchmark and collect the per-request measurements.

    :param num_requests: number of requests to send (0 = generate jobs endlessly)
    :param num_workers: number of sender processes
    :param sound_file_path: path of the file whose contents are uploaded
    :param target_url: URL the file is POSTed to
    :param timeout: seconds after which the workers are interrupted (0 = never)
    :param inflight_timeout: seconds to wait for MQTT messages after sending ended
    :param request_timeout: timeout of a single HTTP request, in seconds
    :param mqtt_client: MQTT client; its user data is replaced by the queue of
        received messages and its network loop is started and stopped here
    :return: dict mapping job id to the ``UploadRequest`` with its measurements
    """
    input_queue = Queue()
    sent_queue = Queue()
    returned_queue = Queue()

    input_generator_thread = None
    if num_requests == 0:
        print("Unlimited amount of requests will be generated...")
        input_generator_thread = Thread(target=infinite_job_generator, args=(input_queue, num_workers * 100))
        input_generator_thread.active = True  # polled by infinite_job_generator
        input_generator_thread.start()
    else:
        print(f"Preparing {num_requests} requests...")
        for i in range(num_requests):
            input_queue.put(UploadRequest(i + 1))

    mqtt_client.user_data_set(returned_queue)
    mqtt_client.loop_start()  # Puts mqtt stuff to a background thread

    print(f"Preparing {num_workers} workers...")
    with open(sound_file_path, "rb") as f:
        file_contents = f.read()

    sender_workers = [
        Process(target=sender_worker, args=(
            input_queue,
            sent_queue,
            target_url,
            file_contents,
            request_timeout
        ))
        for _ in range(num_workers)
    ]

    print("Baszatás!")
    # Starting all workers
    for worker in sender_workers:
        worker.start()

    worker_stopper_timer = None
    if timeout != 0:
        sender_worker_pids = [w.pid for w in sender_workers]
        worker_stopper_timer = Timer(timeout, greacefully_stop_all_workers, args=(sender_worker_pids,))
        worker_stopper_timer.start()

    # Waiting for workers to finish, collecting their results on the fly
    results = {}
    try:
        _wait_for_workers(sender_workers, sent_queue, results)
    except KeyboardInterrupt:
        if worker_stopper_timer is not None:
            worker_stopper_timer.cancel()  # We don't want the auto stopper to fire again

        # Meaning: sending has been stopped, from now on we only wait for the missing responses
        print_progress_meme('|')
        # Interrupt forwarded for workers, so we just wait for them to quit
        if not _wait_for_workers(sender_workers, sent_queue, results, max_wait=2):
            for worker in sender_workers:
                if worker.is_alive():  # = it did not exit
                    worker.terminate()  # Not the nicest solution

    # The workers are done; without this a still pending timer would keep the
    # process alive and later signal pids that no longer belong to us
    if worker_stopper_timer is not None:
        worker_stopper_timer.cancel()

    # Stop the input generator (it only exists in unlimited mode)
    if input_generator_thread is not None:
        input_generator_thread.active = False
    # Jobs left in the input queue are not needed; don't let them block exit
    input_queue.cancel_join_thread()

    # Pick up whatever the workers flushed right before exiting
    _collect_sent_jobs(sent_queue, results)
    total_successful_uploads = sum(1 for job in results.values() if job.upload_status_code == 200)

    # Wait for inflight messages for a little
    for _ in range(int(inflight_timeout)):
        time.sleep(1)
        if returned_queue.qsize() >= total_successful_uploads:
            break

    mqtt_client.loop_stop()  # This stops further receiving of MQTT messages
    print()  # Everything that might print from other threads is now stopped, so we put a newline

    if int(inflight_timeout) != 0:  # print only if there was a timeout
        if returned_queue.qsize() < total_successful_uploads:
            print("Waiting for inflight MQTT messages timed out!")

    # Fill out the missing fields in the results
    total_answered = 0
    while True:
        try:
            returned_job = returned_queue.get_nowait()
        except Empty:
            break

        try:
            results[returned_job.id].alert_arrived = returned_job.arrived
        except KeyError:
            print("Bruh moment: Alert arrived with an id that's not sent", returned_job.id)

        total_answered += 1

    # mini statistics
    print(
        f"{len(results)} requests completed: {total_successful_uploads} successfully uploaded and {total_answered} answered")

    return results


def _duration(start: Optional[float], end: Optional[float]):
    """Return ``end - start``, or '???' when either timestamp is missing."""
    if start is None or end is None:
        return '???'
    return end - start


def main():
    """Parse the command line, run the benchmark and print the timing table."""
    help_epilog = textwrap.dedent('''\
    Progress chars meaning:
      HTTP related
        . -> Upload succesfully completed (return code was 200)
        ! -> Upload failed (return code was not 200)
        ? -> Upload timed out (see --request-timeout)
      MQTT related
        , -> Alert arrived (Trough MQTT)
      Process related
        | -> User interrupted (CTRL+C pressed)
        # -> Timeout expired (see --timeout)

    Examples:
      One worker send exactly 100 requests (wait 10 secs for all alerts to arrive)
        benchmark2.py --workers 1 --requests 100 --file /home/testerboi/birbstuff/testdata/oIbBz.wav --inflight-timeout 10
      Ten worker run for exactly one minute (wait 10 secs for all alerts to arrive)
        benchmark2.py --workers 10 --requests 0 --timeout 60 --file /home/testerboi/birbstuff/testdata/oIbBz.wav --inflight-timeout 10
      Send either 100 requests or run for one minute (whichever comes first)
        benchmark2.py --requests 100 --timeout 60 --file /home/testerboi/birbstuff/testdata/oIbBz.wav
    ''')

    parser = argparse.ArgumentParser(description='Birbnetes Benchmarker 2', epilog=help_epilog,
                                     formatter_class=argparse.RawTextHelpFormatter)

    parser.add_argument('--workers', type=int, required=False, default=1, help='Number of workers')
    parser.add_argument('--requests', type=int, required=True, help='Number of requests (0 for infinity)')
    parser.add_argument('--timeout', type=int, required=False, default=0,
                        help='Maximum time for request sending (0 for unlimited)')
    parser.add_argument('--inflight-timeout', type=int, required=False, default=30,
                        help='Number of seconds to wait for MQTT messages to arrive after uploading finished (not very precise) (0 for not waiting)')
    parser.add_argument('--request-timeout', type=float, required=False, default=5,
                        help='Timeout for HTTP requests')
    parser.add_argument('--file', type=str, required=True, help='Name of the sound file to upload')
    parser.add_argument('--target-url', type=str, required=False, default="https://birb.k8s.kmlabz.com/sample",
                        help='The target endpoint')
    parser.add_argument('--mqtt-username', type=str, required=False, default="birbnetes",
                        help="Username for the MQTT server")
    parser.add_argument('--mqtt-port', type=int, required=False, default=1883, help="Port for the MQTT server")
    parser.add_argument('--mqtt-host', type=str, required=False, default="mqtt.kmlabz.com",
                        help="Hostname for the MQTT server")
    parser.add_argument('--mqtt-password', type=str, required=False, default=None,
                        help="Password for the MQTT server")

    args = parser.parse_args()

    print(f"Preparing MQTT (host: {args.mqtt_host}:{args.mqtt_port}, login: {bool(args.mqtt_password)})")
    client = paho.mqtt.client.Client(client_id="10")
    client.on_connect = mqtt_on_connect
    client.on_message = mqtt_on_command

    # Credentials are only sent when a password was given
    if args.mqtt_password:
        client.username_pw_set(args.mqtt_username, args.mqtt_password)

    client.connect(args.mqtt_host, args.mqtt_port, 60)

    results = run_benchmark(args.requests, args.workers, args.file, args.target_url, args.timeout,
                            args.inflight_timeout, args.request_timeout, client)

    print("Eposzi kiértékelés idő...")
    print("id Upload Dur Return Dur Total Dur")
    # One row per request; durations that could not be measured are shown as '???'
    # (upload_finished is None for requests that timed out)
    for id_, req in results.items():
        upload_dur = _duration(req.upload_started, req.upload_finished)
        if req.alert_arrived is None:
            if req.upload_status_code == 200:
                print(id_, upload_dur, '???', '???')
            else:
                # '!' marks an upload that did not return HTTP 200
                print(id_, upload_dur, '!', '???', '???')
        else:
            print(id_, upload_dur,
                  _duration(req.upload_finished, req.alert_arrived),
                  _duration(req.upload_started, req.alert_arrived))


if __name__ == '__main__':
    main()