From 576befded639b956b26fcf58411e29506af10d7e Mon Sep 17 00:00:00 2001 From: marcsello Date: Wed, 21 Oct 2020 02:20:24 +0200 Subject: [PATCH] Timeout added --- benchmark2.py | 72 +++++++++++++++++++++++++++++++++++++++++---------- 1 file changed, 58 insertions(+), 14 deletions(-) diff --git a/benchmark2.py b/benchmark2.py index 5ea52c0..5bbdae1 100644 --- a/benchmark2.py +++ b/benchmark2.py @@ -1,14 +1,17 @@ +import os +import signal import time import sys import json import requests +import textwrap from datetime import datetime from dataclasses import dataclass import argparse from multiprocessing import Process, Queue from queue import Empty import paho.mqtt.client -from threading import Thread, current_thread +from threading import Thread, current_thread, Timer @dataclass(frozen=False, init=False) @@ -115,6 +118,12 @@ def infinite_job_generator(input_queue: Queue, target_ready_jobs: int): return +def greacefully_stop_all_workers(list_of_worker_pids: list): + for pid in list_of_worker_pids: + os.kill(pid, signal.SIGINT) + print_progress_meme('#') + + def run_benchmark( num_requests: int, num_workers: int, @@ -166,18 +175,27 @@ def run_benchmark( for worker in sender_workers: worker.start() + worker_stopper_timer = None + if timeout != 0: + sender_worker_pids = [w.pid for w in sender_workers] + worker_stopper_timer = Timer(timeout, greacefully_stop_all_workers, args=(sender_worker_pids,)) + worker_stopper_timer.start() + # Waiting for workers to finish try: for worker in sender_workers: worker.join() except KeyboardInterrupt: + if worker_stopper_timer: + worker_stopper_timer.cancel() # We don't want the auto stopper to fire again # Azt jelenti: jovan, megállítottam a küldést, most már csak az elveszett válaszokat várom print_progress_meme('|') # Interrupt forwarded for workers, so we just wait for them to quit for worker in sender_workers: - exitcode = worker.join(timeout=2) - if not exitcode: # = it did not exit - worker.terminate() # Not the nicest solution + if worker.is_alive(): + exitcode = worker.join(timeout=2) + if not exitcode: # = it did not exit + worker.terminate() # Not the nicest solution # Stop the input generator input_generator_thread.active = False @@ -207,8 +225,9 @@ def run_benchmark( mqtt_client.loop_stop() # This stops further recieving MQTT messages print() # Everything that might print from other threads are now stopped, so we put a newline - if returned_queue.qsize() < total_successful_uploads: - print("Waiting for inflight MQTT messages timed out!") + if int(inflight_timeout) != 0: # print only if there was a timeout + if returned_queue.qsize() < total_successful_uploads: + print("Waiting for inflight MQTT messages timed out!") # Fill out the missing fields in the reuslts total_answered = 0 @@ -226,27 +245,52 @@ def run_benchmark( total_answered += 1 # mini statistics - print(f"{len(results)} requests completed: {total_successful_uploads} successfully uploaded and {total_answered} answered") + print( + f"{len(results)} requests completed: {total_successful_uploads} successfully uploaded and {total_answered} answered") return results def main(): - parser = argparse.ArgumentParser(description='Birbnetes Benchmarker 2') + help_epilog = textwrap.dedent('''\ + Progress chars meaning: + HTTP related + . -> Upload succesfully completed (return code was 200) + ! -> Upload failed (return code was not 200) + ? -> Upload timed out (see --request-timeout) + + MQTT related + , -> Alert arrived (Trough MQTT) + + Process related + | -> User interrupted (CTRL+C pressed) + # -> Timeout expired (see --timeout) + + Examples: + One worker send exactly 100 requests (wait 10 secs for all alerts to arrive) + benchmark2.py --workers 1 --requests 100 --file /home/testerboi/birbstuff/testdata/oIbBz.wav --inflight-timeout 10 + + Ten worker run for exactly one minute (wait 10 secs for all alerts to arrive) + benchmark2.py --workers 10 --requests 0 --timeout 60 --file /home/testerboi/birbstuff/testdata/oIbBz.wav --inflight-timeout 10 + + Send either 100 requests or run for one minute (whichever comes first) + benchmark2.py --requests 100 --timeout 60 --file /home/testerboi/birbstuff/testdata/oIbBz.wav + ''') + + parser = argparse.ArgumentParser(description='Birbnetes Benchmarker 2', epilog=help_epilog, + formatter_class=argparse.RawTextHelpFormatter) parser.add_argument('--workers', type=int, required=False, default=1, help='Number of workers') - parser.add_argument('--requests', type=int, required=False, default=100, help='Number of requests (0 for infinity)') + parser.add_argument('--requests', type=int, required=True, help='Number of requests (0 for infinity)') parser.add_argument('--timeout', type=int, required=False, default=0, - help='Maximum time for request sending (0 for infinity)') + help='Maximum time for request sending (0 for unlimited)') parser.add_argument('--inflight-timeout', type=int, required=False, default=30, - help='Number of seconds to wait for MQTT messages to arrive after uploading finished (not very precise)') + help='Number of seconds to wait for MQTT messages to arrive after uploading finished (not very precise) (0 for not waiting)') parser.add_argument('--request-timeout', type=float, required=False, default=5, help='Timeout for HTTP requests') - parser.add_argument('--file', type=str, required=False, default="oIbBz.wav", - help='Name of the sound file to upload') - + parser.add_argument('--file', type=str, required=True, help='Name of the sound file to upload') parser.add_argument('--target-url', type=str, required=False, default="https://birb.k8s.kmlabz.com/sample", help='The target endpoint')