#!/usr/bin/env python3
"""
Python module to automatically analyze benchmark results
"""
import csv
import os
import abc
import datetime
import multiprocessing

import matplotlib.pyplot as pplot
import sentry_sdk

sentry_sdk.init("https://c1a84042595b448b94681750523dcc34@sentry.kmlabz.com/20")

# Layout of the timestamp columns read by BirbAnalyzer and InputAnalyzer.
_TIMESTAMP_FORMAT = '%Y-%m-%dT%H:%M:%S.%f'


def average(lst: list) -> float:
    """
    Average of list
    :param lst: non-empty list of numbers
    :return: arithmetic mean of the elements
    :raises ZeroDivisionError: if lst is empty
    """
    return sum(lst) / len(lst)


def keychars(x):
    """
    Return the numeric part of a benchmark file name (name.<number>.csv)
    :param x: file name or path to the file
    :return: the second dot-separated component of the file name as int
    """
    # Only look at the file name itself: the directory part may contain
    # dots too (e.g. the default './dir/name.1.csv').
    return int(os.path.basename(x).split('.')[1])


class Analyzer(abc.ABC):
    """
    Very abstract analyzer
    """

    def __init__(self, typeof='.csv'):
        """
        Init object
        :param typeof: file name suffix of the files this analyzer handles
        """
        self.type = typeof

    def getfiles(self, directory: str = '.') -> list:
        """
        Returns a list of csv files in directory
        :param directory: directory to list
        :return: paths (directory joined with file name) of the entries
                 whose name ends with the configured suffix
        """
        return [
            os.path.join(directory, filename)
            for filename in os.listdir(directory)
            if filename.endswith(self.type)
        ]

    def processfile(
            self,
            fname: str,
            shouldprint: bool = False) -> None:
        """
        Process a single file. Does nothing here; subclasses override it.
        :param fname: path of the file to process
        :param shouldprint: whether to print the per-second values
        :return:
        """

    def processallfiles(self, directory: str = '.') -> None:
        """
        Process all files in a directory
        :param directory: directory holding the benchmark output files
        :return:
        """
        # Files are handled in the order of the number in their name.
        for fname in sorted(self.getfiles(directory), key=keychars):
            self.processfile(fname, False)


class CsvAnalyzer(Analyzer):
    """
    Abstract CSV analyzer
    """

    def __init__(self):
        """
        Init object
        """
        super().__init__()
        # One entry per second that had samples: number of responses ...
        self.responsespersec = []
        # ... and the average of the values recorded for that second.
        self.latencypersec = []

    @staticmethod
    def _readrows(fname):
        """
        Yield every data row of a CSV file as a dict
        :param fname: path of the CSV file; its first row is the header
        :return: generator of {column name: stripped cell value} dicts;
                 yields nothing for a completely empty file
        """
        # newline='' is what the csv module expects from its input file.
        with open(fname, 'r', newline='') as f:
            data = csv.reader(f)
            fields = next(data, None)
            if fields is None:
                return
            for row in data:
                yield {name: value.strip() for name, value in zip(fields, row)}

    def walkresponsepersec(
            self,
            responsepersec: dict,
            shouldprint: bool) -> None:
        """
        Walks through responsepersec dict
        :param responsepersec: maps a second to the list of values
                               recorded in that second
        :param shouldprint: print the count and average of every second
        :return:
        """
        for samples in responsepersec.values():
            if not samples:
                continue
            count = len(samples)
            mean = average(samples)
            self.responsespersec.append(count)
            self.latencypersec.append(mean)
            if shouldprint:
                print(count)
                print(mean)


class HeyAnalyzer(CsvAnalyzer):
    """
    Analyze hey benchmark output.
    """

    def processfile(
            self,
            fname,
            shouldprint: bool = False):
        """
        Process a single file.
        :param fname: path of the CSV file; reads its 'offset' and
                      'response-time' columns
        :param shouldprint: print the per-second values while processing
        :return:
        """
        responsepersec = {}
        for item in self._readrows(fname):
            # The integer part of the offset is the second of the request.
            sec = int(item['offset'].split('.')[0])
            # setdefault keeps the first sample of each second as well.
            responsepersec.setdefault(sec, []).append(
                float(item['response-time']))
        self.walkresponsepersec(responsepersec, shouldprint)


class BirbAnalyzer(CsvAnalyzer):
    """
    Analyze Birbnetes benchmark output.
    """

    def processfile(
            self,
            fname,
            shouldprint: bool = False):
        """
        Process a single file.
        :param fname: path of the CSV file; reads its 'http_start_time'
                      and 'rtt' columns
        :param shouldprint: print the per-second values while processing
        :return:
        """
        responsepersec = {}
        for item in self._readrows(fname):
            # Rows are grouped by the HH:MM:SS of their start time.
            sec = datetime.datetime.strptime(
                item['http_start_time'],
                _TIMESTAMP_FORMAT).strftime("%H:%M:%S")
            # setdefault keeps the first sample of each second as well.
            responsepersec.setdefault(sec, []).append(float(item['rtt']))
        self.walkresponsepersec(responsepersec, shouldprint)


class InputAnalyzer(CsvAnalyzer):
    """
    Analyze InputService benchmark output.
    """

    def processfile(
            self,
            fname,
            shouldprint: bool = False):
        """
        Process a single file.
        :param fname: path of the CSV file; reads its 'http_start_time'
                      and 'http_complete_time' columns
        :param shouldprint: print the per-second values while processing
        :return:
        """
        responsepersec = {}
        for item in self._readrows(fname):
            firetime = datetime.datetime.strptime(
                item['http_start_time'], _TIMESTAMP_FORMAT)
            responsearrivetime = datetime.datetime.strptime(
                item['http_complete_time'], _TIMESTAMP_FORMAT)
            latency = responsearrivetime - firetime
            # Drop the microseconds so that requests fired within the same
            # second share one key; the full timestamp is almost always
            # unique and would never group anything.
            sec = firetime.replace(microsecond=0)
            # Latency is stored in whole milliseconds.
            responsepersec.setdefault(sec, []).append(
                int(latency.total_seconds() * 1000))
        self.walkresponsepersec(responsepersec, shouldprint)


class ChartCreator:
    """
    Create charts automagically
    """

    @staticmethod
    def savetxtplot(csvfile: CsvAnalyzer, directory) -> None:
        """
        Save raw data to txt
        :param csvfile: analyzer holding the per-second values
        :param directory: name used as the prefix of the output files,
                          which go to $TEXTDIR (default: current directory)
        :return:
        """
        textdir = os.getenv('TEXTDIR', default='.')
        with open(os.path.join(textdir, directory + "-rps.txt"), 'w') as f:
            f.writelines(f"{item}\n" for item in csvfile.responsespersec)
        with open(os.path.join(textdir, directory + "-latency.txt"), 'w') as f:
            f.writelines(f"{item}\n" for item in csvfile.latencypersec)

    @staticmethod
    def savecsvplot(csvfile: CsvAnalyzer, directory) -> None:
        """
        Save plot of csv file
        :param csvfile: analyzer holding the per-second values
        :param directory: name used as chart title and as the prefix of
                          the output files, which go to $CHARTDIR
                          (default: current directory)
        :return:
        """
        chartdir = os.getenv('CHARTDIR', default='.')

        pplot.plot(csvfile.responsespersec)
        pplot.title(directory)
        pplot.xlabel("Time (seconds)")
        pplot.ylabel("Response/sec")
        pplot.savefig(os.path.join(chartdir, directory + "-rps.png"))
        pplot.clf()

        pplot.plot(csvfile.latencypersec)
        pplot.title(directory)
        pplot.xlabel("Time (seconds)")
        pplot.ylabel("Response time (milliseconds)")
        pplot.savefig(os.path.join(chartdir, directory + "-latency.png"))
        pplot.clf()

        # min()/max()/average() all fail on an empty list, which happens
        # when the directory contained no usable samples.
        if csvfile.latencypersec:
            print("latency min, max, avg")
            print(min(csvfile.latencypersec))
            print(max(csvfile.latencypersec))
            print(average(csvfile.latencypersec))
        else:
            print("No latency samples for " + directory)
        print("Charted " + directory)

    @staticmethod
    def analyze_hey(abs_directory, directory):
        """
        Analyze hey output
        :param abs_directory: path of the directory holding the CSV files
        :param directory: name used for the chart title and output files
        :return:
        """
        hey = HeyAnalyzer()
        hey.processallfiles(abs_directory)
        ChartCreator.savecsvplot(hey, directory)
        ChartCreator.savetxtplot(hey, directory)

    @staticmethod
    def analyze_birb(abs_directory, directory):
        """
        Analyze BirbBench output
        :param abs_directory: path of the directory holding the CSV files
        :param directory: name used for the chart title and output files
        :return:
        """
        birb = BirbAnalyzer()
        birb.processallfiles(abs_directory)
        ChartCreator.savecsvplot(birb, directory)
        ChartCreator.savetxtplot(birb, directory)

    @staticmethod
    def analyze_input(abs_directory, directory):
        """
        Analyze InputSvc output
        :param abs_directory: path of the directory holding the CSV files
        :param directory: name which, suffixed with "input", is used for
                          the chart title and output files
        :return:
        """
        inputsvc = InputAnalyzer()
        inputsvc.processallfiles(abs_directory)
        ChartCreator.savecsvplot(inputsvc, directory + "input")
        ChartCreator.savetxtplot(inputsvc, directory + "input")

    def doallruns(self):
        """
        Process all directories in repo

        Every direct subdirectory of $SEARCHDIR (default: current
        directory) is analyzed in its own process(es): as hey output when
        its path contains "hey" (case-insensitive), otherwise as both
        Birbnetes and InputService output.
        :return:
        """
        searchdir = os.getenv('SEARCHDIR', default='.')
        # os.walk yields nothing when searchdir cannot be listed; fall
        # back to "no subdirectories" instead of raising StopIteration.
        dirs = next(os.walk(searchdir), (searchdir, [], []))[1]

        jobs = []
        for directory in dirs:
            abs_directory = os.path.join(searchdir, directory)
            print(abs_directory)
            if 'HEY' in abs_directory.upper():
                targets = (ChartCreator.analyze_hey,)
            else:
                targets = (
                    ChartCreator.analyze_birb,
                    ChartCreator.analyze_input,
                )
            for target in targets:
                jobs.append(multiprocessing.Process(
                    target=target, args=(abs_directory, directory)))

        # Run everything in parallel, then wait for all of them to finish.
        for job in jobs:
            job.start()
        for job in jobs:
            job.join()


if __name__ == "__main__":
    # Entry point
    chartcreator = ChartCreator()
    chartcreator.doallruns()