Fixed hanging on lots of requests

This commit is contained in:
Pünkösd Marcell 2020-10-21 09:22:45 +02:00
parent 2c96c1f9ff
commit f00f87471d
2 changed files with 47 additions and 18 deletions

View File

@ -11,7 +11,7 @@ import argparse
from multiprocessing import Process, Queue from multiprocessing import Process, Queue
from queue import Empty from queue import Empty
import paho.mqtt.client import paho.mqtt.client
from threading import Thread, current_thread, Timer from threading import Thread, current_thread, Timer, Event, Lock
@dataclass(frozen=False, init=False) @dataclass(frozen=False, init=False)
@ -32,6 +32,32 @@ class ReturnedRequest:
arrived: float arrived: float
class KebabRemover(Thread):
def __init__(self, output_queue: Queue):
super().__init__()
self.results = {}
self._output_queue = output_queue
self._active = True
self._lock = Lock()
def run(self):
with self._lock:
while self._active:
try:
job = self._output_queue.get(timeout=2)
self.results[job.id] = job
except Empty:
pass
def get_results(self) -> dict:
with self._lock:
return self.results
def stop(self):
self._active = False
def print_progress_meme(char: str): def print_progress_meme(char: str):
print(char, end='') print(char, end='')
sys.stdout.flush() sys.stdout.flush()
@ -94,6 +120,10 @@ def mqtt_on_command(client, userdata, message):
print_progress_meme(',') print_progress_meme(',')
def mqtt_on_subscribe(client, userdata, mid, granted_qos):
userdata.set()
def infinite_job_generator(input_queue: Queue, target_ready_jobs: int): def infinite_job_generator(input_queue: Queue, target_ready_jobs: int):
i = 0 i = 0
target_ready_jobs_local = target_ready_jobs target_ready_jobs_local = target_ready_jobs
@ -150,14 +180,21 @@ def run_benchmark(
for i in range(num_requests): for i in range(num_requests):
input_queue.put(UploadRequest(i + 1)) input_queue.put(UploadRequest(i + 1))
mqtt_client.user_data_set(returned_queue) print("Waiting for MQTT subscription to complete")
subscribe_complete_event = Event()
mqtt_client.user_data_set(subscribe_complete_event)
mqtt_client.loop_start() # Puts mqtt stuff to a background thread mqtt_client.loop_start() # Puts mqtt stuff to a background thread
subscribe_complete_event.wait()
mqtt_client.user_data_set(returned_queue)
print(f"Preparing {num_workers} workers...") print(f"Preparing {num_workers} workers...")
with open(sound_file_path, "rb") as f: with open(sound_file_path, "rb") as f:
file_contents = f.read() file_contents = f.read()
finished_task_remover = KebabRemover(sent_queue)
for i in range(num_workers): for i in range(num_workers):
sender_workers.append( sender_workers.append(
Process(target=sender_worker, Process(target=sender_worker,
@ -172,6 +209,7 @@ def run_benchmark(
print("Baszatás!") print("Baszatás!")
# Starting all workers # Starting all workers
finished_task_remover.start()
for worker in sender_workers: for worker in sender_workers:
worker.start() worker.start()
@ -188,7 +226,7 @@ def run_benchmark(
except KeyboardInterrupt: except KeyboardInterrupt:
if worker_stopper_timer: if worker_stopper_timer:
worker_stopper_timer.cancel() # We don't want the auto stopper to fire again worker_stopper_timer.cancel() # We don't want the auto stopper to fire again
# Azt jelenti: jovan, megállítottam a küldést, most már csak az elveszett válaszokat várom # Azt jelenti: jovan bátyesz, megállítottam a küldést, most már csak a késő válaszokat várom
print_progress_meme('|') print_progress_meme('|')
# Interrupt forwarded for workers, so we just wait for them to quit # Interrupt forwarded for workers, so we just wait for them to quit
for worker in sender_workers: for worker in sender_workers:
@ -202,20 +240,9 @@ def run_benchmark(
input_generator_thread.active = False input_generator_thread.active = False
# read out all send requests to an array for handling later: # read out all send requests to an array for handling later:
results = {} finished_task_remover.stop()
total_successful_uploads = 0 results = finished_task_remover.get_results()
while True: total_successful_uploads = len([req for req in results.values() if req.upload_status_code == 200])
try:
job = sent_queue.get_nowait()
except Empty:
break
if job.upload_status_code == 200:
total_successful_uploads += 1
results[job.id] = job
# total_successful_uploads = len([req for req in results.values() if req.upload_status_code == 200])
# Wait for inflight messages for a little # Wait for inflight messages for a little
for _ in range(int(inflight_timeout)): for _ in range(int(inflight_timeout)):
@ -223,6 +250,7 @@ def run_benchmark(
if returned_queue.qsize() >= total_successful_uploads: if returned_queue.qsize() >= total_successful_uploads:
break break
mqtt_client.disconnect()
mqtt_client.loop_stop() # This stops further recieving MQTT messages mqtt_client.loop_stop() # This stops further recieving MQTT messages
print() # Everything that might print from other threads are now stopped, so we put a newline print() # Everything that might print from other threads are now stopped, so we put a newline
@ -308,9 +336,10 @@ def main():
client = paho.mqtt.client.Client(client_id="10") client = paho.mqtt.client.Client(client_id="10")
client.on_connect = mqtt_on_connect client.on_connect = mqtt_on_connect
client.on_message = mqtt_on_command client.on_message = mqtt_on_command
client.on_subscribe = mqtt_on_subscribe
if args.mqtt_password: if args.mqtt_password:
client.username_pw_set(args.mqtt_username, args.mqtt_password) client.username_pw_set(args.mqtt_username, args.mqtt_password)
client.connect(args.mqtt_host, args.mqtt_port, 60) client.connect(args.mqtt_host, args.mqtt_port, 60) # this is a blocking call, will wait until connecion is complete
results = run_benchmark(args.requests, args.workers, args.file, args.target_url, args.timeout, results = run_benchmark(args.requests, args.workers, args.file, args.target_url, args.timeout,
args.inflight_timeout, args.request_timeout, client) args.inflight_timeout, args.request_timeout, client)

0
benchmark_simple.py Normal file
View File