diff --git a/app.py b/app.py index 19819f8..27af35f 100644 --- a/app.py +++ b/app.py @@ -10,6 +10,7 @@ import uuid import logging import sentry_sdk import time +from consumerinformation import ConsumerInformation from communicator import Communicator from consumerlocator import ConsumerLocator from messagesender import MessageSender @@ -42,11 +43,14 @@ if __name__ == "__main__": """ LOGGER.info("Producer started") generateduuid = str(uuid.uuid4()) - communicator = Communicator(currentconsumer=KNOWNCONSUMER, uuid=generateduuid) + redisconnector = RedisConnector() + consumerinfomation = ConsumerInformation(redisconnector=redisconnector) + communicator = Communicator(currentconsumer=KNOWNCONSUMER, uuid=generateduuid, + consumerinformation=consumerinfomation) LOGGER.debug(f"My uuid is {generateduuid}") messagesender = MessageSender(communicator=communicator, uuid=generateduuid) - consumerlocator = ConsumerLocator(uuid=generateduuid, communicator=communicator, - redisconnector=RedisConnector()) + consumerlocator = ConsumerLocator(communicator=communicator, + redisconnector=redisconnector) while True: consumerlocator.learnconsumerlist() diff --git a/communicator.py b/communicator.py index 1c5dc92..5468bcd 100644 --- a/communicator.py +++ b/communicator.py @@ -7,6 +7,7 @@ Communicator module import logging import requests import requests.exceptions +from consumerinformation import ConsumerInformation __author__ = "@tormakris" __copyright__ = "Copyright 2020, GoldenPogácsa Team" @@ -22,15 +23,16 @@ class Communicator: Class handling low level communication with consumers. """ - def __init__(self, currentconsumer: str, uuid: str): + def __init__(self, currentconsumer: str, uuid: str, consumerinformation: ConsumerInformation): """**Constructor:** Initializes the object. - :param consumerlocator: the current consumer's IP address as a string + :param currentconsumer: the current consumer's IP address as a string :param uuid: string typed UUID. """ - self.currenctconsumer=currentconsumer + self.currenctconsumer = currentconsumer self.uuid = uuid + self.consumerinformation = consumerinformation def sendmessage(self, message: str) -> None: """Send message to the current consumer. Logs the process. @@ -38,13 +40,18 @@ class Communicator: :param message: the message of type string that will be sent. :return: None """ - currentconsumer=self.currenctconsumer + currentconsumer = self.currenctconsumer LOGGER.info(f"Sending message to {currentconsumer}") - try: - postresponse=requests.post(f'http://{currentconsumer}/log', json={'uuid': self.uuid, 'message': message}, timeout=5) - LOGGER.debug(f"Message status code is:{postresponse.status_code}") - except (requests.exceptions.Timeout, requests.exceptions.ConnectionError) as e: - LOGGER.exception(e) # Fun fact: ez azt jelenti, hogy elveszett az üzenet... ide valami retry kellene inkább más consumerek felé... + + for consumer in self.consumerinformation.getconsumerlist(): + try: + postresponse = requests.post(f'http://{consumer}/log', json={'uuid': self.uuid, 'message': message}, + timeout=5) + LOGGER.debug(f"Message status code is:{postresponse.status_code}") + if postresponse.status_code < 300: + return None + except (requests.exceptions.Timeout, requests.exceptions.ConnectionError) as e: + LOGGER.warning(f"Could not send message to {consumer}") def discoveravailableconsumers(self) -> list: """Get the list of available consumer from the current primary consumer. Logs the received list. @@ -58,8 +65,8 @@ class Communicator: LOGGER.info(f"List of currently available consumers: {json}") return json except Exception as e: - LOGGER.error("Could not query available consumer list!") - #LOGGER.exception(e) + LOGGER.warning("Could not query available consumer list!") + # LOGGER.exception(e) return [] def isconsumeravailable(self) -> bool: @@ -72,7 +79,7 @@ class Communicator: response = requests.get(f'http://{currentconsumer}/consumers', timeout=5) isavailable = response.status_code == 200 except Exception as e: - #LOGGER.exception(e) + # LOGGER.exception(e) isavailable = False LOGGER.info(f"Current consumer availability: {isavailable}") return isavailable @@ -87,15 +94,15 @@ class Communicator: response = requests.get(f'http://{consumer}/consumers', timeout=5) isavailable = response.status_code == 200 except Exception as e: - #LOGGER.exception(e) + # LOGGER.exception(e) isavailable = False LOGGER.info(f"Consumer {consumer} availability: {isavailable}") return isavailable - def set_currentconsumer(self,currenctconsumer) -> None: + def set_currentconsumer(self, currenctconsumer) -> None: """Set current consumer :param currenctconsumer: the consumer's IP address :return: None """ - self.currenctconsumer=currenctconsumer + self.currenctconsumer = currenctconsumer diff --git a/consumerinformation.py b/consumerinformation.py new file mode 100644 index 0000000..e6e69e7 --- /dev/null +++ b/consumerinformation.py @@ -0,0 +1,37 @@ +#!/usr/bin/env python + +""" +Consumer locator module, that manages the list of consumers. +""" + +from redisconnector import RedisConnector + +__author__ = "@ricsik52" +__copyright__ = "Copyright 2020, GoldenPogácsa Team" +__module_name__ = "consumerlocator" +__version__text__ = "1" + + +class ConsumerInformation: + """ + Component responsible for providing high level information about consumers + """ + + def __init__(self, redisconnector: RedisConnector): + """**Constructor:** + Initializes the object. + + :param redisconnector: the :class:'redisconnector.RedisConnector' instance that will be used for Redis connection. + """ + self.red = redisconnector + + def getconsumerlist(self) -> list: + """ + Gets the list of currently available consumers. + :return: + """ + toreturn = [] + for consumer in self.red.consumerlist: + if consumer['State']: + toreturn.append(consumer['Host']) + return toreturn diff --git a/consumerlocator.py b/consumerlocator.py index b741848..1a940b5 100644 --- a/consumerlocator.py +++ b/consumerlocator.py @@ -20,18 +20,16 @@ LOGGER = logging.getLogger(__name__) class ConsumerLocator: - """ Component responsible for managing the list of consumers. Requires an instance of :class:`communicator.Communicator` """ - def __init__(self, uuid: str, communicator: Communicator, redisconnector: RedisConnector): + def __init__(self, communicator: Communicator, redisconnector: RedisConnector): """**Constructor:** Initializes the object. Gets the known consumer's IP address from the PRODUCER_KNOWNCONSUMER envar. - :param uuid: Not used :param communicator: the :class:'communicator.Communicator' instance that will be used for the low level communication. """ self.red = redisconnector @@ -51,7 +49,6 @@ class ConsumerLocator: if not recievedconsumerlist: return - consumer_list = self.red.consumerlist for recconsumer in recievedconsumerlist: @@ -62,7 +59,8 @@ class ConsumerLocator: if not contains: LOGGER.info(f"Learned about new consumer at {recconsumer}") - consumer_list.append({"Host": recconsumer, "State": True, "LastOk": datetime.datetime.now().timestamp()}) + consumer_list.append( + {"Host": recconsumer, "State": True, "LastOk": datetime.datetime.now().timestamp()}) self.red.consumerlist = consumer_list self.updateconsumerlist() @@ -81,7 +79,8 @@ class ConsumerLocator: for consumer in consumer_list: if not self.communicator.checkconsumer(consumer["Host"]): consumer["State"] = False - if datetime.datetime.now() - datetime.datetime.fromtimestamp(consumer["LastOk"]) > datetime.timedelta(hours=1): + if datetime.datetime.now() - datetime.datetime.fromtimestamp(consumer["LastOk"]) > datetime.timedelta( + hours=1): removelist.append(consumer) else: consumer["LastOk"] = datetime.datetime.now().timestamp() diff --git a/integtest.py b/integtest.py index cac20ad..c82b762 100644 --- a/integtest.py +++ b/integtest.py @@ -4,9 +4,8 @@ Integration test for the producer module. """ -import os import re -import datetime +from consumerinformation import ConsumerInformation from communicator import Communicator from consumerlocator import ConsumerLocator from messagesender import MessageSender @@ -23,11 +22,14 @@ redis_proc = factories.redis_proc(host='cache', port=6379) redis_db = factories.redisdb('redis_nooproc') answer = "" + + def answermethod(Request): global answer answer = Request return "" + def test_integration(httpserver): """ Tests the whole system. @@ -45,12 +47,14 @@ def test_integration(httpserver): method='POST').respond_with_handler(answermethod) url = httpserver.url_for("/") port = re.match(r"\W*http[^:]*\D*(\d+)", url).group(1) + redisconnector = RedisConnector() + consumerinfomation = ConsumerInformation(redisconnector=redisconnector) comm = Communicator( currentconsumer=f"127.0.0.1:{port}", - uuid=generateduuid) + uuid=generateduuid, consumerinformation=consumerinfomation) messagesender = MessageSender(communicator=comm, uuid=generateduuid) - consumerlocator = ConsumerLocator(uuid=generateduuid, communicator=comm, - redisconnector=RedisConnector()) + consumerlocator = ConsumerLocator(communicator=comm, + redisconnector=redisconnector) #First test method consumerlocator.learnconsumerlist() conslist = consumerlocator.red.consumerlist diff --git a/test.py b/test.py index 20ad8c5..89cfa40 100644 --- a/test.py +++ b/test.py @@ -9,6 +9,7 @@ import consumerlocator import communicator import messagesender import redisconnector +import consumerinformation from pytest_redis import factories __author__ = "@tormakris" @@ -28,9 +29,11 @@ def test_generate_string(mocker): :param mocker: patches the :class:`communicator.Communicator`. """ mocker.patch('communicator.Communicator') + redisconnect = redisconnector.RedisConnector() + consumerinfo = consumerinformation.ConsumerInformation(redisconnector=redisconnect) comm = communicator.Communicator( currentconsumer="localhost", - uuid=generateduuid) + uuid=generateduuid, consumerinformation=consumerinfo) mess = messagesender.MessageSender(communicator=comm, uuid=generateduuid) msg = mess.randomstring(stringlength=32) assert isinstance(msg, str) @@ -52,9 +55,13 @@ def test_sendmessage(httpserver): "test": "ok"}) url = httpserver.url_for("/") port = re.match(r"\W*http[^:]*\D*(\d+)", url).group(1) + redisconnect = redisconnector.RedisConnector() + consumerinfo = consumerinformation.ConsumerInformation(redisconnector=redisconnect) + redisconnect.currentconsumer = {"Host": f"127.0.0.1:{port}", "State": True, "LastOk": "1589479202"} + redisconnect.consumerlist = [{"Host": f"127.0.0.1:{port}", "State": True, "LastOk": "1589479202"}] comm = communicator.Communicator( currentconsumer=f"127.0.0.1:{port}", - uuid=generateduuid) + uuid=generateduuid, consumerinformation=consumerinfo) mess = "SENDING" ret = comm.sendmessage(message=mess) assert ret is None @@ -68,9 +75,13 @@ def test_send_message(mocker): :return: None """ mocker.patch('communicator.Communicator') + redisconnect = redisconnector.RedisConnector() + consumerinfo = consumerinformation.ConsumerInformation(redisconnector=redisconnect) + redisconnect.currentconsumer = {"Host": f"127.0.0.1", "State": True, "LastOk": "1589479202"} + redisconnect.consumerlist = [{"Host": f"127.0.0.1", "State": True, "LastOk": "1589479202"}] comm = communicator.Communicator( currentconsumer="127.0.0.1", - uuid=generateduuid) + uuid=generateduuid, consumerinformation=consumerinfo) mess = messagesender.MessageSender(communicator=comm, uuid=generateduuid) messa = "SENDING" msg = mess.sendmessage(message=messa) @@ -91,9 +102,13 @@ def test_discoveravailableconsumers(httpserver): ["10.69.42.1", "10.10.10.10", "10.20.30.40"]) url = httpserver.url_for("/") port = re.match(r"\W*http[^:]*\D*(\d+)", url).group(1) + redisconnect = redisconnector.RedisConnector() + consumerinfo = consumerinformation.ConsumerInformation(redisconnector=redisconnect) + redisconnect.currentconsumer = {"Host": f"127.0.0.1:{port}", "State": True, "LastOk": "1589479202"} + redisconnect.consumerlist = [{"Host": f"127.0.0.1:{port}", "State": True, "LastOk": "1589479202"}] comm = communicator.Communicator( currentconsumer=f"127.0.0.1:{port}", - uuid=generateduuid) + uuid=generateduuid, consumerinformation=consumerinfo) ret = comm.discoveravailableconsumers() assert isinstance(ret, list) assert ret == ["10.69.42.1", "10.10.10.10", "10.20.30.40"] @@ -113,9 +128,13 @@ def test_isconsumeravailable(httpserver): ["10.69.42.1", "10.10.10.10", "10.20.30.40"]) url = httpserver.url_for("/") port = re.match(r"\W*http[^:]*\D*(\d+)", url).group(1) + redisconnect = redisconnector.RedisConnector() + consumerinfo = consumerinformation.ConsumerInformation(redisconnector=redisconnect) + redisconnect.currentconsumer = {"Host": f"127.0.0.1:{port}", "State": True, "LastOk": "1589479202"} + redisconnect.consumerlist = [{"Host": f"127.0.0.1:{port}", "State": True, "LastOk": "1589479202"}] comm = communicator.Communicator( currentconsumer=f"127.0.0.1:{port}", - uuid=generateduuid) + uuid=generateduuid, consumerinformation=consumerinfo) ret = comm.isconsumeravailable() assert isinstance(ret, bool) assert ret @@ -126,7 +145,7 @@ def test_isconsumeravailable(httpserver): comm2 = communicator.Communicator( currentconsumer="127.0.0.1:69", - uuid=generateduuid) + uuid=generateduuid, consumerinformation=consumerinfo) ret3 = comm2.isconsumeravailable() assert isinstance(ret3, bool) @@ -147,9 +166,13 @@ def test_checkconsumer(httpserver): ["10.69.42.1", "10.10.10.10", "10.20.30.40"]) url = httpserver.url_for("/") port = re.match(r"\W*http[^:]*\D*(\d+)", url).group(1) + redisconnect = redisconnector.RedisConnector() + consumerinfo = consumerinformation.ConsumerInformation(redisconnector=redisconnect) + redisconnect.currentconsumer = {"Host": f"127.0.0.1:{port}", "State": True, "LastOk": "1589479202"} + redisconnect.consumerlist = [{"Host": f"127.0.0.1:{port}", "State": True, "LastOk": "1589479202"}] comm = communicator.Communicator( currentconsumer="127.0.0.1", - uuid=generateduuid) + uuid=generateduuid, consumerinformation=consumerinfo) ret = comm.checkconsumer(f"127.0.0.1:{port}") assert isinstance(ret, bool) assert ret @@ -160,8 +183,7 @@ def test_checkconsumer(httpserver): comm2 = communicator.Communicator( currentconsumer="127.0.0.1", - uuid=generateduuid) - + uuid=generateduuid, consumerinformation=consumerinfo) ret3 = comm2.checkconsumer(f"127.0.0.1:{port}") assert isinstance(ret3, bool) assert ret3 == False @@ -173,9 +195,11 @@ def test_setcurrentconsumer(): :return: None """ + redisconnect = redisconnector.RedisConnector() + consumerinfo = consumerinformation.ConsumerInformation(redisconnector=redisconnect) comm = communicator.Communicator( currentconsumer="127.0.0.1", - uuid=generateduuid) + uuid=generateduuid, consumerinformation=consumerinfo) comm.set_currentconsumer("10.69.42.1") assert comm.currenctconsumer == "10.69.42.1" @@ -194,12 +218,16 @@ def test_learnconsumerlist(httpserver): ["10.69.42.1", "10.10.10.10", "10.20.30.40"]) url = httpserver.url_for("/") port = re.match(r"\W*http[^:]*\D*(\d+)", url).group(1) + redisconnect = redisconnector.RedisConnector() + consumerinfo = consumerinformation.ConsumerInformation(redisconnector=redisconnect) + redisconnect.currentconsumer = {"Host": f"127.0.0.1:{port}", "State": True, "LastOk": "1589479202"} + redisconnect.consumerlist = [{"Host": f"127.0.0.1:{port}", "State": True, "LastOk": "1589479202"}] comm = communicator.Communicator( currentconsumer=f"127.0.0.1:{port}", - uuid=generateduuid) + uuid=generateduuid, consumerinformation=consumerinfo) consumerlocator.KNOWNCONSUMER = f"127.0.0.1:{port}" locator = consumerlocator.ConsumerLocator( - uuid=generateduuid, communicator=comm, + communicator=comm, redisconnector=redisconnector.RedisConnector()) ret = locator.learnconsumerlist() assert ret is None @@ -213,11 +241,13 @@ def test_getcurrentconsumer(mocker): :return: None """ mocker.patch('communicator.Communicator') + redisconnect = redisconnector.RedisConnector() + consumerinfo = consumerinformation.ConsumerInformation(redisconnector=redisconnect) comm = communicator.Communicator( currentconsumer="127.0.0.1", - uuid=generateduuid) + uuid=generateduuid, consumerinformation=consumerinfo) locator = consumerlocator.ConsumerLocator( - uuid=generateduuid, communicator=comm, + communicator=comm, redisconnector=redisconnector.RedisConnector()) assert locator.getcurrentconsumer() == consumerlocator.KNOWNCONSUMER @@ -236,13 +266,17 @@ def test_checkcurrentconsumer(httpserver): ["10.69.42.1", "10.10.10.10", "10.20.30.40"]) url = httpserver.url_for("/") port = re.match(r"\W*http[^:]*\D*(\d+)", url).group(1) + redisconnect = redisconnector.RedisConnector() + consumerinfo = consumerinformation.ConsumerInformation(redisconnector=redisconnect) comm = communicator.Communicator( currentconsumer=f"127.0.0.1:{port}", - uuid=generateduuid) + uuid=generateduuid, consumerinformation=consumerinfo) consumerlocator.KNOWNCONSUMER = f"127.0.0.1:{port}" locator = consumerlocator.ConsumerLocator( - uuid=generateduuid, communicator=comm, + communicator=comm, redisconnector=redisconnector.RedisConnector()) + redisconnect.currentconsumer = {"Host": f"127.0.0.1:{port}", "State": True, "LastOk": "1589479202"} + redisconnect.consumerlist = [{"Host": f"127.0.0.1:{port}", "State": True, "LastOk": "1589479202"}] ret = locator.checkcurrentconsumer() assert ret == True @@ -261,14 +295,18 @@ def test_updateconsumer(httpserver): ["10.69.42.1", "10.10.10.10", "10.20.30.40"]) url = httpserver.url_for("/") port = re.match(r"\W*http[^:]*\D*(\d+)", url).group(1) + redisconnect = redisconnector.RedisConnector() + consumerinfo = consumerinformation.ConsumerInformation(redisconnector=redisconnect) comm = communicator.Communicator( currentconsumer=f"127.0.0.1:{port}", - uuid=generateduuid) + uuid=generateduuid, consumerinformation=consumerinfo) consumerlocator.KNOWNCONSUMER = f"127.0.0.1:{port}" redisconn = redisconnector.RedisConnector() locator = consumerlocator.ConsumerLocator( - uuid=generateduuid, communicator=comm, + communicator=comm, redisconnector=redisconn) + redisconnect.currentconsumer = {"Host": f"127.0.0.1:{port}", "State": True, "LastOk": "1589479202"} + redisconnect.consumerlist = [{"Host": f"127.0.0.1:{port}", "State": True, "LastOk": "1589479202"}] assert redisconn.currentconsumer is not None ret = locator.updateconsumer() assert ret == f"127.0.0.1:{port}" @@ -288,12 +326,16 @@ def test_updateconsumerlist(httpserver): ["10.69.42.1", "10.10.10.10", "10.20.30.40"]) url = httpserver.url_for("/") port = re.match(r"\W*http[^:]*\D*(\d+)", url).group(1) + redisconnect = redisconnector.RedisConnector() + consumerinfo = consumerinformation.ConsumerInformation(redisconnector=redisconnect) comm = communicator.Communicator( currentconsumer=f"127.0.0.1:{port}", - uuid=generateduuid) + uuid=generateduuid, consumerinformation=consumerinfo) consumerlocator.KNOWNCONSUMER = f"127.0.0.1:{port}" locator = consumerlocator.ConsumerLocator( - uuid=generateduuid, communicator=comm, + communicator=comm, redisconnector=redisconnector.RedisConnector()) + redisconnect.currentconsumer = {"Host": f"127.0.0.1:{port}", "State": True, "LastOk": "1589479202"} + redisconnect.consumerlist = [{"Host": f"127.0.0.1:{port}", "State": True, "LastOk": "1589479202"}] ret = locator.updateconsumerlist() assert ret is None