diff --git a/benchmark2.py b/benchmark2.py index c13d177..861db2a 100644 --- a/benchmark2.py +++ b/benchmark2.py @@ -11,7 +11,7 @@ import argparse from multiprocessing import Process, Queue from queue import Empty import paho.mqtt.client -from threading import Thread, current_thread, Timer +from threading import Thread, current_thread, Timer, Event, Lock @dataclass(frozen=False, init=False) @@ -32,6 +32,32 @@ class ReturnedRequest: arrived: float +class KebabRemover(Thread): + + def __init__(self, output_queue: Queue): + super().__init__() + self.results = {} + self._output_queue = output_queue + self._active = True + self._lock = Lock() + + def run(self): + with self._lock: + while self._active: + try: + job = self._output_queue.get(timeout=2) + self.results[job.id] = job + except Empty: + pass + + def get_results(self) -> dict: + with self._lock: + return self.results + + def stop(self): + self._active = False + + def print_progress_meme(char: str): print(char, end='') sys.stdout.flush() @@ -94,6 +120,10 @@ def mqtt_on_command(client, userdata, message): print_progress_meme(',') +def mqtt_on_subscribe(client, userdata, mid, granted_qos): + userdata.set() + + def infinite_job_generator(input_queue: Queue, target_ready_jobs: int): i = 0 target_ready_jobs_local = target_ready_jobs @@ -150,14 +180,21 @@ def run_benchmark( for i in range(num_requests): input_queue.put(UploadRequest(i + 1)) - mqtt_client.user_data_set(returned_queue) + print("Waiting for MQTT subscription to complete") + subscribe_complete_event = Event() + mqtt_client.user_data_set(subscribe_complete_event) mqtt_client.loop_start() # Puts mqtt stuff to a background thread + subscribe_complete_event.wait() + mqtt_client.user_data_set(returned_queue) + print(f"Preparing {num_workers} workers...") with open(sound_file_path, "rb") as f: file_contents = f.read() + finished_task_remover = KebabRemover(sent_queue) + for i in range(num_workers): sender_workers.append( Process(target=sender_worker, @@ -172,6 +209,7 @@ def run_benchmark( print("Baszatás!") # Starting all workers + finished_task_remover.start() for worker in sender_workers: worker.start() @@ -188,7 +226,7 @@ def run_benchmark( except KeyboardInterrupt: if worker_stopper_timer: worker_stopper_timer.cancel() # We don't want the auto stopper to fire again - # Azt jelenti: jovan, megállítottam a küldést, most már csak az elveszett válaszokat várom + # Azt jelenti: jovan bátyesz, megállítottam a küldést, most már csak a késő válaszokat várom print_progress_meme('|') # Interrupt forwarded for workers, so we just wait for them to quit for worker in sender_workers: @@ -202,20 +240,9 @@ def run_benchmark( input_generator_thread.active = False # read out all send requests to an array for handling later: - results = {} - total_successful_uploads = 0 - while True: - try: - job = sent_queue.get_nowait() - except Empty: - break - - if job.upload_status_code == 200: - total_successful_uploads += 1 - - results[job.id] = job - - # total_successful_uploads = len([req for req in results.values() if req.upload_status_code == 200]) + finished_task_remover.stop() + results = finished_task_remover.get_results() + total_successful_uploads = len([req for req in results.values() if req.upload_status_code == 200]) # Wait for inflight messages for a little for _ in range(int(inflight_timeout)): @@ -223,6 +250,7 @@ def run_benchmark( if returned_queue.qsize() >= total_successful_uploads: break + mqtt_client.disconnect() mqtt_client.loop_stop() # This stops further recieving MQTT messages print() # Everything that might print from other threads are now stopped, so we put a newline @@ -308,9 +336,10 @@ def main(): client = paho.mqtt.client.Client(client_id="10") client.on_connect = mqtt_on_connect client.on_message = mqtt_on_command + client.on_subscribe = mqtt_on_subscribe if args.mqtt_password: client.username_pw_set(args.mqtt_username, args.mqtt_password) - client.connect(args.mqtt_host, args.mqtt_port, 60) + client.connect(args.mqtt_host, args.mqtt_port, 60) # this is a blocking call, will wait until connecion is complete results = run_benchmark(args.requests, args.workers, args.file, args.target_url, args.timeout, args.inflight_timeout, args.request_timeout, client) diff --git a/benchmark_simple.py b/benchmark_simple.py new file mode 100644 index 0000000..e69de29