tried to move to pycurl

This commit is contained in:
Pünkösd Marcell 2020-10-23 00:35:58 +02:00
parent e6fcbc48a1
commit 32f7c29d39

View File

@ -4,6 +4,7 @@ import time
import sys
import json
import requests
import pycurl
import textwrap
from datetime import datetime
from dataclasses import dataclass
@ -64,51 +65,80 @@ def print_progress_meme(char: str):
sys.stdout.flush()
def sender_worker(input_queue, sent_queue, target_url: str, file_contents: bytes, request_timeout: float):
content_len = len(file_contents)
session = requests.Session()
def allout_sender_worker(input_queue, sent_queue, target_url: str, file_contents: bytes, request_timeout: float):
try:
prepared_curl = pycurl.Curl()
prepared_curl.setopt(pycurl.URL, target_url)
prepared_curl.setopt(pycurl.SSL_VERIFYPEER, 0)
prepared_curl.setopt(pycurl.SSL_VERIFYHOST, 0)
prepared_curl.setopt(pycurl.HTTP_VERSION, pycurl.CURL_HTTP_VERSION_2_0)
prepared_curl.setopt(pycurl.WRITEFUNCTION, lambda data: None)
while True:
try:
job = input_queue.get_nowait()
except queue.Empty:
return
files = {
"file": (
f"birbbenchmark2_testfile_{job.id}.wav",
file_contents,
'audio/wave',
{'Content-length': content_len}
),
"description": (
None,
json.dumps({'date': datetime.now().isoformat(), 'device_id': str(job.id)}),
"application/json"
)
}
# SUPER HIGH PRECISION MEASUREMENT COMPARTMENT DEPARTMENT ADMIRAL GENERAL ALADEEN
job.upload_started = time.time()
try:
r = session.post(target_url, files=files, timeout=request_timeout)
job.upload_finished = time.time()
if r.status_code == 200:
print_progress_meme('.')
else:
print_progress_meme('!')
prepared_curl.perform()
job.upload_finished = time.time()
job.upload_status_code = r.status_code
except requests.Timeout:
print_progress_meme('?')
# /'\./'\./'\./'\./'\./'\./'\./'\./'\./'\./'\./'\./'\./'\./'\./'\./'\./'\./'\./'\
status_code = prepared_curl.getinfo(pycurl.HTTP_CODE)
if status_code == 200:
print_progress_meme('.')
else:
print_progress_meme('!')
sent_queue.put(job) # Will have none as status_code and upload_finished if the request timed out
job.upload_status_code = status_code
sent_queue.put(job)
except KeyboardInterrupt:
return
def timeouting_sender_worker(input_queue, sent_queue, target_url: str, file_contents: bytes, request_timeout: float,
timeout: float):
try:
prepared_curl = pycurl.Curl()
prepared_curl.setopt(pycurl.URL, target_url)
prepared_curl.setopt(pycurl.SSL_VERIFYPEER, 0)
prepared_curl.setopt(pycurl.SSL_VERIFYHOST, 0)
prepared_curl.setopt(pycurl.HTTP_VERSION, pycurl.CURL_HTTP_VERSION_2_0)
prepared_curl.setopt(pycurl.WRITEFUNCTION, lambda data: None)
start_time = time.time()
count = 0
while True:
try:
job = input_queue.get_nowait()
except queue.Empty:
return
job.upload_started = time.time()
prepared_curl.perform()
job.upload_finished = time.time()
status_code = prepared_curl.getinfo(pycurl.HTTP_CODE)
if status_code == 200:
print_progress_meme('.')
else:
print_progress_meme('!')
job.upload_status_code = status_code
sent_queue.put(job)
# check for timeout
count += 1
if count % 50 == 0:
if (time.time() - start_time) >= timeout:
return
except KeyboardInterrupt:
return
def mqtt_on_connect(client, userdata, flags, rc):
client.subscribe(f"command/#")
@ -148,13 +178,6 @@ def infinite_job_generator(input_queue, target_ready_jobs: int):
if not threading.current_thread().active:
return
def greacefully_stop_all_workers(list_of_worker_pids: list):
for pid in list_of_worker_pids:
os.kill(pid, signal.SIGINT)
print_progress_meme('#')
def run_benchmark(
num_requests: int,
num_workers: int,
@ -165,7 +188,6 @@ def run_benchmark(
request_timeout: float,
strategy: str,
mqtt_client: paho.mqtt.client.Client):
if strategy == 'thr':
input_queue = queue.Queue()
sent_queue = queue.Queue()
@ -209,14 +231,25 @@ def run_benchmark(
finished_task_remover = KebabRemover(sent_queue)
args = {'target': sender_worker,
'args': (
input_queue,
sent_queue,
target_url,
file_contents,
request_timeout
)}
if timeout == 0:
args = {'target': allout_sender_worker,
'args': (
input_queue,
sent_queue,
target_url,
file_contents,
request_timeout
)}
else:
args = {'target': timeouting_sender_worker,
'args': (
input_queue,
sent_queue,
target_url,
file_contents,
request_timeout,
timeout
)}
for i in range(num_workers):
if strategy == 'thr':
@ -241,12 +274,19 @@ def run_benchmark(
except KeyboardInterrupt:
# Azt jelenti: jovan bátyesz, megállítottam a küldést, most már csak a késő válaszokat várom
print_progress_meme('|')
# Interrupt forwarded for workers, so we just wait for them to quit
for worker in sender_workers:
if worker.is_alive():
exitcode = worker.join(timeout=2)
if not exitcode: # = it did not exit
worker.terminate() # Not the nicest solution
if strategy == 'thr':
# You should not press ctrl-c in thr
for worker in sender_workers:
worker.join()
elif strategy == 'proc':
# Interrupt forwarded for workers in mul mode, so we just wait for them to quit
for worker in sender_workers:
if worker.is_alive():
exitcode = worker.join(timeout=2)
if not exitcode: # = it did not exit
worker.terminate() # Not the nicest solution
# Stop the input generator
if input_generator_thread:
@ -322,6 +362,9 @@ def main():
Send either 100 requests or run for one minute (whichever comes first)
benchmark2.py --requests 100 --timeout 60 --file /home/testerboi/birbstuff/testdata/oIbBz.wav
Note:
Threading strategy does not support Ctrl+C
''')
parser = argparse.ArgumentParser(