diff --git a/.drone.yml b/.drone.yml index abe2614..d5ba8fb 100644 --- a/.drone.yml +++ b/.drone.yml @@ -15,26 +15,24 @@ steps: - name: unit_test image: python:3.8 + environment: + PRODUCER_REDIS: cache commands: - pip3 install -r requirements.txt - pytest test.py -- name: build - image: docker:stable-dind - volumes: - - name: dockersock - path: /var/run - environment: - DOCKER_USERNAME: - from_secret: DOCKER_USERNAME - DOCKER_PASSWORD: - from_secret: DOCKER_PASSWORD - commands: - - echo "$DOCKER_PASSWORD" | docker login -u "$DOCKER_USERNAME" --password-stdin - - docker build -t="$DOCKER_USERNAME/producer" . - - docker build -t="$DOCKER_USERNAME/producer:$DRONE_BUILD_NUMBER" . - - docker push "$DOCKER_USERNAME/producer" - - docker push "$DOCKER_USERNAME/producer:$DRONE_BUILD_NUMBER" +- name: build-app + image: banzaicloud/drone-kaniko + settings: + registry: registry.kmlabz.com + repo: goldenpogacsa/${DRONE_REPO_NAME} + username: + from_secret: DOCKER_USERNAME + password: + from_secret: DOCKER_PASSWORD + tags: + - latest + - ${DRONE_BUILD_NUMBER} - name: make_docs image: python:3.8 @@ -56,11 +54,11 @@ steps: from_secret: DOCKER_PASSWORD commands: - cd docs - - echo "$DOCKER_PASSWORD" | docker login -u "$DOCKER_USERNAME" --password-stdin - - docker build -t="$DOCKER_USERNAME/producer-docs" . - - docker build -t="$DOCKER_USERNAME/producer-docs:$DRONE_BUILD_NUMBER" . - - docker push "$DOCKER_USERNAME/producer-docs" - - docker push "$DOCKER_USERNAME/producer-docs:$DRONE_BUILD_NUMBER" + - echo "$DOCKER_PASSWORD" | docker login -u "$DOCKER_USERNAME" --password-stdin registry.kmlabz.com + - docker build -t="registry.kmlabz.com/goldenpogacsa/producer-docs" . + - docker build -t="registry.kmlabz.com/goldenpogacsa/producer-docs:$DRONE_BUILD_NUMBER" . + - docker push "registry.kmlabz.com/goldenpogacsa/producer-docs" + - docker push "registry.kmlabz.com/goldenpogacsa/producer-docs:$DRONE_BUILD_NUMBER" - name: slack image: plugins/slack @@ -79,6 +77,8 @@ services: volumes: - name: dockersock path: /var/run +- name: cache + image: redis volumes: - name: dockersock diff --git a/README.rst b/README.rst index bd2e049..39febce 100644 --- a/README.rst +++ b/README.rst @@ -2,6 +2,13 @@ P2P Producer ============ +This repository contains the Producer part of the project. The module manages the list of consumers and +sends data to the currently active one. If that becomes unavailable, it chooses an other one to the data to. + +Implementation is done in python, the code is put into Docker images (as the consumers). To run the full project +clone the main repository and run +*docker-compose up.* + Produced by GoldenPogácsa Inc. TODO diff --git a/app.py b/app.py index 597ea13..fd50c82 100644 --- a/app.py +++ b/app.py @@ -43,9 +43,9 @@ if __name__ == "__main__": messagesender = MessageSender(communicator=communicator, uuid=generateduuid) consumerlocator = ConsumerLocator(uuid=generateduuid, communicator=communicator, redisconnector=RedisConnector()) - consumerlocator.learnconsumerlist() while True: + consumerlocator.learnconsumerlist() LOGGER.info(f"Updating consumer list of {generateduuid}") consumerlocator.updateconsumer() LOGGER.info("Sending message to consumer") diff --git a/communicator.py b/communicator.py index c3f8298..1c5dc92 100644 --- a/communicator.py +++ b/communicator.py @@ -6,6 +6,7 @@ Communicator module import logging import requests +import requests.exceptions __author__ = "@tormakris" __copyright__ = "Copyright 2020, GoldenPogácsa Team" @@ -15,6 +16,7 @@ __version__text__ = "1" logging.basicConfig(level=logging.INFO) LOGGER = logging.getLogger(__name__) + class Communicator: """ Class handling low level communication with consumers. @@ -38,8 +40,11 @@ class Communicator: """ currentconsumer=self.currenctconsumer LOGGER.info(f"Sending message to {currentconsumer}") - postresponse=requests.post(f'http://{currentconsumer}/log', json={'uuid': self.uuid, 'message': message}) - LOGGER.debug(f"Message status code is:{postresponse.status_code}") + try: + postresponse=requests.post(f'http://{currentconsumer}/log', json={'uuid': self.uuid, 'message': message}, timeout=5) + LOGGER.debug(f"Message status code is:{postresponse.status_code}") + except (requests.exceptions.Timeout, requests.exceptions.ConnectionError) as e: + LOGGER.exception(e) # Fun fact: ez azt jelenti, hogy elveszett az üzenet... ide valami retry kellene inkább más consumerek felé... def discoveravailableconsumers(self) -> list: """Get the list of available consumer from the current primary consumer. Logs the received list. @@ -48,12 +53,13 @@ class Communicator: """ try: currentconsumer = self.currenctconsumer - response = requests.get(f'http://{currentconsumer}/consumers') + response = requests.get(f'http://{currentconsumer}/consumers', timeout=5) json = response.json() LOGGER.info(f"List of currently available consumers: {json}") return json except Exception as e: - LOGGER.exception(e) + LOGGER.error("Could not query available consumer list!") + #LOGGER.exception(e) return [] def isconsumeravailable(self) -> bool: @@ -63,10 +69,10 @@ class Communicator: """ currentconsumer = self.currenctconsumer try: - response = requests.get(f'http://{currentconsumer}/consumers') + response = requests.get(f'http://{currentconsumer}/consumers', timeout=5) isavailable = response.status_code == 200 except Exception as e: - LOGGER.exception(e) + #LOGGER.exception(e) isavailable = False LOGGER.info(f"Current consumer availability: {isavailable}") return isavailable @@ -78,10 +84,10 @@ class Communicator: :return: True if available, False otherwise """ try: - response = requests.get(f'http://{consumer}/consumers') + response = requests.get(f'http://{consumer}/consumers', timeout=5) isavailable = response.status_code == 200 except Exception as e: - LOGGER.exception(e) + #LOGGER.exception(e) isavailable = False LOGGER.info(f"Consumer {consumer} availability: {isavailable}") return isavailable diff --git a/consumerlocator.py b/consumerlocator.py index 4c727e5..57f0c44 100644 --- a/consumerlocator.py +++ b/consumerlocator.py @@ -8,6 +8,7 @@ import datetime from communicator import Communicator import os from redisconnector import RedisConnector +import logging __author__ = "@dscharnitzky" __copyright__ = "Copyright 2020, GoldenPogácsa Team" @@ -15,6 +16,7 @@ __module_name__ = "consumerlocator" __version__text__ = "1" KNOWNCONSUMER = os.getenv("PRODUCER_KNOWNCONSUMER", '10.69.42.1') +LOGGER = logging.getLogger(__name__) class ConsumerLocator: @@ -33,7 +35,7 @@ class ConsumerLocator: :param communicator: the :class:'communicator.Communicator' instance that will be used for the low level communication. """ self.red = redisconnector - self.red.consumerlist = [{"Host": KNOWNCONSUMER, "State": True, "LastOk": datetime.datetime.now()}] + self.red.consumerlist = [{"Host": KNOWNCONSUMER, "State": True, "LastOk": datetime.datetime.now().timestamp()}] self.red.currentconsumer = self.red.consumerlist[0] self.communicator = communicator @@ -46,8 +48,12 @@ class ConsumerLocator: :returns: None """ recievedconsumerlist = self.communicator.discoveravailableconsumers() - if recievedconsumerlist is None: + if not recievedconsumerlist: return + + + consumer_list = self.red.consumerlist + for recconsumer in recievedconsumerlist: contains = False for consumer in self.red.consumerlist: @@ -55,8 +61,10 @@ class ConsumerLocator: contains = True if not contains: - self.red.consumerlist.append({"Host": recconsumer, "State": True, "LastOk": datetime.datetime.now()}) + LOGGER.info(f"Learned about new consumer at {recconsumer}") + consumer_list.append({"Host": recconsumer, "State": True, "LastOk": datetime.datetime.now().timestamp()}) + self.red.consumerlist = consumer_list self.updateconsumerlist() def updateconsumerlist(self) -> None: @@ -68,16 +76,20 @@ class ConsumerLocator: :return: None """ removelist = [] - for consumer in self.red.consumerlist: + consumer_list = self.red.consumerlist + + for consumer in consumer_list: if not self.communicator.checkconsumer(consumer["Host"]): consumer["State"] = False - if datetime.datetime.now() - consumer["LastOk"] > datetime.timedelta(hours=1): + if datetime.datetime.now() - datetime.datetime.fromtimestamp(consumer["LastOk"]) > datetime.timedelta(hours=1): removelist.append(consumer) else: - consumer["LastOk"] = datetime.datetime.now() + consumer["LastOk"] = datetime.datetime.now().timestamp() consumer["State"] = True for rem in removelist: - self.red.consumerlist.remove(rem) + consumer_list.remove(rem) + + self.red.consumerlist = consumer_list def updateconsumer(self): """If the current consumer is not available, checks all the consumers in the list and updates the current one. @@ -100,6 +112,7 @@ class ConsumerLocator: self.red.currentconsumer = newcurrentconsumer if self.red.currentconsumer is not None: + LOGGER.warning(f"Falling back to consumer at {newcurrentconsumer['Host']}") self.learnconsumerlist() if self.red.currentconsumer is not None: diff --git a/redisconnector.py b/redisconnector.py index 30dd958..05d731e 100644 --- a/redisconnector.py +++ b/redisconnector.py @@ -48,7 +48,7 @@ class RedisConnector: Gets currently active consumer. :return: """ - return self.redisconnection.get('currentConsumer') + return json.loads(self.redisconnection.get('currentConsumer')) def set_currentconsumer(self, currentconsumer): """ @@ -56,7 +56,8 @@ class RedisConnector: :param currentconsumer: :return: """ - self.redisconnection.set('currentConsumer', currentconsumer) + json_dict = json.dumps(currentconsumer) + self.redisconnection.set('currentConsumer', json_dict) consumerlist = property(get_consumerlist, set_consumerlist) currentconsumer = property(get_currentconsumer, set_currentconsumer) diff --git a/requirements.txt b/requirements.txt index a714251..0b7e163 100644 --- a/requirements.txt +++ b/requirements.txt @@ -3,4 +3,5 @@ requests pytest pytest-mock pytest-httpserver +pytest-redis redis \ No newline at end of file diff --git a/test.py b/test.py index 0b46052..20ad8c5 100644 --- a/test.py +++ b/test.py @@ -9,7 +9,7 @@ import consumerlocator import communicator import messagesender import redisconnector - +from pytest_redis import factories __author__ = "@tormakris" __copyright__ = "Copyright 2020, GoldenPogácsa Team" @@ -17,6 +17,8 @@ __module_name__ = "test" __version__text__ = "1" generateduuid = 'c959ad81-58f9-4445-aab4-8f3d68aee1ad' +redis_proc = factories.redis_proc(host='cache', port=6379) +redis_db = factories.redisdb('redis_nooproc') def test_generate_string(mocker): @@ -263,10 +265,11 @@ def test_updateconsumer(httpserver): currentconsumer=f"127.0.0.1:{port}", uuid=generateduuid) consumerlocator.KNOWNCONSUMER = f"127.0.0.1:{port}" + redisconn = redisconnector.RedisConnector() locator = consumerlocator.ConsumerLocator( uuid=generateduuid, communicator=comm, - redisconnector=redisconnector.RedisConnector()) - assert locator.currentconsumer is not None + redisconnector=redisconn) + assert redisconn.currentconsumer is not None ret = locator.updateconsumer() assert ret == f"127.0.0.1:{port}"