Implemented sending unlimited requests

This commit is contained in:
Pünkösd Marcell 2020-10-21 01:18:35 +02:00
parent c1f214c9d2
commit 7549401526

View File

@ -8,17 +8,19 @@ import argparse
from multiprocessing import Process, Queue from multiprocessing import Process, Queue
from queue import Empty from queue import Empty
import paho.mqtt.client import paho.mqtt.client
from threading import Thread, current_thread
@dataclass(frozen=False, init=False) @dataclass(frozen=False, init=False)
class UploadRequest: class UploadRequest:
id: int id: int
upload_started: float upload_started: float = None
upload_finished: float upload_finished: float = None
upload_success: bool = False upload_status_code: bool = False
alert_arrived: float = None
def __init__(self, id: int): def __init__(self, _id: int):
self.id = id self.id = _id
@dataclass(frozen=False, init=True) @dataclass(frozen=False, init=True)
@ -27,7 +29,12 @@ class ReturnedRequest:
arrived: float arrived: float
def sender_worker(input_queue, sent_queue, target_url, file_contents): def print_progress_meme(char: str):
print(char, end='')
sys.stdout.flush()
def sender_worker(input_queue: Queue, sent_queue: Queue, target_url: str, file_contents: bytes, request_timeout: float):
content_len = len(file_contents) content_len = len(file_contents)
session = requests.Session() session = requests.Session()
try: try:
@ -53,19 +60,20 @@ def sender_worker(input_queue, sent_queue, target_url, file_contents):
# SUPER HIGH PRECISION MEASUREMENT COMPARTMENT DEPARTMENT ADMIRAL GENERAL ALADEEN # SUPER HIGH PRECISION MEASUREMENT COMPARTMENT DEPARTMENT ADMIRAL GENERAL ALADEEN
job.upload_started = time.time() job.upload_started = time.time()
r = session.post(target_url, files=files) try:
job.upload_finished = time.time() r = session.post(target_url, files=files, timeout=request_timeout)
job.upload_finished = time.time()
if r.status_code == 200:
print_progress_meme('.')
else:
print_progress_meme('!')
job.upload_status_code = r.status_code
except requests.Timeout:
print_progress_meme('?')
# /'\./'\./'\./'\./'\./'\./'\./'\./'\./'\./'\./'\./'\./'\./'\./'\./'\./'\./'\./'\ # /'\./'\./'\./'\./'\./'\./'\./'\./'\./'\./'\./'\./'\./'\./'\./'\./'\./'\./'\./'\
if r.status_code == 200: sent_queue.put(job) # Will have none as status_code and upload_finished if the request timed out
print('.', end='')
job.upload_success = True
else:
print('!', end='')
job.upload_success = False
sys.stdout.flush()
sent_queue.put(job)
except KeyboardInterrupt: except KeyboardInterrupt:
return return
@ -76,20 +84,165 @@ def mqtt_on_connect(client, userdata, flags, rc):
def mqtt_on_command(client, userdata, message): def mqtt_on_command(client, userdata, message):
msg = json.loads(message.payload.decode()) # msg = json.loads(message.payload.decode())
if msg['command'] == 'doAlert': # if msg['command'] == 'doAlert':
_id = int(message.topic[len('command/'):]) _id = int(message.topic[8:]) # len('command/') = 8
userdata.put(ReturnedRequest(id=_id, arrived=time.time())) userdata.put(ReturnedRequest(id=_id, arrived=time.time()))
print(',', end='') print_progress_meme(',')
sys.stdout.flush()
def infinite_job_generator(input_queue: Queue, target_ready_jobs: int):
i = 0
target_ready_jobs_local = target_ready_jobs
first = True
while True:
jobs_ready = input_queue.qsize()
if jobs_ready < target_ready_jobs_local:
if not first: # In the first run there are exactly zero jobs ready, so
if jobs_ready < (target_ready_jobs_local / 2):
target_ready_jobs_local *= 2
else:
first = False
jobs_to_generate = target_ready_jobs_local - jobs_ready
for _ in range(jobs_to_generate):
i += 1
input_queue.put(UploadRequest(i))
time.sleep(0.02)
if not current_thread().active:
return
def run_benchmark(
num_requests: int,
num_workers: int,
sound_file_path: str,
target_url: str,
timeout: float,
inflight_timeout: float,
request_timeout: float,
mqtt_client: paho.mqtt.client.Client):
input_queue = Queue()
sent_queue = Queue()
returned_queue = Queue()
sender_workers = []
input_generator_thread = None
if num_requests == 0:
print(f"Unlimited amount of requests will be generated...")
input_generator_thread = Thread(target=infinite_job_generator, args=(input_queue, num_workers * 100))
input_generator_thread.active = True
input_generator_thread.start()
else:
print(f"Preparing {num_requests} requests...")
for i in range(num_requests):
input_queue.put(UploadRequest(i + 1))
mqtt_client.user_data_set(returned_queue)
mqtt_client.loop_start() # Puts mqtt stuff to a background thread
print(f"Preparing {num_workers} workers...")
with open(sound_file_path, "rb") as f:
file_contents = f.read()
for i in range(num_workers):
sender_workers.append(
Process(target=sender_worker,
args=(
input_queue,
sent_queue,
target_url,
file_contents,
request_timeout
))
)
print("Baszatás!")
# Starting all workers
for worker in sender_workers:
worker.start()
# Waiting for workers to finish
try:
for worker in sender_workers:
worker.join()
except KeyboardInterrupt:
# Azt jelenti: jovan, megállítottam a küldést, most már csak az elveszett válaszokat várom
print_progress_meme('|')
# Interrupt forwarded for workers, so we just wait for them to quit
for worker in sender_workers:
exitcode = worker.join(timeout=2)
if not exitcode: # = it did not exit
worker.terminate() # Not the nicest solution
# Stop the input generator
input_generator_thread.active = False
# read out all send requests to an array for handling later:
results = {}
total_successful_uploads = 0
while True:
try:
job = sent_queue.get_nowait()
except Empty:
break
if job.upload_status_code == 200:
total_successful_uploads += 1
results[job.id] = job
# total_successful_uploads = len([req for req in results.values() if req.upload_status_code == 200])
# Wait for inflight messages for a little
for _ in range(int(inflight_timeout)):
time.sleep(1)
if returned_queue.qsize() >= total_successful_uploads:
break
mqtt_client.loop_stop() # This stops further recieving MQTT messages
print() # Everything that might print from other threads are now stopped, so we put a newline
if returned_queue.qsize() < total_successful_uploads:
print("Waiting for inflight MQTT messages timed out!")
# Fill out the missing fields in the reuslts
total_answered = 0
while True:
try:
returned_job = returned_queue.get_nowait()
except Empty:
break
try:
results[returned_job.id].alert_arrived = returned_job.arrived
except KeyError:
print("Bruh moment: Alert arrived with an id that's not sent", returned_job.id)
total_answered += 1
# mini statistics
print(f"{len(results)} requests completed: {total_successful_uploads} successfully uploaded and {total_answered} answered")
return results
def main(): def main():
parser = argparse.ArgumentParser(description='Birbnetes Benchmarker 2') parser = argparse.ArgumentParser(description='Birbnetes Benchmarker 2')
parser.add_argument('--workers', type=int, required=False, default=1, help='Number of workers') parser.add_argument('--workers', type=int, required=False, default=1, help='Number of workers')
parser.add_argument('--requests', type=int, required=False, default=100, help='Number of requests') parser.add_argument('--requests', type=int, required=False, default=100, help='Number of requests (0 for infinity)')
parser.add_argument('--timeout', type=int, required=False, default=0,
help='Maximum time for request sending (0 for infinity)')
parser.add_argument('--inflight-timeout', type=int, required=False, default=30, parser.add_argument('--inflight-timeout', type=int, required=False, default=30,
help='Number of seconds to wait for MQTT messages to arrive after uploading finished') help='Number of seconds to wait for MQTT messages to arrive after uploading finished (not very precise)')
parser.add_argument('--request-timeout', type=float, required=False, default=5,
help='Timeout for HTTP requests')
parser.add_argument('--file', type=str, required=False, default="oIbBz.wav", parser.add_argument('--file', type=str, required=False, default="oIbBz.wav",
help='Name of the sound file to upload') help='Name of the sound file to upload')
@ -106,91 +259,30 @@ def main():
args = parser.parse_args() args = parser.parse_args()
input_queue = Queue()
sent_queue = Queue()
returned_queue = Queue()
sender_workers = []
print(f"Preparing {args.requests} requests...")
for i in range(args.requests):
input_queue.put(UploadRequest(i + 1))
print(f"Preparing MQTT (host: {args.mqtt_host}:{args.mqtt_port}, login: {bool(args.mqtt_password)})") print(f"Preparing MQTT (host: {args.mqtt_host}:{args.mqtt_port}, login: {bool(args.mqtt_password)})")
client = paho.mqtt.client.Client(client_id="10") client = paho.mqtt.client.Client(client_id="10")
client.on_connect = mqtt_on_connect client.on_connect = mqtt_on_connect
client.on_message = mqtt_on_command client.on_message = mqtt_on_command
client.user_data_set(returned_queue)
if args.mqtt_password: if args.mqtt_password:
client.username_pw_set(args.mqtt_username, args.mqtt_password) client.username_pw_set(args.mqtt_username, args.mqtt_password)
client.connect(args.mqtt_host, args.mqtt_port, 60) client.connect(args.mqtt_host, args.mqtt_port, 60)
client.loop_start() # Puts mqtt stuff to a background thread
print(f"Preparing {args.workers} workers...") results = run_benchmark(args.requests, args.workers, args.file, args.target_url, args.timeout,
args.inflight_timeout, args.request_timeout, client)
with open(args.file, "rb") as f:
file_contents = f.read()
for i in range(args.workers):
sender_workers.append(
Process(target=sender_worker, args=(input_queue, sent_queue, args.target_url, file_contents))
)
print("Baszatás!")
for worker in sender_workers:
worker.start()
try:
for worker in sender_workers:
worker.join()
except KeyboardInterrupt:
# Interrupt forwarded for workers, so we just wait for them to quit
for worker in sender_workers:
worker.join()
# read out all send requests to an array for easier handling:
sent_requests = {}
while True:
try:
job = sent_queue.get_nowait()
except Empty:
break
sent_requests[job.id] = job
total_successful_uploads = len([req for req in sent_requests.values() if req.upload_success])
# Wait for inflight messages for a little
for _ in range(args.inflight_timeout):
time.sleep(1)
if returned_queue.qsize() >= total_successful_uploads:
break
client.loop_stop()
# read out all recieved requests to an array for easier handling:
recieved_alerts = {}
while True:
try:
job = returned_queue.get_nowait()
except Empty:
break
recieved_alerts[job.id] = job
print()
print("Eposzi kiértékelés idő...") print("Eposzi kiértékelés idő...")
print("id Upload Dur Return Dur Total Dur") print("id Upload Dur Return Dur Total Dur")
for id_, req in sent_requests.items(): # read out all recieved requests to an array for easier handling:
alert = recieved_alerts.get(id_) # may return zero for id_, req in results.items():
if not alert: if not req.alert_arrived:
if req.upload_success: if req.upload_status_code == 200:
print(id_, req.upload_finished - req.upload_started, '???', '???') print(id_, req.upload_finished - req.upload_started, '???', '???')
else: else:
print(id_, req.upload_finished - req.upload_started, '!', '???', '???') print(id_, req.upload_finished - req.upload_started, '!', '???', '???')
else: else:
print(id_, req.upload_finished - req.upload_started, alert.arrived - req.upload_finished, print(id_, req.upload_finished - req.upload_started, req.alert_arrived - req.upload_finished,
alert.arrived - req.upload_started) req.alert_arrived - req.upload_started)
if __name__ == '__main__': if __name__ == '__main__':