Added selectable strategy
Some checks failed
continuous-integration/drone/push Build is failing

This commit is contained in:
Pünkösd Marcell 2020-10-23 00:09:07 +02:00
parent efd7639021
commit e6fcbc48a1

View File

@ -8,10 +8,11 @@ import textwrap
from datetime import datetime from datetime import datetime
from dataclasses import dataclass from dataclasses import dataclass
import argparse import argparse
from multiprocessing import Process, Queue
from queue import Empty
import paho.mqtt.client import paho.mqtt.client
from threading import Thread, current_thread, Timer, Event, Lock
import queue
import multiprocessing
import threading
@dataclass(frozen=False, init=False) @dataclass(frozen=False, init=False)
@ -32,14 +33,14 @@ class ReturnedRequest:
arrived: float arrived: float
class KebabRemover(Thread): class KebabRemover(threading.Thread):
def __init__(self, output_queue: Queue): def __init__(self, output_queue: queue.Queue):
super().__init__() super().__init__()
self.results = {} self.results = {}
self._output_queue = output_queue self._output_queue = output_queue
self._active = True self._active = True
self._lock = Lock() self._lock = threading.Lock()
def run(self): def run(self):
with self._lock: with self._lock:
@ -47,7 +48,7 @@ class KebabRemover(Thread):
try: try:
job = self._output_queue.get(timeout=2) job = self._output_queue.get(timeout=2)
self.results[job.id] = job self.results[job.id] = job
except Empty: except queue.Empty:
pass pass
def get_results(self) -> dict: def get_results(self) -> dict:
@ -63,14 +64,14 @@ def print_progress_meme(char: str):
sys.stdout.flush() sys.stdout.flush()
def sender_worker(input_queue: Queue, sent_queue: Queue, target_url: str, file_contents: bytes, request_timeout: float): def sender_worker(input_queue, sent_queue, target_url: str, file_contents: bytes, request_timeout: float):
content_len = len(file_contents) content_len = len(file_contents)
session = requests.Session() session = requests.Session()
try: try:
while True: while True:
try: try:
job = input_queue.get_nowait() job = input_queue.get_nowait()
except Empty: except queue.Empty:
return return
files = { files = {
@ -124,7 +125,7 @@ def mqtt_on_subscribe(client, userdata, mid, granted_qos):
userdata.set() userdata.set()
def infinite_job_generator(input_queue: Queue, target_ready_jobs: int): def infinite_job_generator(input_queue, target_ready_jobs: int):
i = 0 i = 0
target_ready_jobs_local = target_ready_jobs target_ready_jobs_local = target_ready_jobs
first = True first = True
@ -144,7 +145,7 @@ def infinite_job_generator(input_queue: Queue, target_ready_jobs: int):
input_queue.put(UploadRequest(i)) input_queue.put(UploadRequest(i))
time.sleep(0.02) time.sleep(0.02)
if not current_thread().active: if not threading.current_thread().active:
return return
@ -162,17 +163,30 @@ def run_benchmark(
timeout: float, timeout: float,
inflight_timeout: float, inflight_timeout: float,
request_timeout: float, request_timeout: float,
strategy: str,
mqtt_client: paho.mqtt.client.Client): mqtt_client: paho.mqtt.client.Client):
input_queue = Queue()
sent_queue = Queue() if strategy == 'thr':
returned_queue = Queue() input_queue = queue.Queue()
sent_queue = queue.Queue()
returned_queue = queue.Queue()
print("Using threading strategy")
elif strategy == 'proc':
input_queue = multiprocessing.Queue()
sent_queue = multiprocessing.Queue()
returned_queue = multiprocessing.Queue()
print("Using multiprocessing strategy")
else:
print("Wrong strategy")
return
sender_workers = [] sender_workers = []
input_generator_thread = None input_generator_thread = None
if num_requests == 0: if num_requests == 0:
print(f"Unlimited amount of requests will be generated...") print(f"Unlimited amount of requests will be generated...")
input_generator_thread = Thread(target=infinite_job_generator, args=(input_queue, num_workers * 100)) input_generator_thread = threading.Thread(target=infinite_job_generator, args=(input_queue, num_workers * 100))
input_generator_thread.active = True input_generator_thread.active = True
input_generator_thread.start() input_generator_thread.start()
else: else:
@ -181,7 +195,7 @@ def run_benchmark(
input_queue.put(UploadRequest(i + 1)) input_queue.put(UploadRequest(i + 1))
print("Waiting for MQTT subscription to complete") print("Waiting for MQTT subscription to complete")
subscribe_complete_event = Event() subscribe_complete_event = threading.Event()
mqtt_client.user_data_set(subscribe_complete_event) mqtt_client.user_data_set(subscribe_complete_event)
mqtt_client.loop_start() # Puts mqtt stuff to a background thread mqtt_client.loop_start() # Puts mqtt stuff to a background thread
@ -195,17 +209,24 @@ def run_benchmark(
finished_task_remover = KebabRemover(sent_queue) finished_task_remover = KebabRemover(sent_queue)
args = {'target': sender_worker,
'args': (
input_queue,
sent_queue,
target_url,
file_contents,
request_timeout
)}
for i in range(num_workers): for i in range(num_workers):
sender_workers.append( if strategy == 'thr':
Process(target=sender_worker, sender_workers.append(
args=( threading.Thread(**args)
input_queue, )
sent_queue, elif strategy == 'proc':
target_url, sender_workers.append(
file_contents, multiprocessing.Process(**args)
request_timeout )
))
)
print("Baszatás!") print("Baszatás!")
# Starting all workers # Starting all workers
@ -213,19 +234,11 @@ def run_benchmark(
for worker in sender_workers: for worker in sender_workers:
worker.start() worker.start()
worker_stopper_timer = None
if timeout != 0:
sender_worker_pids = [w.pid for w in sender_workers]
worker_stopper_timer = Timer(timeout, greacefully_stop_all_workers, args=(sender_worker_pids,))
worker_stopper_timer.start()
# Waiting for workers to finish # Waiting for workers to finish
try: try:
for worker in sender_workers: for worker in sender_workers:
worker.join() worker.join()
except KeyboardInterrupt: except KeyboardInterrupt:
if worker_stopper_timer:
worker_stopper_timer.cancel() # We don't want the auto stopper to fire again
# Azt jelenti: jovan bátyesz, megállítottam a küldést, most már csak a késő válaszokat várom # Azt jelenti: jovan bátyesz, megállítottam a küldést, most már csak a késő válaszokat várom
print_progress_meme('|') print_progress_meme('|')
# Interrupt forwarded for workers, so we just wait for them to quit # Interrupt forwarded for workers, so we just wait for them to quit
@ -263,7 +276,7 @@ def run_benchmark(
while True: while True:
try: try:
returned_job = returned_queue.get_nowait() returned_job = returned_queue.get_nowait()
except Empty: except queue.Empty:
break break
try: try:
@ -311,9 +324,14 @@ def main():
benchmark2.py --requests 100 --timeout 60 --file /home/testerboi/birbstuff/testdata/oIbBz.wav benchmark2.py --requests 100 --timeout 60 --file /home/testerboi/birbstuff/testdata/oIbBz.wav
''') ''')
parser = argparse.ArgumentParser(description='Birbnetes Benchmarker 2', epilog=help_epilog, parser = argparse.ArgumentParser(
formatter_class=argparse.RawTextHelpFormatter) description='Birbnetes Benchmarker 2',
parser.add_argument('--workers', type=int, required=False, default=1, help='Number of workers') epilog=help_epilog,
formatter_class=argparse.RawTextHelpFormatter
)
parser.add_argument('--strategy', type=str, required=False, default='thr', help='Forking strategy (thr or proc)')
parser.add_argument('--workers', type=int, required=False, default=1, help='Number of workers (or threads)')
parser.add_argument('--requests', type=int, required=True, help='Number of requests (0 for infinity)') parser.add_argument('--requests', type=int, required=True, help='Number of requests (0 for infinity)')
parser.add_argument('--timeout', type=int, required=False, default=0, parser.add_argument('--timeout', type=int, required=False, default=0,
help='Maximum time for request sending (0 for unlimited)') help='Maximum time for request sending (0 for unlimited)')
@ -347,7 +365,7 @@ def main():
client.connect(args.mqtt_host, args.mqtt_port, 60) # this is a blocking call, will wait until connecion is complete client.connect(args.mqtt_host, args.mqtt_port, 60) # this is a blocking call, will wait until connecion is complete
results = run_benchmark(args.requests, args.workers, args.file, args.target_url, args.timeout, results = run_benchmark(args.requests, args.workers, args.file, args.target_url, args.timeout,
args.inflight_timeout, args.request_timeout, client) args.inflight_timeout, args.request_timeout, args.strategy, client)
print("Eposzi kiértékelés idő...") print("Eposzi kiértékelés idő...")
print("id Upload Dur Return Dur Total Dur") print("id Upload Dur Return Dur Total Dur")