"""Birbnetes Benchmarker 3.

Uploads a sound file over HTTP from several worker processes for a fixed
amount of time, listens on MQTT for the alerts those uploads trigger, and
writes one CSV row per upload with its timings.
"""
import argparse
import csv
import itertools
import multiprocessing
import queue
import random
import sys
import textwrap
import threading
import time
from dataclasses import dataclass
from datetime import datetime
from typing import Optional

import paho.mqtt.client
import pycurl
from blist import blist

# Endpoint the workers upload to unless the caller overrides it.
DEFAULT_TARGET_URL = "https://birb.k8s.kmlabz.com/benchmark"

# Alerts arrive on "command/<id>"; the id is parsed from the part after this.
COMMAND_TOPIC_PREFIX = "command/"


@dataclass(frozen=False, init=True)
class UploadRequest:
    """One HTTP upload and, once matched, the MQTT alert it triggered."""

    id: int  # random job id, also sent as "device_id" in the upload
    upload_started: Optional[float] = None  # time.time() before the transfer
    upload_finished: Optional[float] = None  # time.time() after the transfer
    upload_status_code: int = 0  # HTTP status; 0 when the transfer failed
    alert_arrived: Optional[float] = None  # time.time() of the matching alert


@dataclass(frozen=False, init=True)
class ReturnedRequest:
    """An MQTT alert: the id parsed from its topic and its arrival time."""

    id: int
    arrived: float


def independent_worker(result_queue, filename: str, timeout: float,
                       target_url: str = DEFAULT_TARGET_URL):
    """Upload ``filename`` to ``target_url`` in a loop and report the results.

    Runs in its own process. Uploads are performed back to back on a single
    reused curl handle; the loop ends after the first upload that finishes
    ``timeout`` or more seconds after the worker started.

    Args:
        result_queue: Queue that receives one
            ``(runtime, completed_job_count, completed_jobs)`` tuple.
        filename: Path of the file sent as the ``file`` form field.
        timeout: Seconds after which no further upload is started.
        target_url: URL the multipart POST is sent to.
    """
    prepared_curl = pycurl.Curl()
    prepared_curl.setopt(pycurl.URL, target_url)
    # NOTE: TLS certificate and host name verification are switched off.
    prepared_curl.setopt(pycurl.SSL_VERIFYPEER, 0)
    prepared_curl.setopt(pycurl.SSL_VERIFYHOST, 0)
    prepared_curl.setopt(pycurl.HTTP_VERSION, pycurl.CURL_HTTP_VERSION_2_0)
    # Discard the response body.
    prepared_curl.setopt(pycurl.WRITEFUNCTION, lambda _: None)

    completed_jobs_list = blist()

    # Start working!!
    worker_start_time = time.time()
    try:
        while True:
            jobid = random.randint(0, 2147483647)
            prepared_curl.setopt(pycurl.HTTPPOST, [
                ('file', (
                    # Let curl read the file itself instead of copying a
                    # buffer out of Python memory.
                    pycurl.FORM_FILE, filename
                )),
                ('description', (
                    pycurl.FORM_BUFFERPTR,
                    f'{{"date": "{datetime.now().isoformat()}", "device_id" : "{jobid}"}}',
                    pycurl.FORM_CONTENTTYPE, 'application/json',
                ))
            ])

            job_start_time = time.time()
            try:
                prepared_curl.perform()
                transfer_failed = False
            except pycurl.error:
                # A failed transfer must not kill the worker: the parent
                # blocks until every worker has reported its results.
                transfer_failed = True
            job_stop_time = time.time()

            if transfer_failed:
                status_code = 0
            else:
                status_code = prepared_curl.getinfo(pycurl.HTTP_CODE)

            completed_jobs_list.append(UploadRequest(
                id=jobid,
                upload_started=job_start_time,
                upload_finished=job_stop_time,
                upload_status_code=status_code
            ))

            # NOTE(review): with timeout == 0 this is true after the first
            # upload, so the worker sends exactly one request.
            if (job_stop_time - worker_start_time) >= timeout:
                break
    finally:
        prepared_curl.close()

    runtime = job_stop_time - worker_start_time
    result_queue.put(
        (runtime, len(completed_jobs_list), completed_jobs_list)
    )


def mqtt_on_connect(client, userdata, flags, rc):
    """Subscribe to every topic below ``command/`` once connected."""
    client.subscribe(COMMAND_TOPIC_PREFIX + "#")


def mqtt_on_command(client, userdata, message):
    """Queue a ``ReturnedRequest`` for a message received on ``command/<id>``.

    ``userdata`` is the queue the alert is put on. Messages whose topic
    suffix is not an integer are ignored.
    """
    arrived = time.time()
    try:
        _id = int(message.topic[len(COMMAND_TOPIC_PREFIX):])
    except ValueError:
        # Not one of ours: the subscription covers every "command/" topic.
        return
    userdata.put(ReturnedRequest(id=_id, arrived=arrived))


def mqtt_on_subscribe(client, userdata, mid, granted_qos):
    """Signal the event passed as ``userdata`` that the subscription is made."""
    userdata.set()


def run_benchmark(num_workers: int, timeout: float, filename: str,
                  target_url: str = DEFAULT_TARGET_URL):
    """Run ``num_workers`` upload processes and collect what they uploaded.

    Args:
        num_workers: Number of worker processes to start.
        timeout: Seconds each worker keeps starting new uploads.
        filename: Path of the file every worker uploads.
        target_url: URL every worker uploads to.

    Returns:
        A list of ``UploadRequest`` objects sorted by upload start time.
    """
    result_queue = multiprocessing.Queue()

    workers = [
        multiprocessing.Process(
            target=independent_worker,
            args=(result_queue, filename, timeout, target_url)
        )
        for _ in range(num_workers)
    ]

    for w in workers:
        w.start()

    all_requests_completed = {}
    count_requests_completed = 0

    # Each worker puts exactly one tuple on the queue. Drain the queue before
    # joining so no worker is left blocked flushing its results.
    for _ in range(num_workers):
        _runtime, completed_count, completed_jobs = result_queue.get()
        count_requests_completed += completed_count
        for request in completed_jobs:
            all_requests_completed[request.id] = request

    for w in workers:
        w.join()

    # Fails if two uploads drew the same random id.
    assert count_requests_completed == len(all_requests_completed)

    # Sort by upload start time.
    return sorted(all_requests_completed.values(),
                  key=lambda a: a.upload_started)


def csv_time_format(timestamp: Optional[float]) -> Optional[str]:
    """Return ``timestamp`` as a local-time ISO 8601 string, or ``None``."""
    if timestamp is None:
        return None
    return datetime.fromtimestamp(timestamp).isoformat()


def write_results(results, file_handle):
    """Write a CSV header and one row per request to ``file_handle``.

    ``latency`` and ``rtt`` are in milliseconds and are left empty for
    requests that have no alert arrival time.
    """
    writer = csv.writer(file_handle)
    # Latency and rtt are in ms
    writer.writerow(
        ['id', 'http_start_time', 'http_complete_time', 'status_code',
         'mqtt_arrive_time', 'latency', 'rtt']
    )

    for result in results:
        if result.alert_arrived is not None:
            latency = (result.alert_arrived - result.upload_finished) * 1000
            rtt = (result.alert_arrived - result.upload_started) * 1000
        else:
            latency = None
            rtt = None

        writer.writerow([
            result.id,
            csv_time_format(result.upload_started),
            csv_time_format(result.upload_finished),
            result.upload_status_code,
            csv_time_format(result.alert_arrived),
            latency,
            rtt
        ])


def main():
    """Parse arguments, run the benchmark and write the results as CSV."""
    help_epilog = textwrap.dedent('''\
        CSV Columns meaning:
          id                 - Unique ID of a request (not sequential but at least unique)
          http_start_time    - Timestamp of when the HTTP request is started
          http_complete_time - Timestamp of when the HTTP request is completed
          status_code        - Status code of the HTTP request
          mqtt_arrive_time   - Timestamp of when the corresponding MQTT message arrived
          latency            - Latency of the system (mqtt_arrive_time - http_complete_time) in milliseconds
          rtt                - Total round trip time (mqtt_arrive_time - http_start_time) in milliseconds
        ''')

    parser = argparse.ArgumentParser(
        description='Birbnetes Benchmarker 3',
        epilog=help_epilog,
        formatter_class=argparse.RawTextHelpFormatter
    )

    parser.add_argument('--workers', type=int, required=False, default=1,
                        help='Number of workers (or threads)')
    parser.add_argument('--timeout', type=int, required=False, default=10,
                        help='Maximum time for request sending (0 for unlimited)')
    parser.add_argument('--file', type=str, required=True,
                        help='Name of the sound file to upload')
    # The default is the URL the workers have always uploaded to, so runs
    # that do not pass this flag behave as before.
    parser.add_argument('--target-url', type=str, required=False,
                        default=DEFAULT_TARGET_URL,
                        help='The target endpoint')
    parser.add_argument('--inflight-timeout', type=int, required=False, default=30,
                        help='Number of seconds to wait for MQTT messages to arrive after uploading finished (not very precise) (0 for not waiting)')
    parser.add_argument('--output', type=str, required=True,
                        help='name of the file to write results to (- for stdout)')
    parser.add_argument('--mqtt-username', type=str, required=False, default="birbnetes",
                        help="Username for the MQTT server")
    parser.add_argument('--mqtt-port', type=int, required=False, default=1883,
                        help="Port for the MQTT server")
    parser.add_argument('--mqtt-host', type=str, required=False, default="mqtt.kmlabz.com",
                        help="Hostname for the MQTT server")
    parser.add_argument('--mqtt-password', type=str, required=False, default=None,
                        help="Password for the MQTT server")

    args = parser.parse_args()

    print("Workers:", args.workers)
    print("Timeout:", args.timeout, "sec")

    alerts_arrived_queue = queue.Queue()

    print(f"Preparing MQTT (host: {args.mqtt_host}:{args.mqtt_port}, login: {bool(args.mqtt_password)})")
    subscription_waiter = threading.Event()

    # Preparing connection
    mqtt_client = paho.mqtt.client.Client(client_id="10")
    mqtt_client.on_connect = mqtt_on_connect
    mqtt_client.on_subscribe = mqtt_on_subscribe
    mqtt_client.user_data_set(subscription_waiter)

    if args.mqtt_password:
        mqtt_client.username_pw_set(args.mqtt_username, args.mqtt_password)

    mqtt_client.connect(args.mqtt_host, args.mqtt_port, 60)
    mqtt_client.loop_start()  # Start loop in background

    # Wait for subscription to be made
    subscription_waiter.wait()

    # Set final command receiver. The subscribe callback is detached first:
    # it calls .set() on its userdata, which is about to become a Queue.
    mqtt_client.on_subscribe = None
    mqtt_client.user_data_set(alerts_arrived_queue)
    mqtt_client.on_message = mqtt_on_command
    print("MQTT Complete!")

    print("Running benchmark...")
    benchmark_results = run_benchmark(
        args.workers, args.timeout, args.file, args.target_url
    )

    print("Waiting for inflight MQTT messages to arrive...")
    # Wait for inflight messages for a little
    total_successful_uploads = sum(
        1 for req in benchmark_results if req.upload_status_code == 200
    )

    all_arrived = False
    waiting_started = time.monotonic()
    deadline = waiting_started + args.inflight_timeout
    while time.monotonic() < deadline:
        if alerts_arrived_queue.qsize() >= total_successful_uploads:
            all_arrived = True
            break
        time.sleep(0.001)

    waited_for_inflight = time.monotonic() - waiting_started
    print(f"Waited a total {waited_for_inflight} seconds")

    if not all_arrived:
        print("WARNING: Not all MQTT Messages arrived!")

    # Disconnecting MQTT
    mqtt_client.disconnect()
    mqtt_client.loop_stop()  # This stops further receiving MQTT messages

    # Aggregate results. benchmark_results is a list ordered by start time,
    # so alerts are matched to their request through an id-keyed index.
    requests_by_id = {req.id: req for req in benchmark_results}
    total_answered = 0
    while True:
        try:
            returned_job = alerts_arrived_queue.get_nowait()
        except queue.Empty:
            break

        try:
            requests_by_id[returned_job.id].alert_arrived = returned_job.arrived
        except KeyError:
            print("Bruh moment: Alert arrived with an id that's not sent", returned_job.id)
        else:
            total_answered += 1

    # print some mini statistics
    print(
        f"{len(benchmark_results)} requests completed: {total_successful_uploads} successfully uploaded and {total_answered} answered"
    )

    if benchmark_results:
        last_finished = max(req.upload_finished for req in benchmark_results)
        first_started = min(req.upload_started for req in benchmark_results)
        total_runtime = last_finished - first_started

        print(f"Test total runtime was {total_runtime} seconds")
        if total_runtime > 0:
            print("HTTP Request/sec:", len(benchmark_results) / total_runtime)

    if args.output == '-':
        print("Writing results to STDOUT")
        print("\n---------- CUT HERE ----------\n")
        write_results(benchmark_results, sys.stdout)
    else:
        print("Writing results to", args.output)
        # newline='' lets the csv writer control the line endings itself.
        with open(args.output, 'w', newline='') as f:
            write_results(benchmark_results, f)


if __name__ == '__main__':
    main()