import time import pycurl import itertools import argparse import multiprocessing import paho.mqtt.client from datetime import datetime from blist import blist from dataclasses import dataclass import threading import queue @dataclass(frozen=False, init=True) class UploadRequest: id: int upload_started: float = None upload_finished: float = None upload_status_code: bool = False alert_arrived: float = None @dataclass(frozen=False, init=True) class ReturnedRequest: id: int arrived: float def independent_worker(number_generator, result_queue, filename: str, timeout: float): prepared_curl = pycurl.Curl() prepared_curl.setopt(pycurl.URL, "https://birb.k8s.kmlabz.com/benchmark") prepared_curl.setopt(pycurl.SSL_VERIFYPEER, 0) prepared_curl.setopt(pycurl.SSL_VERIFYHOST, 0) # prepared_curl.setopt(pycurl.FORBID_REUSE, 0) prepared_curl.setopt(pycurl.HTTP_VERSION, pycurl.CURL_HTTP_VERSION_2_0) prepared_curl.setopt(pycurl.WRITEFUNCTION, lambda _: None) completed_jobs_list = blist() # O(log n) insert instead of python's O(n) worker_stop_time = 0 worker_completed_job_count = 0 # Start working!! worker_start_time = time.time() while True: jobid = next(number_generator) prepared_curl.setopt(pycurl.HTTPPOST, [ ('file', ( pycurl.FORM_FILE, filename # Copying buffers from Python memory would be even slower... trust me )), ('description', ( pycurl.FORM_BUFFERPTR, f'{{"date": "{datetime.now().isoformat()}", "device_id" : "{jobid}"}}', pycurl.FORM_CONTENTTYPE, 'application/json', )) ]) job_start_time = time.time() prepared_curl.perform() job_stop_time = time.time() status_code = prepared_curl.getinfo(pycurl.HTTP_CODE) worker_completed_job_count += 1 completed_jobs_list.append(UploadRequest( id=jobid, upload_started=job_start_time, upload_finished=job_stop_time, upload_status_code=status_code )) if (job_stop_time - worker_start_time) >= timeout: worker_stop_time = job_stop_time break # end of loop runtime = worker_stop_time - worker_start_time prepared_curl.close() result_queue.put( (runtime, worker_completed_job_count, completed_jobs_list) ) def mqtt_on_connect(client, userdata, flags, rc): client.subscribe(f"command/#") def mqtt_on_command(client, userdata, message): # msg = json.loads(message.payload.decode()) # if msg['command'] == 'doAlert': _id = int(message.topic[8:]) # len('command/') = 8 userdata.put(ReturnedRequest(id=_id, arrived=time.time())) def mqtt_on_subscribe(client, userdata, mid, granted_qos): userdata.set() def run_benchmark(num_workers: int, timeout: float, filename: str): result_queue = multiprocessing.Queue() workers = [] number_gen = itertools.count() for _ in range(num_workers): workers.append(multiprocessing.Process( target=independent_worker, args=(number_gen, result_queue, filename, timeout) )) for w in workers: w.start() completed_workers = 0 all_requests_completed = {} while completed_workers < num_workers: results = result_queue.get() for result in results[2]: all_requests_completed[result.id] = result completed_workers += 1 for w in workers: w.join() return all_requests_completed def main(): parser = argparse.ArgumentParser( description='Birbnetes Benchmarker 3' ) parser.add_argument('--workers', type=int, required=False, default=1, help='Number of workers (or threads)') parser.add_argument('--timeout', type=int, required=False, default=10, help='Maximum time for request sending (0 for unlimited)') parser.add_argument('--file', type=str, required=True, help='Name of the sound file to upload') parser.add_argument('--target-url', type=str, required=False, default="https://birb.k8s.kmlabz.com/sample", help='The target endpoint') parser.add_argument('--inflight-timeout', type=int, required=False, default=30, help='Number of seconds to wait for MQTT messages to arrive after uploading finished (not very precise) (0 for not waiting)') parser.add_argument('--mqtt-username', type=str, required=False, default="birbnetes", help="Username for the MQTT server") parser.add_argument('--mqtt-port', type=int, required=False, default=1883, help="Port for the MQTT server") parser.add_argument('--mqtt-host', type=str, required=False, default="mqtt.kmlabz.com", help="Hostname for the MQTT server") parser.add_argument('--mqtt-password', type=str, required=False, default=None, help="Username for the MQTT server") args = parser.parse_args() print("Workers:", args.workers) print("Timeout:", args.timeout, "sec") alerts_arrived_queue = queue.Queue() print(f"Preparing MQTT (host: {args.mqtt_host}:{args.mqtt_port}, login: {bool(args.mqtt_password)})") subscription_waiter = threading.Event() # Preparing connection mqtt_client = paho.mqtt.client.Client(client_id="10") mqtt_client.on_connect = mqtt_on_connect mqtt_client.on_subscribe = mqtt_on_subscribe mqtt_client.user_data_set(subscription_waiter) if args.mqtt_password: mqtt_client.username_pw_set(args.mqtt_username, args.mqtt_password) # this is a blocking call, will wait until connecion is complete mqtt_client.connect(args.mqtt_host, args.mqtt_port, 60) # Start loop in background mqtt_client.loop_start() # Wait for subscription to be made subscription_waiter.wait() # Set final command reciever mqtt_client.user_data_set(alerts_arrived_queue) mqtt_client.on_message = mqtt_on_command print("MQTT Complete!") print("Running benchmark...") benchmark_results = run_benchmark(args.workers, args.timeout, args.file) print("Waiting for inflight MQTT messages to arrive...") # Wait for inflight messages for a little total_successful_uploads = len([req for req in benchmark_results.values() if req.upload_status_code == 200]) all_arrived = False waiting_started = time.time() for _ in range(int(args.inflight_timeout*1000)): if alerts_arrived_queue.qsize() >= total_successful_uploads: all_arrived = True break time.sleep(0.001) waited_for_inflight = time.time() - waiting_started print(f"Waited a total {waited_for_inflight} seconds") if not all_arrived: print("WARNING: Not all MQTT Messages arrived!") # Disconnectiong MQTT mqtt_client.disconnect() mqtt_client.loop_stop() # This stops further recieving MQTT messages # Aggregate results total_answered = 0 while True: try: returned_job = alerts_arrived_queue.get_nowait() except queue.Empty: break try: benchmark_results[returned_job.id].alert_arrived = returned_job.arrived except KeyError: print("Bruh moment: Alert arrived with an id that's not sent", returned_job.id) total_answered += 1 # print some mini statistics total_runtime = \ benchmark_results[max(benchmark_results.keys())].upload_finished - \ benchmark_results[min(benchmark_results.keys())].upload_started print( f"{len(benchmark_results)} requests completed: {total_successful_uploads} successfully uploaded and {total_answered} answered" ) print(f"Test total runtime was {total_runtime} seconds") print("HTTP Request/sec:", len(benchmark_results) / total_runtime) if __name__ == '__main__': main()