From e6fcbc48a1d5a2180e16d691bc5902e6f11e4ca4 Mon Sep 17 00:00:00 2001 From: marcsello Date: Fri, 23 Oct 2020 00:09:07 +0200 Subject: [PATCH] Added selectable strategy --- benchmark2.py | 96 ++++++++++++++++++++++++++++++--------------------- 1 file changed, 57 insertions(+), 39 deletions(-) diff --git a/benchmark2.py b/benchmark2.py index 8a18a3c..b8254a9 100644 --- a/benchmark2.py +++ b/benchmark2.py @@ -8,10 +8,11 @@ import textwrap from datetime import datetime from dataclasses import dataclass import argparse -from multiprocessing import Process, Queue -from queue import Empty import paho.mqtt.client -from threading import Thread, current_thread, Timer, Event, Lock + +import queue +import multiprocessing +import threading @dataclass(frozen=False, init=False) @@ -32,14 +33,14 @@ class ReturnedRequest: arrived: float -class KebabRemover(Thread): +class KebabRemover(threading.Thread): - def __init__(self, output_queue: Queue): + def __init__(self, output_queue: queue.Queue): super().__init__() self.results = {} self._output_queue = output_queue self._active = True - self._lock = Lock() + self._lock = threading.Lock() def run(self): with self._lock: @@ -47,7 +48,7 @@ class KebabRemover(Thread): try: job = self._output_queue.get(timeout=2) self.results[job.id] = job - except Empty: + except queue.Empty: pass def get_results(self) -> dict: @@ -63,14 +64,14 @@ def print_progress_meme(char: str): sys.stdout.flush() -def sender_worker(input_queue: Queue, sent_queue: Queue, target_url: str, file_contents: bytes, request_timeout: float): +def sender_worker(input_queue, sent_queue, target_url: str, file_contents: bytes, request_timeout: float): content_len = len(file_contents) session = requests.Session() try: while True: try: job = input_queue.get_nowait() - except Empty: + except queue.Empty: return files = { @@ -124,7 +125,7 @@ def mqtt_on_subscribe(client, userdata, mid, granted_qos): userdata.set() -def infinite_job_generator(input_queue: Queue, target_ready_jobs: int): +def infinite_job_generator(input_queue, target_ready_jobs: int): i = 0 target_ready_jobs_local = target_ready_jobs first = True @@ -144,7 +145,7 @@ def infinite_job_generator(input_queue: Queue, target_ready_jobs: int): input_queue.put(UploadRequest(i)) time.sleep(0.02) - if not current_thread().active: + if not threading.current_thread().active: return @@ -162,17 +163,30 @@ def run_benchmark( timeout: float, inflight_timeout: float, request_timeout: float, + strategy: str, mqtt_client: paho.mqtt.client.Client): - input_queue = Queue() - sent_queue = Queue() - returned_queue = Queue() + + if strategy == 'thr': + input_queue = queue.Queue() + sent_queue = queue.Queue() + returned_queue = queue.Queue() + print("Using threading strategy") + elif strategy == 'proc': + input_queue = multiprocessing.Queue() + sent_queue = multiprocessing.Queue() + returned_queue = multiprocessing.Queue() + print("Using multiprocessing strategy") + else: + print("Wrong strategy") + return + sender_workers = [] input_generator_thread = None if num_requests == 0: print(f"Unlimited amount of requests will be generated...") - input_generator_thread = Thread(target=infinite_job_generator, args=(input_queue, num_workers * 100)) + input_generator_thread = threading.Thread(target=infinite_job_generator, args=(input_queue, num_workers * 100)) input_generator_thread.active = True input_generator_thread.start() else: @@ -181,7 +195,7 @@ def run_benchmark( input_queue.put(UploadRequest(i + 1)) print("Waiting for MQTT subscription to complete") - subscribe_complete_event = Event() + subscribe_complete_event = threading.Event() mqtt_client.user_data_set(subscribe_complete_event) mqtt_client.loop_start() # Puts mqtt stuff to a background thread @@ -195,17 +209,24 @@ def run_benchmark( finished_task_remover = KebabRemover(sent_queue) + args = {'target': sender_worker, + 'args': ( + input_queue, + sent_queue, + target_url, + file_contents, + request_timeout + )} + for i in range(num_workers): - sender_workers.append( - Process(target=sender_worker, - args=( - input_queue, - sent_queue, - target_url, - file_contents, - request_timeout - )) - ) + if strategy == 'thr': + sender_workers.append( + threading.Thread(**args) + ) + elif strategy == 'proc': + sender_workers.append( + multiprocessing.Process(**args) + ) print("Baszatás!") # Starting all workers @@ -213,19 +234,11 @@ def run_benchmark( for worker in sender_workers: worker.start() - worker_stopper_timer = None - if timeout != 0: - sender_worker_pids = [w.pid for w in sender_workers] - worker_stopper_timer = Timer(timeout, greacefully_stop_all_workers, args=(sender_worker_pids,)) - worker_stopper_timer.start() - # Waiting for workers to finish try: for worker in sender_workers: worker.join() except KeyboardInterrupt: - if worker_stopper_timer: - worker_stopper_timer.cancel() # We don't want the auto stopper to fire again # Azt jelenti: jovan bátyesz, megállítottam a küldést, most már csak a késő válaszokat várom print_progress_meme('|') # Interrupt forwarded for workers, so we just wait for them to quit @@ -263,7 +276,7 @@ def run_benchmark( while True: try: returned_job = returned_queue.get_nowait() - except Empty: + except queue.Empty: break try: @@ -311,9 +324,14 @@ def main(): benchmark2.py --requests 100 --timeout 60 --file /home/testerboi/birbstuff/testdata/oIbBz.wav ''') - parser = argparse.ArgumentParser(description='Birbnetes Benchmarker 2', epilog=help_epilog, - formatter_class=argparse.RawTextHelpFormatter) - parser.add_argument('--workers', type=int, required=False, default=1, help='Number of workers') + parser = argparse.ArgumentParser( + description='Birbnetes Benchmarker 2', + epilog=help_epilog, + formatter_class=argparse.RawTextHelpFormatter + ) + + parser.add_argument('--strategy', type=str, required=False, default='thr', help='Forking strategy (thr or proc)') + parser.add_argument('--workers', type=int, required=False, default=1, help='Number of workers (or threads)') parser.add_argument('--requests', type=int, required=True, help='Number of requests (0 for infinity)') parser.add_argument('--timeout', type=int, required=False, default=0, help='Maximum time for request sending (0 for unlimited)') @@ -347,7 +365,7 @@ def main(): client.connect(args.mqtt_host, args.mqtt_port, 60) # this is a blocking call, will wait until connecion is complete results = run_benchmark(args.requests, args.workers, args.file, args.target_url, args.timeout, - args.inflight_timeout, args.request_timeout, client) + args.inflight_timeout, args.request_timeout, args.strategy, client) print("Eposzi kiértékelés idő...") print("id Upload Dur Return Dur Total Dur")