From 32f7c29d39c0082f080625637ed7fdb2192273d5 Mon Sep 17 00:00:00 2001 From: marcsello Date: Fri, 23 Oct 2020 00:35:58 +0200 Subject: [PATCH] tried to move to pycurl --- benchmark2.py | 147 ++++++++++++++++++++++++++++++++------------------ 1 file changed, 95 insertions(+), 52 deletions(-) diff --git a/benchmark2.py b/benchmark2.py index b8254a9..707dafe 100644 --- a/benchmark2.py +++ b/benchmark2.py @@ -4,6 +4,7 @@ import time import sys import json import requests +import pycurl import textwrap from datetime import datetime from dataclasses import dataclass @@ -64,51 +65,80 @@ def print_progress_meme(char: str): sys.stdout.flush() -def sender_worker(input_queue, sent_queue, target_url: str, file_contents: bytes, request_timeout: float): - content_len = len(file_contents) - session = requests.Session() +def allout_sender_worker(input_queue, sent_queue, target_url: str, file_contents: bytes, request_timeout: float): try: + prepared_curl = pycurl.Curl() + prepared_curl.setopt(pycurl.URL, target_url) + prepared_curl.setopt(pycurl.SSL_VERIFYPEER, 0) + prepared_curl.setopt(pycurl.SSL_VERIFYHOST, 0) + prepared_curl.setopt(pycurl.HTTP_VERSION, pycurl.CURL_HTTP_VERSION_2_0) + prepared_curl.setopt(pycurl.WRITEFUNCTION, lambda data: None) + while True: try: job = input_queue.get_nowait() except queue.Empty: return - files = { - "file": ( - f"birbbenchmark2_testfile_{job.id}.wav", - file_contents, - 'audio/wave', - {'Content-length': content_len} - ), - "description": ( - None, - json.dumps({'date': datetime.now().isoformat(), 'device_id': str(job.id)}), - "application/json" - ) - } - - # SUPER HIGH PRECISION MEASUREMENT COMPARTMENT DEPARTMENT ADMIRAL GENERAL ALADEEN job.upload_started = time.time() - try: - r = session.post(target_url, files=files, timeout=request_timeout) - job.upload_finished = time.time() - if r.status_code == 200: - print_progress_meme('.') - else: - print_progress_meme('!') + prepared_curl.perform() + job.upload_finished = time.time() - job.upload_status_code = r.status_code - except requests.Timeout: - print_progress_meme('?') - # /'\./'\./'\./'\./'\./'\./'\./'\./'\./'\./'\./'\./'\./'\./'\./'\./'\./'\./'\./'\ + status_code = prepared_curl.getinfo(pycurl.HTTP_CODE) + if status_code == 200: + print_progress_meme('.') + else: + print_progress_meme('!') - sent_queue.put(job) # Will have none as status_code and upload_finished if the request timed out + job.upload_status_code = status_code + + sent_queue.put(job) except KeyboardInterrupt: return +def timeouting_sender_worker(input_queue, sent_queue, target_url: str, file_contents: bytes, request_timeout: float, + timeout: float): + try: + prepared_curl = pycurl.Curl() + prepared_curl.setopt(pycurl.URL, target_url) + prepared_curl.setopt(pycurl.SSL_VERIFYPEER, 0) + prepared_curl.setopt(pycurl.SSL_VERIFYHOST, 0) + prepared_curl.setopt(pycurl.HTTP_VERSION, pycurl.CURL_HTTP_VERSION_2_0) + prepared_curl.setopt(pycurl.WRITEFUNCTION, lambda data: None) + + start_time = time.time() + count = 0 + while True: + try: + job = input_queue.get_nowait() + except queue.Empty: + return + + job.upload_started = time.time() + prepared_curl.perform() + job.upload_finished = time.time() + + status_code = prepared_curl.getinfo(pycurl.HTTP_CODE) + if status_code == 200: + print_progress_meme('.') + else: + print_progress_meme('!') + + job.upload_status_code = status_code + + sent_queue.put(job) + + # check for timeout + count += 1 + if count % 50 == 0: + if (time.time() - start_time) >= timeout: + return + + except KeyboardInterrupt: + return + def mqtt_on_connect(client, userdata, flags, rc): client.subscribe(f"command/#") @@ -148,13 +178,6 @@ def infinite_job_generator(input_queue, target_ready_jobs: int): if not threading.current_thread().active: return - -def greacefully_stop_all_workers(list_of_worker_pids: list): - for pid in list_of_worker_pids: - os.kill(pid, signal.SIGINT) - print_progress_meme('#') - - def run_benchmark( num_requests: int, num_workers: int, @@ -165,7 +188,6 @@ def run_benchmark( request_timeout: float, strategy: str, mqtt_client: paho.mqtt.client.Client): - if strategy == 'thr': input_queue = queue.Queue() sent_queue = queue.Queue() @@ -209,14 +231,25 @@ def run_benchmark( finished_task_remover = KebabRemover(sent_queue) - args = {'target': sender_worker, - 'args': ( - input_queue, - sent_queue, - target_url, - file_contents, - request_timeout - )} + if timeout == 0: + args = {'target': allout_sender_worker, + 'args': ( + input_queue, + sent_queue, + target_url, + file_contents, + request_timeout + )} + else: + args = {'target': timeouting_sender_worker, + 'args': ( + input_queue, + sent_queue, + target_url, + file_contents, + request_timeout, + timeout + )} for i in range(num_workers): if strategy == 'thr': @@ -241,12 +274,19 @@ def run_benchmark( except KeyboardInterrupt: # Azt jelenti: jovan bátyesz, megállítottam a küldést, most már csak a késő válaszokat várom print_progress_meme('|') - # Interrupt forwarded for workers, so we just wait for them to quit - for worker in sender_workers: - if worker.is_alive(): - exitcode = worker.join(timeout=2) - if not exitcode: # = it did not exit - worker.terminate() # Not the nicest solution + + if strategy == 'thr': + # You should not press ctrl-c in thr + for worker in sender_workers: + worker.join() + + elif strategy == 'proc': + # Interrupt forwarded for workers in mul mode, so we just wait for them to quit + for worker in sender_workers: + if worker.is_alive(): + exitcode = worker.join(timeout=2) + if not exitcode: # = it did not exit + worker.terminate() # Not the nicest solution # Stop the input generator if input_generator_thread: @@ -322,6 +362,9 @@ def main(): Send either 100 requests or run for one minute (whichever comes first) benchmark2.py --requests 100 --timeout 60 --file /home/testerboi/birbstuff/testdata/oIbBz.wav + + Note: + Threading strategy does not support Ctrl+C ''') parser = argparse.ArgumentParser(