From 7549401526bd0af18a694dec22f012ee19e6d097 Mon Sep 17 00:00:00 2001 From: marcsello Date: Wed, 21 Oct 2020 01:18:35 +0200 Subject: [PATCH] Implemented sending unlimited requests --- benchmark2.py | 280 +++++++++++++++++++++++++++++++++----------------- 1 file changed, 186 insertions(+), 94 deletions(-) diff --git a/benchmark2.py b/benchmark2.py index 2ecfecf..5ea52c0 100644 --- a/benchmark2.py +++ b/benchmark2.py @@ -8,17 +8,19 @@ import argparse from multiprocessing import Process, Queue from queue import Empty import paho.mqtt.client +from threading import Thread, current_thread @dataclass(frozen=False, init=False) class UploadRequest: id: int - upload_started: float - upload_finished: float - upload_success: bool = False + upload_started: float = None + upload_finished: float = None + upload_status_code: bool = False + alert_arrived: float = None - def __init__(self, id: int): - self.id = id + def __init__(self, _id: int): + self.id = _id @dataclass(frozen=False, init=True) @@ -27,7 +29,12 @@ class ReturnedRequest: arrived: float -def sender_worker(input_queue, sent_queue, target_url, file_contents): +def print_progress_meme(char: str): + print(char, end='') + sys.stdout.flush() + + +def sender_worker(input_queue: Queue, sent_queue: Queue, target_url: str, file_contents: bytes, request_timeout: float): content_len = len(file_contents) session = requests.Session() try: @@ -53,19 +60,20 @@ def sender_worker(input_queue, sent_queue, target_url, file_contents): # SUPER HIGH PRECISION MEASUREMENT COMPARTMENT DEPARTMENT ADMIRAL GENERAL ALADEEN job.upload_started = time.time() - r = session.post(target_url, files=files) - job.upload_finished = time.time() + try: + r = session.post(target_url, files=files, timeout=request_timeout) + job.upload_finished = time.time() + if r.status_code == 200: + print_progress_meme('.') + else: + print_progress_meme('!') + + job.upload_status_code = r.status_code + except requests.Timeout: + print_progress_meme('?') # /'\./'\./'\./'\./'\./'\./'\./'\./'\./'\./'\./'\./'\./'\./'\./'\./'\./'\./'\./'\ - if r.status_code == 200: - print('.', end='') - job.upload_success = True - else: - print('!', end='') - job.upload_success = False - - sys.stdout.flush() - sent_queue.put(job) + sent_queue.put(job) # Will have none as status_code and upload_finished if the request timed out except KeyboardInterrupt: return @@ -76,20 +84,165 @@ def mqtt_on_connect(client, userdata, flags, rc): def mqtt_on_command(client, userdata, message): - msg = json.loads(message.payload.decode()) - if msg['command'] == 'doAlert': - _id = int(message.topic[len('command/'):]) - userdata.put(ReturnedRequest(id=_id, arrived=time.time())) - print(',', end='') - sys.stdout.flush() + # msg = json.loads(message.payload.decode()) + # if msg['command'] == 'doAlert': + _id = int(message.topic[8:]) # len('command/') = 8 + userdata.put(ReturnedRequest(id=_id, arrived=time.time())) + print_progress_meme(',') + + +def infinite_job_generator(input_queue: Queue, target_ready_jobs: int): + i = 0 + target_ready_jobs_local = target_ready_jobs + first = True + while True: + jobs_ready = input_queue.qsize() + if jobs_ready < target_ready_jobs_local: + + if not first: # In the first run there are exactly zero jobs ready, so + if jobs_ready < (target_ready_jobs_local / 2): + target_ready_jobs_local *= 2 + else: + first = False + + jobs_to_generate = target_ready_jobs_local - jobs_ready + for _ in range(jobs_to_generate): + i += 1 + input_queue.put(UploadRequest(i)) + time.sleep(0.02) + + if not current_thread().active: + return + + +def run_benchmark( + num_requests: int, + num_workers: int, + sound_file_path: str, + target_url: str, + timeout: float, + inflight_timeout: float, + request_timeout: float, + mqtt_client: paho.mqtt.client.Client): + input_queue = Queue() + sent_queue = Queue() + returned_queue = Queue() + sender_workers = [] + + input_generator_thread = None + + if num_requests == 0: + print(f"Unlimited amount of requests will be generated...") + input_generator_thread = Thread(target=infinite_job_generator, args=(input_queue, num_workers * 100)) + input_generator_thread.active = True + input_generator_thread.start() + else: + print(f"Preparing {num_requests} requests...") + for i in range(num_requests): + input_queue.put(UploadRequest(i + 1)) + + mqtt_client.user_data_set(returned_queue) + mqtt_client.loop_start() # Puts mqtt stuff to a background thread + + print(f"Preparing {num_workers} workers...") + + with open(sound_file_path, "rb") as f: + file_contents = f.read() + + for i in range(num_workers): + sender_workers.append( + Process(target=sender_worker, + args=( + input_queue, + sent_queue, + target_url, + file_contents, + request_timeout + )) + ) + + print("Baszatás!") + # Starting all workers + for worker in sender_workers: + worker.start() + + # Waiting for workers to finish + try: + for worker in sender_workers: + worker.join() + except KeyboardInterrupt: + # Azt jelenti: jovan, megállítottam a küldést, most már csak az elveszett válaszokat várom + print_progress_meme('|') + # Interrupt forwarded for workers, so we just wait for them to quit + for worker in sender_workers: + exitcode = worker.join(timeout=2) + if not exitcode: # = it did not exit + worker.terminate() # Not the nicest solution + + # Stop the input generator + input_generator_thread.active = False + + # read out all send requests to an array for handling later: + results = {} + total_successful_uploads = 0 + while True: + try: + job = sent_queue.get_nowait() + except Empty: + break + + if job.upload_status_code == 200: + total_successful_uploads += 1 + + results[job.id] = job + + # total_successful_uploads = len([req for req in results.values() if req.upload_status_code == 200]) + + # Wait for inflight messages for a little + for _ in range(int(inflight_timeout)): + time.sleep(1) + if returned_queue.qsize() >= total_successful_uploads: + break + + mqtt_client.loop_stop() # This stops further recieving MQTT messages + print() # Everything that might print from other threads are now stopped, so we put a newline + + if returned_queue.qsize() < total_successful_uploads: + print("Waiting for inflight MQTT messages timed out!") + + # Fill out the missing fields in the reuslts + total_answered = 0 + while True: + try: + returned_job = returned_queue.get_nowait() + except Empty: + break + + try: + results[returned_job.id].alert_arrived = returned_job.arrived + except KeyError: + print("Bruh moment: Alert arrived with an id that's not sent", returned_job.id) + + total_answered += 1 + + # mini statistics + print(f"{len(results)} requests completed: {total_successful_uploads} successfully uploaded and {total_answered} answered") + + return results def main(): parser = argparse.ArgumentParser(description='Birbnetes Benchmarker 2') parser.add_argument('--workers', type=int, required=False, default=1, help='Number of workers') - parser.add_argument('--requests', type=int, required=False, default=100, help='Number of requests') + parser.add_argument('--requests', type=int, required=False, default=100, help='Number of requests (0 for infinity)') + parser.add_argument('--timeout', type=int, required=False, default=0, + help='Maximum time for request sending (0 for infinity)') + parser.add_argument('--inflight-timeout', type=int, required=False, default=30, - help='Number of seconds to wait for MQTT messages to arrive after uploading finished') + help='Number of seconds to wait for MQTT messages to arrive after uploading finished (not very precise)') + + parser.add_argument('--request-timeout', type=float, required=False, default=5, + help='Timeout for HTTP requests') parser.add_argument('--file', type=str, required=False, default="oIbBz.wav", help='Name of the sound file to upload') @@ -106,91 +259,30 @@ def main(): args = parser.parse_args() - input_queue = Queue() - sent_queue = Queue() - returned_queue = Queue() - sender_workers = [] - - print(f"Preparing {args.requests} requests...") - for i in range(args.requests): - input_queue.put(UploadRequest(i + 1)) - print(f"Preparing MQTT (host: {args.mqtt_host}:{args.mqtt_port}, login: {bool(args.mqtt_password)})") client = paho.mqtt.client.Client(client_id="10") client.on_connect = mqtt_on_connect client.on_message = mqtt_on_command - client.user_data_set(returned_queue) - if args.mqtt_password: client.username_pw_set(args.mqtt_username, args.mqtt_password) - client.connect(args.mqtt_host, args.mqtt_port, 60) - client.loop_start() # Puts mqtt stuff to a background thread - print(f"Preparing {args.workers} workers...") + results = run_benchmark(args.requests, args.workers, args.file, args.target_url, args.timeout, + args.inflight_timeout, args.request_timeout, client) - with open(args.file, "rb") as f: - file_contents = f.read() - - for i in range(args.workers): - sender_workers.append( - Process(target=sender_worker, args=(input_queue, sent_queue, args.target_url, file_contents)) - ) - - print("Baszatás!") - for worker in sender_workers: - worker.start() - - try: - for worker in sender_workers: - worker.join() - except KeyboardInterrupt: - # Interrupt forwarded for workers, so we just wait for them to quit - for worker in sender_workers: - worker.join() - - # read out all send requests to an array for easier handling: - sent_requests = {} - while True: - try: - job = sent_queue.get_nowait() - except Empty: - break - sent_requests[job.id] = job - - total_successful_uploads = len([req for req in sent_requests.values() if req.upload_success]) - - # Wait for inflight messages for a little - for _ in range(args.inflight_timeout): - time.sleep(1) - if returned_queue.qsize() >= total_successful_uploads: - break - - client.loop_stop() - - # read out all recieved requests to an array for easier handling: - recieved_alerts = {} - while True: - try: - job = returned_queue.get_nowait() - except Empty: - break - recieved_alerts[job.id] = job - - print() print("Eposzi kiértékelés idő...") print("id Upload Dur Return Dur Total Dur") - for id_, req in sent_requests.items(): - alert = recieved_alerts.get(id_) # may return zero + # read out all recieved requests to an array for easier handling: + for id_, req in results.items(): - if not alert: - if req.upload_success: + if not req.alert_arrived: + if req.upload_status_code == 200: print(id_, req.upload_finished - req.upload_started, '???', '???') else: print(id_, req.upload_finished - req.upload_started, '!', '???', '???') else: - print(id_, req.upload_finished - req.upload_started, alert.arrived - req.upload_finished, - alert.arrived - req.upload_started) + print(id_, req.upload_finished - req.upload_started, req.alert_arrived - req.upload_finished, + req.alert_arrived - req.upload_started) if __name__ == '__main__':