Compare commits

..

45 Commits

Author SHA1 Message Date
489883b464 lowered giveup time
All checks were successful
continuous-integration/drone/push Build is passing
2020-05-14 23:02:52 +02:00
f3e8750ff1 pytest is missing for some reason
All checks were successful
continuous-integration/drone/push Build is passing
2020-05-14 20:50:22 +02:00
c50cd6b23a install requirements in coverage step
All checks were successful
continuous-integration/drone/push Build is passing
2020-05-14 20:48:47 +02:00
32f104df7c coverage
Some checks failed
continuous-integration/drone/push Build is failing
2020-05-14 20:46:34 +02:00
957227b715 Merge branch 'master' of gitea:GoldenPogacsa/producer
All checks were successful
continuous-integration/drone/push Build is passing
2020-05-14 20:09:49 +02:00
6bdcb40220 add consumerinformation 2020-05-14 20:09:41 +02:00
0ab07cb1a2 Update 'README.rst'
All checks were successful
continuous-integration/drone/push Build is passing
2020-05-14 13:43:54 +02:00
20a1aa6dfa Merge remote-tracking branch 'origin/master'
All checks were successful
continuous-integration/drone/push Build is passing
2020-05-13 12:34:46 +02:00
f782d167ef Updated app doc 2020-05-13 12:34:31 +02:00
d82cc3910b use cache at unit_test step
All checks were successful
continuous-integration/drone/push Build is passing
2020-05-08 23:34:06 +02:00
ea7982e0ef fix ci cache
Some checks reported errors
continuous-integration/drone/push Build was killed
2020-05-08 22:22:44 +02:00
926ed41a0f Merge remote-tracking branch 'origin/master'
All checks were successful
continuous-integration/drone/push Build is passing
2020-05-08 20:57:46 +02:00
f015eb2301 Created integration test 2020-05-08 20:57:23 +02:00
4610c7a42f Fixed exist test in consumerlocator 2020-05-08 20:32:32 +02:00
112a7fc8c2 Merge pull request 'dev-ci' (#8) from dev-ci into master
All checks were successful
continuous-integration/drone/push Build is passing
2020-05-08 19:34:11 +02:00
3d4274db86 add cache volume
All checks were successful
continuous-integration/drone/push Build is passing
continuous-integration/drone/pr Build is passing
2020-05-08 19:32:13 +02:00
02e3dc5267 add cache to ci
Some checks failed
continuous-integration/drone/push Build is failing
2020-05-08 19:25:42 +02:00
f898af4558 Init integration test
All checks were successful
continuous-integration/drone/push Build is passing
2020-05-08 19:21:38 +02:00
2cb6ef3a6c Added integration test skeleton
All checks were successful
continuous-integration/drone/push Build is passing
2020-05-08 19:13:00 +02:00
c5a3fb2d04 fix unit tests
All checks were successful
continuous-integration/drone/push Build is passing
2020-04-29 12:31:33 +02:00
22ebf01e20 fix some tests 2020-04-29 12:16:27 +02:00
58c4e296ea use custom dind to build doc container image
All checks were successful
continuous-integration/drone/push Build is passing
2020-04-28 20:53:51 +02:00
d0cdb61916 dockerfile keyword is still needed
Some checks reported errors
continuous-integration/drone/push Build was killed
2020-04-28 20:30:40 +02:00
e8f206728f change context of document building
Some checks failed
continuous-integration/drone/push Build is failing
2020-04-28 20:26:41 +02:00
be8873956b change directory before kaniko step
Some checks failed
continuous-integration/drone/push Build is failing
2020-04-28 20:13:55 +02:00
790cfa3a91 use kaniko to build container images
Some checks failed
continuous-integration/drone/push Build is failing
2020-04-28 19:37:04 +02:00
8a5f9efd08 Added logging about failover
All checks were successful
continuous-integration/drone/push Build is passing
2020-04-22 03:59:34 +02:00
49d686a4ec Fixed stuff probably not being saved 2020-04-22 03:55:33 +02:00
62d4bc48e8 This never returns with None 2020-04-22 03:43:20 +02:00
00300f5b2f If something expected happens than it should not be logged as ERROR 2020-04-22 03:41:35 +02:00
ad44b4f134 Moved learning about new consumers to every cycle
All checks were successful
continuous-integration/drone/push Build is passing
2020-04-22 03:16:28 +02:00
d3656b543f Added timeout for requests
All checks were successful
continuous-integration/drone/push Build is passing
2020-04-22 01:48:29 +02:00
448acd4dd0 dont test
All checks were successful
continuous-integration/drone/push Build is passing
2020-04-21 22:46:49 +02:00
844341fdd0 Fixed test_updateconsumer
Some checks failed
continuous-integration/drone/push Build is failing
2020-04-21 20:30:56 +02:00
010e0d708f Fixed redisconnector's currentconsumer handling 2020-04-21 20:30:03 +02:00
de4c2ca6fb Merge remote-tracking branch 'origin/master'
Some checks failed
continuous-integration/drone/push Build is failing
2020-04-21 19:53:12 +02:00
20e7f2fa05 Change datetime import back 2020-04-21 19:52:55 +02:00
d72f45b4c2 Update 'README.rst'
Some checks failed
continuous-integration/drone/push Build is failing
2020-04-21 19:50:55 +02:00
d69e97ca3b Change datetime import
Some checks failed
continuous-integration/drone/push Build is failing
2020-04-21 19:36:42 +02:00
ed6ed3baf0 Fixed datetime json export
Some checks reported errors
continuous-integration/drone/push Build encountered an error
2020-04-21 19:25:27 +02:00
7e9354be18 Added uuid to MessageSender in tests
Some checks failed
continuous-integration/drone/push Build is failing
2020-04-17 19:25:13 +02:00
f0822580f1 Remove redis_db from test arguments
Some checks reported errors
continuous-integration/drone/push Build encountered an error
2020-04-17 19:17:55 +02:00
215dbdbbf3 Fix redis mock is tests
Some checks failed
continuous-integration/drone/push Build is failing
2020-04-17 19:08:20 +02:00
e65d48b334 Mock redis is tests
Some checks failed
continuous-integration/drone/push Build is failing
2020-04-17 18:57:46 +02:00
2a95eb392a Mock redis is tests 2020-04-17 18:57:32 +02:00
12 changed files with 346 additions and 84 deletions

2
.coveragerc Normal file
View File

@ -0,0 +1,2 @@
[run]
omit=venv/*

View File

@ -3,11 +3,25 @@ type: docker
name: default name: default
steps: steps:
- name: restore-cache-with-filesystem
image: meltwater/drone-cache
settings:
backend: "filesystem"
restore: true
cache_key: "{{ .Repo.Name }}"
archive_format: "gzip"
filesystem_cache_root: "/tmp/cache"
mount:
- '.pipcache'
volumes:
- name: cache
path: /tmp/cache
- name: static_analysis - name: static_analysis
image: python:3.8 image: python:3.8
commands: commands:
- pip3 install pylint bandit mccabe - pip3 install --cache-dir='./.pipcache' pylint bandit mccabe
- pip3 install -r requirements.txt - pip3 install --cache-dir='./.pipcache' -r requirements.txt
- find . -name "*.py" -exec python3 -m py_compile '{}' \; - find . -name "*.py" -exec python3 -m py_compile '{}' \;
- find . -name "*.py" -exec pylint '{}' + || if [ $? -eq 1 ]; then echo "you fail"; fi - find . -name "*.py" -exec pylint '{}' + || if [ $? -eq 1 ]; then echo "you fail"; fi
- find . -name "*.py" -exec python3 -m mccabe --min 3 '{}' + || if [ $? -eq 1 ]; then echo "you fail"; fi - find . -name "*.py" -exec python3 -m mccabe --min 3 '{}' + || if [ $? -eq 1 ]; then echo "you fail"; fi
@ -15,35 +29,66 @@ steps:
- name: unit_test - name: unit_test
image: python:3.8 image: python:3.8
environment:
PRODUCER_REDIS: cache
commands: commands:
- pip3 install -r requirements.txt - pip3 install --cache-dir='./.pipcache' -r requirements.txt
- pytest test.py - pytest test.py
- name: build - name: coverage
image: docker:stable-dind image: python:3.8
volumes:
- name: dockersock
path: /var/run
environment: environment:
DOCKER_USERNAME: PRODUCER_REDIS: cache
from_secret: DOCKER_USERNAME
DOCKER_PASSWORD:
from_secret: DOCKER_PASSWORD
commands: commands:
- echo "$DOCKER_PASSWORD" | docker login -u "$DOCKER_USERNAME" --password-stdin - pip3 install --cache-dir='./.pipcache' -r requirements.txt
- docker build -t="$DOCKER_USERNAME/producer" . - pip3 install --cache-dir='./.pipcache' coverage pytest
- docker build -t="$DOCKER_USERNAME/producer:$DRONE_BUILD_NUMBER" . - coverage run -m pytest test.py
- docker push "$DOCKER_USERNAME/producer" - coverage report -m
- docker push "$DOCKER_USERNAME/producer:$DRONE_BUILD_NUMBER"
- name: integration_test
image: python:3.8
environment:
PRODUCER_REDIS: cache
commands:
- pip3 install --cache-dir='./.pipcache' -r requirements.txt
- pytest integtest.py
- name: build-app
image: banzaicloud/drone-kaniko
settings:
registry: registry.kmlabz.com
repo: goldenpogacsa/${DRONE_REPO_NAME}
username:
from_secret: DOCKER_USERNAME
password:
from_secret: DOCKER_PASSWORD
tags:
- latest
- ${DRONE_BUILD_NUMBER}
- name: make_docs - name: make_docs
image: python:3.8 image: python:3.8
commands: commands:
- pip3 install Sphinx sphinx_rtd_theme - pip3 install --cache-dir='./.pipcache' Sphinx sphinx_rtd_theme
- pip3 install -r requirements.txt - pip3 install --cache-dir='./.pipcache' -r requirements.txt
- cd docs - cd docs
- make html - make html
- name: rebuild-cache-with-filesystem
image: meltwater/drone-cache
pull: true
settings:
backend: "filesystem"
rebuild: true
cache_key: "{{ .Repo.Name }}"
archive_format: "gzip"
filesystem_cache_root: "/tmp/cache"
mount:
- '.pipcache'
volumes:
- name: cache
path: /tmp/cache
- name: build_docs - name: build_docs
image: docker:stable-dind image: docker:stable-dind
volumes: volumes:
@ -56,11 +101,11 @@ steps:
from_secret: DOCKER_PASSWORD from_secret: DOCKER_PASSWORD
commands: commands:
- cd docs - cd docs
- echo "$DOCKER_PASSWORD" | docker login -u "$DOCKER_USERNAME" --password-stdin - echo "$DOCKER_PASSWORD" | docker login -u "$DOCKER_USERNAME" --password-stdin registry.kmlabz.com
- docker build -t="$DOCKER_USERNAME/producer-docs" . - docker build -t="registry.kmlabz.com/goldenpogacsa/producer-docs" .
- docker build -t="$DOCKER_USERNAME/producer-docs:$DRONE_BUILD_NUMBER" . - docker build -t="registry.kmlabz.com/goldenpogacsa/producer-docs:$DRONE_BUILD_NUMBER" .
- docker push "$DOCKER_USERNAME/producer-docs" - docker push "registry.kmlabz.com/goldenpogacsa/producer-docs"
- docker push "$DOCKER_USERNAME/producer-docs:$DRONE_BUILD_NUMBER" - docker push "registry.kmlabz.com/goldenpogacsa/producer-docs:$DRONE_BUILD_NUMBER"
- name: slack - name: slack
image: plugins/slack image: plugins/slack
@ -79,7 +124,12 @@ services:
volumes: volumes:
- name: dockersock - name: dockersock
path: /var/run path: /var/run
- name: cache
image: redis
volumes: volumes:
- name: dockersock - name: dockersock
temp: {} temp: {}
- name: cache
host:
path: "/tmp/cache"

1
.gitignore vendored
View File

@ -131,3 +131,4 @@ dmypy.json
#Pycharm #Pycharm
.idea/ .idea/
*.iml *.iml
.coverage

View File

@ -2,6 +2,11 @@
P2P Producer P2P Producer
============ ============
Produced by GoldenPogácsa Inc. This repository contains the Producer part of the project. The module manages the list of consumers and
sends data to the currently active one. If that becomes unavailable, it chooses an other one to the data to.
TODO Implementation is done in python, the code is put into Docker images (as the consumers). To run the full project
clone the main repository and run
*docker-compose up.*
Produced by GoldenPogácsa Inc.

20
app.py
View File

@ -1,7 +1,7 @@
#!/usr/bin/env python #!/usr/bin/env python
""" """
Main entry point, This module builds the producer from the submodules. Main entry point, this module builds the producer from the submodules.
""" """
import os import os
@ -10,6 +10,7 @@ import uuid
import logging import logging
import sentry_sdk import sentry_sdk
import time import time
from consumerinformation import ConsumerInformation
from communicator import Communicator from communicator import Communicator
from consumerlocator import ConsumerLocator from consumerlocator import ConsumerLocator
from messagesender import MessageSender from messagesender import MessageSender
@ -34,18 +35,25 @@ if __name__ == "__main__":
""" """
This is the producers entry point, initializes all the components (:class:`communicator.Communicator`, This is the producers entry point, initializes all the components (:class:`communicator.Communicator`,
:class:`consumerlocator.ConsumerLocator` and :class:`messagesender.MessageSender`) and sends infinite random :class:`consumerlocator.ConsumerLocator` and :class:`messagesender.MessageSender`) and sends infinite random
messages. messages. Basically this is a big loop, learning about consumers ( :func:`consumerlocator.ConsumerLocator.learnconsumerlist` ),
updating the current ( :func:`consumerlocator.ConsumerLocator.updateconsumer` ) one and sending the message
( :func:`messagesender.MessageSender.sendmessage` ).
If the current consumer is unavailable, the update will change to an available one. To not flood the network with
infinite data, we some random time.
""" """
LOGGER.info("Producer started") LOGGER.info("Producer started")
generateduuid = str(uuid.uuid4()) generateduuid = str(uuid.uuid4())
communicator = Communicator(currentconsumer=KNOWNCONSUMER, uuid=generateduuid) redisconnector = RedisConnector()
consumerinfomation = ConsumerInformation(redisconnector=redisconnector)
communicator = Communicator(currentconsumer=KNOWNCONSUMER, uuid=generateduuid,
consumerinformation=consumerinfomation)
LOGGER.debug(f"My uuid is {generateduuid}") LOGGER.debug(f"My uuid is {generateduuid}")
messagesender = MessageSender(communicator=communicator, uuid=generateduuid) messagesender = MessageSender(communicator=communicator, uuid=generateduuid)
consumerlocator = ConsumerLocator(uuid=generateduuid, communicator=communicator, consumerlocator = ConsumerLocator(communicator=communicator,
redisconnector=RedisConnector()) redisconnector=redisconnector)
consumerlocator.learnconsumerlist()
while True: while True:
consumerlocator.learnconsumerlist()
LOGGER.info(f"Updating consumer list of {generateduuid}") LOGGER.info(f"Updating consumer list of {generateduuid}")
consumerlocator.updateconsumer() consumerlocator.updateconsumer()
LOGGER.info("Sending message to consumer") LOGGER.info("Sending message to consumer")

View File

@ -6,6 +6,8 @@ Communicator module
import logging import logging
import requests import requests
import requests.exceptions
from consumerinformation import ConsumerInformation
__author__ = "@tormakris" __author__ = "@tormakris"
__copyright__ = "Copyright 2020, GoldenPogácsa Team" __copyright__ = "Copyright 2020, GoldenPogácsa Team"
@ -15,20 +17,22 @@ __version__text__ = "1"
logging.basicConfig(level=logging.INFO) logging.basicConfig(level=logging.INFO)
LOGGER = logging.getLogger(__name__) LOGGER = logging.getLogger(__name__)
class Communicator: class Communicator:
""" """
Class handling low level communication with consumers. Class handling low level communication with consumers.
""" """
def __init__(self, currentconsumer: str, uuid: str): def __init__(self, currentconsumer: str, uuid: str, consumerinformation: ConsumerInformation):
"""**Constructor:** """**Constructor:**
Initializes the object. Initializes the object.
:param consumerlocator: the current consumer's IP address as a string :param currentconsumer: the current consumer's IP address as a string
:param uuid: string typed UUID. :param uuid: string typed UUID.
""" """
self.currenctconsumer=currentconsumer self.currenctconsumer = currentconsumer
self.uuid = uuid self.uuid = uuid
self.consumerinformation = consumerinformation
def sendmessage(self, message: str) -> None: def sendmessage(self, message: str) -> None:
"""Send message to the current consumer. Logs the process. """Send message to the current consumer. Logs the process.
@ -36,10 +40,18 @@ class Communicator:
:param message: the message of type string that will be sent. :param message: the message of type string that will be sent.
:return: None :return: None
""" """
currentconsumer=self.currenctconsumer currentconsumer = self.currenctconsumer
LOGGER.info(f"Sending message to {currentconsumer}") LOGGER.info(f"Sending message to {currentconsumer}")
postresponse=requests.post(f'http://{currentconsumer}/log', json={'uuid': self.uuid, 'message': message})
for consumer in self.consumerinformation.getconsumerlist():
try:
postresponse = requests.post(f'http://{consumer}/log', json={'uuid': self.uuid, 'message': message},
timeout=5)
LOGGER.debug(f"Message status code is:{postresponse.status_code}") LOGGER.debug(f"Message status code is:{postresponse.status_code}")
if postresponse.status_code < 300:
return None
except (requests.exceptions.Timeout, requests.exceptions.ConnectionError) as e:
LOGGER.warning(f"Could not send message to {consumer}")
def discoveravailableconsumers(self) -> list: def discoveravailableconsumers(self) -> list:
"""Get the list of available consumer from the current primary consumer. Logs the received list. """Get the list of available consumer from the current primary consumer. Logs the received list.
@ -48,12 +60,13 @@ class Communicator:
""" """
try: try:
currentconsumer = self.currenctconsumer currentconsumer = self.currenctconsumer
response = requests.get(f'http://{currentconsumer}/consumers') response = requests.get(f'http://{currentconsumer}/consumers', timeout=5)
json = response.json() json = response.json()
LOGGER.info(f"List of currently available consumers: {json}") LOGGER.info(f"List of currently available consumers: {json}")
return json return json
except Exception as e: except Exception as e:
LOGGER.exception(e) LOGGER.warning("Could not query available consumer list!")
# LOGGER.exception(e)
return [] return []
def isconsumeravailable(self) -> bool: def isconsumeravailable(self) -> bool:
@ -63,10 +76,10 @@ class Communicator:
""" """
currentconsumer = self.currenctconsumer currentconsumer = self.currenctconsumer
try: try:
response = requests.get(f'http://{currentconsumer}/consumers') response = requests.get(f'http://{currentconsumer}/consumers', timeout=5)
isavailable = response.status_code == 200 isavailable = response.status_code == 200
except Exception as e: except Exception as e:
LOGGER.exception(e) # LOGGER.exception(e)
isavailable = False isavailable = False
LOGGER.info(f"Current consumer availability: {isavailable}") LOGGER.info(f"Current consumer availability: {isavailable}")
return isavailable return isavailable
@ -78,18 +91,18 @@ class Communicator:
:return: True if available, False otherwise :return: True if available, False otherwise
""" """
try: try:
response = requests.get(f'http://{consumer}/consumers') response = requests.get(f'http://{consumer}/consumers', timeout=5)
isavailable = response.status_code == 200 isavailable = response.status_code == 200
except Exception as e: except Exception as e:
LOGGER.exception(e) # LOGGER.exception(e)
isavailable = False isavailable = False
LOGGER.info(f"Consumer {consumer} availability: {isavailable}") LOGGER.info(f"Consumer {consumer} availability: {isavailable}")
return isavailable return isavailable
def set_currentconsumer(self,currenctconsumer) -> None: def set_currentconsumer(self, currenctconsumer) -> None:
"""Set current consumer """Set current consumer
:param currenctconsumer: the consumer's IP address :param currenctconsumer: the consumer's IP address
:return: None :return: None
""" """
self.currenctconsumer=currenctconsumer self.currenctconsumer = currenctconsumer

37
consumerinformation.py Normal file
View File

@ -0,0 +1,37 @@
#!/usr/bin/env python
"""
Consumer locator module, that manages the list of consumers.
"""
from redisconnector import RedisConnector
__author__ = "@ricsik52"
__copyright__ = "Copyright 2020, GoldenPogácsa Team"
__module_name__ = "consumerlocator"
__version__text__ = "1"
class ConsumerInformation:
"""
Component responsible for providing high level information about consumers
"""
def __init__(self, redisconnector: RedisConnector):
"""**Constructor:**
Initializes the object.
:param redisconnector: the :class:'redisconnector.RedisConnector' instance that will be used for Redis connection.
"""
self.red = redisconnector
def getconsumerlist(self) -> list:
"""
Gets the list of currently available consumers.
:return:
"""
toreturn = []
for consumer in self.red.consumerlist:
if consumer['State']:
toreturn.append(consumer['Host'])
return toreturn

View File

@ -8,6 +8,7 @@ import datetime
from communicator import Communicator from communicator import Communicator
import os import os
from redisconnector import RedisConnector from redisconnector import RedisConnector
import logging
__author__ = "@dscharnitzky" __author__ = "@dscharnitzky"
__copyright__ = "Copyright 2020, GoldenPogácsa Team" __copyright__ = "Copyright 2020, GoldenPogácsa Team"
@ -15,25 +16,24 @@ __module_name__ = "consumerlocator"
__version__text__ = "1" __version__text__ = "1"
KNOWNCONSUMER = os.getenv("PRODUCER_KNOWNCONSUMER", '10.69.42.1') KNOWNCONSUMER = os.getenv("PRODUCER_KNOWNCONSUMER", '10.69.42.1')
LOGGER = logging.getLogger(__name__)
class ConsumerLocator: class ConsumerLocator:
""" """
Component responsible for managing the list of consumers. Requires an instance of :class:`communicator.Communicator` Component responsible for managing the list of consumers. Requires an instance of :class:`communicator.Communicator`
""" """
def __init__(self, uuid: str, communicator: Communicator, redisconnector: RedisConnector): def __init__(self, communicator: Communicator, redisconnector: RedisConnector):
"""**Constructor:** """**Constructor:**
Initializes the object. Initializes the object.
Gets the known consumer's IP address from the PRODUCER_KNOWNCONSUMER envar. Gets the known consumer's IP address from the PRODUCER_KNOWNCONSUMER envar.
:param uuid: Not used
:param communicator: the :class:'communicator.Communicator' instance that will be used for the low level communication. :param communicator: the :class:'communicator.Communicator' instance that will be used for the low level communication.
""" """
self.red = redisconnector self.red = redisconnector
self.red.consumerlist = [{"Host": KNOWNCONSUMER, "State": True, "LastOk": datetime.datetime.now()}] self.red.consumerlist = [{"Host": KNOWNCONSUMER, "State": True, "LastOk": datetime.datetime.now().timestamp()}]
self.red.currentconsumer = self.red.consumerlist[0] self.red.currentconsumer = self.red.consumerlist[0]
self.communicator = communicator self.communicator = communicator
@ -46,17 +46,23 @@ class ConsumerLocator:
:returns: None :returns: None
""" """
recievedconsumerlist = self.communicator.discoveravailableconsumers() recievedconsumerlist = self.communicator.discoveravailableconsumers()
if recievedconsumerlist is None: if not recievedconsumerlist:
return return
consumer_list = self.red.consumerlist
for recconsumer in recievedconsumerlist: for recconsumer in recievedconsumerlist:
contains = False contains = False
for consumer in self.red.consumerlist: for consumer in consumer_list:
if consumer["Host"] == recconsumer: if consumer["Host"] == recconsumer:
contains = True contains = True
if not contains: if not contains:
self.red.consumerlist.append({"Host": recconsumer, "State": True, "LastOk": datetime.datetime.now()}) LOGGER.info(f"Learned about new consumer at {recconsumer}")
consumer_list.append(
{"Host": recconsumer, "State": True, "LastOk": datetime.datetime.now().timestamp()})
self.red.consumerlist = consumer_list
self.updateconsumerlist() self.updateconsumerlist()
def updateconsumerlist(self) -> None: def updateconsumerlist(self) -> None:
@ -68,16 +74,21 @@ class ConsumerLocator:
:return: None :return: None
""" """
removelist = [] removelist = []
for consumer in self.red.consumerlist: consumer_list = self.red.consumerlist
for consumer in consumer_list:
if not self.communicator.checkconsumer(consumer["Host"]): if not self.communicator.checkconsumer(consumer["Host"]):
consumer["State"] = False consumer["State"] = False
if datetime.datetime.now() - consumer["LastOk"] > datetime.timedelta(hours=1): if datetime.datetime.now() - datetime.datetime.fromtimestamp(consumer["LastOk"]) > datetime.timedelta(
seconds=15):
removelist.append(consumer) removelist.append(consumer)
else: else:
consumer["LastOk"] = datetime.datetime.now() consumer["LastOk"] = datetime.datetime.now().timestamp()
consumer["State"] = True consumer["State"] = True
for rem in removelist: for rem in removelist:
self.red.consumerlist.remove(rem) consumer_list.remove(rem)
self.red.consumerlist = consumer_list
def updateconsumer(self): def updateconsumer(self):
"""If the current consumer is not available, checks all the consumers in the list and updates the current one. """If the current consumer is not available, checks all the consumers in the list and updates the current one.
@ -100,6 +111,7 @@ class ConsumerLocator:
self.red.currentconsumer = newcurrentconsumer self.red.currentconsumer = newcurrentconsumer
if self.red.currentconsumer is not None: if self.red.currentconsumer is not None:
LOGGER.warning(f"Falling back to consumer at {newcurrentconsumer['Host']}")
self.learnconsumerlist() self.learnconsumerlist()
if self.red.currentconsumer is not None: if self.red.currentconsumer is not None:

87
integtest.py Normal file
View File

@ -0,0 +1,87 @@
#!/usr/bin/env python
"""
Integration test for the producer module.
"""
import re
from consumerinformation import ConsumerInformation
from communicator import Communicator
from consumerlocator import ConsumerLocator
from messagesender import MessageSender
from redisconnector import RedisConnector
from pytest_redis import factories
__author__ = "@dscharnitzky"
__copyright__ = "Copyright 2020, GoldenPogácsa Team"
__module_name__ = "integtest"
__version__text__ = "1"
generateduuid = '2fbff1f2-27e7-44c8-88d9-7348cee8c1c3'
redis_proc = factories.redis_proc(host='cache', port=6379)
redis_db = factories.redisdb('redis_nooproc')
answer = ""
def answermethod(Request):
global answer
answer = Request
return ""
def test_integration(httpserver):
"""
Tests the whole system.
:param httpserver: simple HTTP server
"""
#Inint
httpserver.expect_request(
uri="/consumers",
method='GET',
data="").respond_with_json(
["localhost", "localhost", "1.2.3.4"])
httpserver.expect_request(
uri="/log",
method='POST').respond_with_handler(answermethod)
url = httpserver.url_for("/")
port = re.match(r"\W*http[^:]*\D*(\d+)", url).group(1)
redisconnector = RedisConnector()
consumerinfomation = ConsumerInformation(redisconnector=redisconnector)
comm = Communicator(
currentconsumer=f"127.0.0.1:{port}",
uuid=generateduuid, consumerinformation=consumerinfomation)
messagesender = MessageSender(communicator=comm, uuid=generateduuid)
consumerlocator = ConsumerLocator(communicator=comm,
redisconnector=redisconnector)
#First test method
consumerlocator.learnconsumerlist()
conslist = consumerlocator.red.consumerlist
assert len(conslist) == 3
#Mock in port numbers
consrepllist = []
for consumer in conslist:
newconsumer = {"Host": consumer["Host"]+":"+port, "State": consumer["State"], "LastOk": consumer["LastOk"]}
consrepllist.append(newconsumer)
consumerlocator.red.consumerlist = consrepllist
conslist = consumerlocator.red.consumerlist
#Second test method call
consumerlocator.updateconsumerlist()
conslist = consumerlocator.red.consumerlist
error = False
for consumer in conslist:
if consumer["Host"] == "localhost:"+port and consumer["State"] is True:
pass
elif consumer["Host"] == "1.2.3.4:"+port and consumer["State"] is False:
pass
elif consumer["Host"] == "10.69.42.1:"+port and consumer["State"] is False:
pass
else:
error = True
assert error is False
#Third test method call
consumerlocator.updateconsumer()
assert consumerlocator.red.currentconsumer["Host"] == "localhost:"+port
messagesender.sendmessage()
assert answer != ""

View File

@ -48,7 +48,7 @@ class RedisConnector:
Gets currently active consumer. Gets currently active consumer.
:return: :return:
""" """
return self.redisconnection.get('currentConsumer') return json.loads(self.redisconnection.get('currentConsumer'))
def set_currentconsumer(self, currentconsumer): def set_currentconsumer(self, currentconsumer):
""" """
@ -56,7 +56,8 @@ class RedisConnector:
:param currentconsumer: :param currentconsumer:
:return: :return:
""" """
self.redisconnection.set('currentConsumer', currentconsumer) json_dict = json.dumps(currentconsumer)
self.redisconnection.set('currentConsumer', json_dict)
consumerlist = property(get_consumerlist, set_consumerlist) consumerlist = property(get_consumerlist, set_consumerlist)
currentconsumer = property(get_currentconsumer, set_currentconsumer) currentconsumer = property(get_currentconsumer, set_currentconsumer)

View File

@ -3,4 +3,5 @@ requests
pytest pytest
pytest-mock pytest-mock
pytest-httpserver pytest-httpserver
pytest-redis
redis redis

95
test.py
View File

@ -9,7 +9,8 @@ import consumerlocator
import communicator import communicator
import messagesender import messagesender
import redisconnector import redisconnector
import consumerinformation
from pytest_redis import factories
__author__ = "@tormakris" __author__ = "@tormakris"
__copyright__ = "Copyright 2020, GoldenPogácsa Team" __copyright__ = "Copyright 2020, GoldenPogácsa Team"
@ -17,6 +18,8 @@ __module_name__ = "test"
__version__text__ = "1" __version__text__ = "1"
generateduuid = 'c959ad81-58f9-4445-aab4-8f3d68aee1ad' generateduuid = 'c959ad81-58f9-4445-aab4-8f3d68aee1ad'
redis_proc = factories.redis_proc(host='cache', port=6379)
redis_db = factories.redisdb('redis_nooproc')
def test_generate_string(mocker): def test_generate_string(mocker):
@ -26,10 +29,12 @@ def test_generate_string(mocker):
:param mocker: patches the :class:`communicator.Communicator`. :param mocker: patches the :class:`communicator.Communicator`.
""" """
mocker.patch('communicator.Communicator') mocker.patch('communicator.Communicator')
redisconnect = redisconnector.RedisConnector()
consumerinfo = consumerinformation.ConsumerInformation(redisconnector=redisconnect)
comm = communicator.Communicator( comm = communicator.Communicator(
currentconsumer="localhost", currentconsumer="localhost",
uuid=generateduuid) uuid=generateduuid, consumerinformation=consumerinfo)
mess = messagesender.MessageSender(communicator=comm) mess = messagesender.MessageSender(communicator=comm, uuid=generateduuid)
msg = mess.randomstring(stringlength=32) msg = mess.randomstring(stringlength=32)
assert isinstance(msg, str) assert isinstance(msg, str)
assert len(msg) == 32 assert len(msg) == 32
@ -50,9 +55,13 @@ def test_sendmessage(httpserver):
"test": "ok"}) "test": "ok"})
url = httpserver.url_for("/") url = httpserver.url_for("/")
port = re.match(r"\W*http[^:]*\D*(\d+)", url).group(1) port = re.match(r"\W*http[^:]*\D*(\d+)", url).group(1)
redisconnect = redisconnector.RedisConnector()
consumerinfo = consumerinformation.ConsumerInformation(redisconnector=redisconnect)
redisconnect.currentconsumer = {"Host": f"127.0.0.1:{port}", "State": True, "LastOk": "1589479202"}
redisconnect.consumerlist = [{"Host": f"127.0.0.1:{port}", "State": True, "LastOk": "1589479202"}]
comm = communicator.Communicator( comm = communicator.Communicator(
currentconsumer=f"127.0.0.1:{port}", currentconsumer=f"127.0.0.1:{port}",
uuid=generateduuid) uuid=generateduuid, consumerinformation=consumerinfo)
mess = "SENDING" mess = "SENDING"
ret = comm.sendmessage(message=mess) ret = comm.sendmessage(message=mess)
assert ret is None assert ret is None
@ -66,10 +75,14 @@ def test_send_message(mocker):
:return: None :return: None
""" """
mocker.patch('communicator.Communicator') mocker.patch('communicator.Communicator')
redisconnect = redisconnector.RedisConnector()
consumerinfo = consumerinformation.ConsumerInformation(redisconnector=redisconnect)
redisconnect.currentconsumer = {"Host": f"127.0.0.1", "State": True, "LastOk": "1589479202"}
redisconnect.consumerlist = [{"Host": f"127.0.0.1", "State": True, "LastOk": "1589479202"}]
comm = communicator.Communicator( comm = communicator.Communicator(
currentconsumer="127.0.0.1", currentconsumer="127.0.0.1",
uuid=generateduuid) uuid=generateduuid, consumerinformation=consumerinfo)
mess = messagesender.MessageSender(communicator=comm) mess = messagesender.MessageSender(communicator=comm, uuid=generateduuid)
messa = "SENDING" messa = "SENDING"
msg = mess.sendmessage(message=messa) msg = mess.sendmessage(message=messa)
assert msg is None assert msg is None
@ -89,9 +102,13 @@ def test_discoveravailableconsumers(httpserver):
["10.69.42.1", "10.10.10.10", "10.20.30.40"]) ["10.69.42.1", "10.10.10.10", "10.20.30.40"])
url = httpserver.url_for("/") url = httpserver.url_for("/")
port = re.match(r"\W*http[^:]*\D*(\d+)", url).group(1) port = re.match(r"\W*http[^:]*\D*(\d+)", url).group(1)
redisconnect = redisconnector.RedisConnector()
consumerinfo = consumerinformation.ConsumerInformation(redisconnector=redisconnect)
redisconnect.currentconsumer = {"Host": f"127.0.0.1:{port}", "State": True, "LastOk": "1589479202"}
redisconnect.consumerlist = [{"Host": f"127.0.0.1:{port}", "State": True, "LastOk": "1589479202"}]
comm = communicator.Communicator( comm = communicator.Communicator(
currentconsumer=f"127.0.0.1:{port}", currentconsumer=f"127.0.0.1:{port}",
uuid=generateduuid) uuid=generateduuid, consumerinformation=consumerinfo)
ret = comm.discoveravailableconsumers() ret = comm.discoveravailableconsumers()
assert isinstance(ret, list) assert isinstance(ret, list)
assert ret == ["10.69.42.1", "10.10.10.10", "10.20.30.40"] assert ret == ["10.69.42.1", "10.10.10.10", "10.20.30.40"]
@ -111,9 +128,13 @@ def test_isconsumeravailable(httpserver):
["10.69.42.1", "10.10.10.10", "10.20.30.40"]) ["10.69.42.1", "10.10.10.10", "10.20.30.40"])
url = httpserver.url_for("/") url = httpserver.url_for("/")
port = re.match(r"\W*http[^:]*\D*(\d+)", url).group(1) port = re.match(r"\W*http[^:]*\D*(\d+)", url).group(1)
redisconnect = redisconnector.RedisConnector()
consumerinfo = consumerinformation.ConsumerInformation(redisconnector=redisconnect)
redisconnect.currentconsumer = {"Host": f"127.0.0.1:{port}", "State": True, "LastOk": "1589479202"}
redisconnect.consumerlist = [{"Host": f"127.0.0.1:{port}", "State": True, "LastOk": "1589479202"}]
comm = communicator.Communicator( comm = communicator.Communicator(
currentconsumer=f"127.0.0.1:{port}", currentconsumer=f"127.0.0.1:{port}",
uuid=generateduuid) uuid=generateduuid, consumerinformation=consumerinfo)
ret = comm.isconsumeravailable() ret = comm.isconsumeravailable()
assert isinstance(ret, bool) assert isinstance(ret, bool)
assert ret assert ret
@ -124,7 +145,7 @@ def test_isconsumeravailable(httpserver):
comm2 = communicator.Communicator( comm2 = communicator.Communicator(
currentconsumer="127.0.0.1:69", currentconsumer="127.0.0.1:69",
uuid=generateduuid) uuid=generateduuid, consumerinformation=consumerinfo)
ret3 = comm2.isconsumeravailable() ret3 = comm2.isconsumeravailable()
assert isinstance(ret3, bool) assert isinstance(ret3, bool)
@ -145,9 +166,13 @@ def test_checkconsumer(httpserver):
["10.69.42.1", "10.10.10.10", "10.20.30.40"]) ["10.69.42.1", "10.10.10.10", "10.20.30.40"])
url = httpserver.url_for("/") url = httpserver.url_for("/")
port = re.match(r"\W*http[^:]*\D*(\d+)", url).group(1) port = re.match(r"\W*http[^:]*\D*(\d+)", url).group(1)
redisconnect = redisconnector.RedisConnector()
consumerinfo = consumerinformation.ConsumerInformation(redisconnector=redisconnect)
redisconnect.currentconsumer = {"Host": f"127.0.0.1:{port}", "State": True, "LastOk": "1589479202"}
redisconnect.consumerlist = [{"Host": f"127.0.0.1:{port}", "State": True, "LastOk": "1589479202"}]
comm = communicator.Communicator( comm = communicator.Communicator(
currentconsumer="127.0.0.1", currentconsumer="127.0.0.1",
uuid=generateduuid) uuid=generateduuid, consumerinformation=consumerinfo)
ret = comm.checkconsumer(f"127.0.0.1:{port}") ret = comm.checkconsumer(f"127.0.0.1:{port}")
assert isinstance(ret, bool) assert isinstance(ret, bool)
assert ret assert ret
@ -158,8 +183,7 @@ def test_checkconsumer(httpserver):
comm2 = communicator.Communicator( comm2 = communicator.Communicator(
currentconsumer="127.0.0.1", currentconsumer="127.0.0.1",
uuid=generateduuid) uuid=generateduuid, consumerinformation=consumerinfo)
ret3 = comm2.checkconsumer(f"127.0.0.1:{port}") ret3 = comm2.checkconsumer(f"127.0.0.1:{port}")
assert isinstance(ret3, bool) assert isinstance(ret3, bool)
assert ret3 == False assert ret3 == False
@ -171,9 +195,11 @@ def test_setcurrentconsumer():
:return: None :return: None
""" """
redisconnect = redisconnector.RedisConnector()
consumerinfo = consumerinformation.ConsumerInformation(redisconnector=redisconnect)
comm = communicator.Communicator( comm = communicator.Communicator(
currentconsumer="127.0.0.1", currentconsumer="127.0.0.1",
uuid=generateduuid) uuid=generateduuid, consumerinformation=consumerinfo)
comm.set_currentconsumer("10.69.42.1") comm.set_currentconsumer("10.69.42.1")
assert comm.currenctconsumer == "10.69.42.1" assert comm.currenctconsumer == "10.69.42.1"
@ -192,12 +218,16 @@ def test_learnconsumerlist(httpserver):
["10.69.42.1", "10.10.10.10", "10.20.30.40"]) ["10.69.42.1", "10.10.10.10", "10.20.30.40"])
url = httpserver.url_for("/") url = httpserver.url_for("/")
port = re.match(r"\W*http[^:]*\D*(\d+)", url).group(1) port = re.match(r"\W*http[^:]*\D*(\d+)", url).group(1)
redisconnect = redisconnector.RedisConnector()
consumerinfo = consumerinformation.ConsumerInformation(redisconnector=redisconnect)
redisconnect.currentconsumer = {"Host": f"127.0.0.1:{port}", "State": True, "LastOk": "1589479202"}
redisconnect.consumerlist = [{"Host": f"127.0.0.1:{port}", "State": True, "LastOk": "1589479202"}]
comm = communicator.Communicator( comm = communicator.Communicator(
currentconsumer=f"127.0.0.1:{port}", currentconsumer=f"127.0.0.1:{port}",
uuid=generateduuid) uuid=generateduuid, consumerinformation=consumerinfo)
consumerlocator.KNOWNCONSUMER = f"127.0.0.1:{port}" consumerlocator.KNOWNCONSUMER = f"127.0.0.1:{port}"
locator = consumerlocator.ConsumerLocator( locator = consumerlocator.ConsumerLocator(
uuid=generateduuid, communicator=comm, communicator=comm,
redisconnector=redisconnector.RedisConnector()) redisconnector=redisconnector.RedisConnector())
ret = locator.learnconsumerlist() ret = locator.learnconsumerlist()
assert ret is None assert ret is None
@ -211,11 +241,13 @@ def test_getcurrentconsumer(mocker):
:return: None :return: None
""" """
mocker.patch('communicator.Communicator') mocker.patch('communicator.Communicator')
redisconnect = redisconnector.RedisConnector()
consumerinfo = consumerinformation.ConsumerInformation(redisconnector=redisconnect)
comm = communicator.Communicator( comm = communicator.Communicator(
currentconsumer="127.0.0.1", currentconsumer="127.0.0.1",
uuid=generateduuid) uuid=generateduuid, consumerinformation=consumerinfo)
locator = consumerlocator.ConsumerLocator( locator = consumerlocator.ConsumerLocator(
uuid=generateduuid, communicator=comm, communicator=comm,
redisconnector=redisconnector.RedisConnector()) redisconnector=redisconnector.RedisConnector())
assert locator.getcurrentconsumer() == consumerlocator.KNOWNCONSUMER assert locator.getcurrentconsumer() == consumerlocator.KNOWNCONSUMER
@ -234,13 +266,17 @@ def test_checkcurrentconsumer(httpserver):
["10.69.42.1", "10.10.10.10", "10.20.30.40"]) ["10.69.42.1", "10.10.10.10", "10.20.30.40"])
url = httpserver.url_for("/") url = httpserver.url_for("/")
port = re.match(r"\W*http[^:]*\D*(\d+)", url).group(1) port = re.match(r"\W*http[^:]*\D*(\d+)", url).group(1)
redisconnect = redisconnector.RedisConnector()
consumerinfo = consumerinformation.ConsumerInformation(redisconnector=redisconnect)
comm = communicator.Communicator( comm = communicator.Communicator(
currentconsumer=f"127.0.0.1:{port}", currentconsumer=f"127.0.0.1:{port}",
uuid=generateduuid) uuid=generateduuid, consumerinformation=consumerinfo)
consumerlocator.KNOWNCONSUMER = f"127.0.0.1:{port}" consumerlocator.KNOWNCONSUMER = f"127.0.0.1:{port}"
locator = consumerlocator.ConsumerLocator( locator = consumerlocator.ConsumerLocator(
uuid=generateduuid, communicator=comm, communicator=comm,
redisconnector=redisconnector.RedisConnector()) redisconnector=redisconnector.RedisConnector())
redisconnect.currentconsumer = {"Host": f"127.0.0.1:{port}", "State": True, "LastOk": "1589479202"}
redisconnect.consumerlist = [{"Host": f"127.0.0.1:{port}", "State": True, "LastOk": "1589479202"}]
ret = locator.checkcurrentconsumer() ret = locator.checkcurrentconsumer()
assert ret == True assert ret == True
@ -259,14 +295,19 @@ def test_updateconsumer(httpserver):
["10.69.42.1", "10.10.10.10", "10.20.30.40"]) ["10.69.42.1", "10.10.10.10", "10.20.30.40"])
url = httpserver.url_for("/") url = httpserver.url_for("/")
port = re.match(r"\W*http[^:]*\D*(\d+)", url).group(1) port = re.match(r"\W*http[^:]*\D*(\d+)", url).group(1)
redisconnect = redisconnector.RedisConnector()
consumerinfo = consumerinformation.ConsumerInformation(redisconnector=redisconnect)
comm = communicator.Communicator( comm = communicator.Communicator(
currentconsumer=f"127.0.0.1:{port}", currentconsumer=f"127.0.0.1:{port}",
uuid=generateduuid) uuid=generateduuid, consumerinformation=consumerinfo)
consumerlocator.KNOWNCONSUMER = f"127.0.0.1:{port}" consumerlocator.KNOWNCONSUMER = f"127.0.0.1:{port}"
redisconn = redisconnector.RedisConnector()
locator = consumerlocator.ConsumerLocator( locator = consumerlocator.ConsumerLocator(
uuid=generateduuid, communicator=comm, communicator=comm,
redisconnector=redisconnector.RedisConnector()) redisconnector=redisconn)
assert locator.currentconsumer is not None redisconnect.currentconsumer = {"Host": f"127.0.0.1:{port}", "State": True, "LastOk": "1589479202"}
redisconnect.consumerlist = [{"Host": f"127.0.0.1:{port}", "State": True, "LastOk": "1589479202"}]
assert redisconn.currentconsumer is not None
ret = locator.updateconsumer() ret = locator.updateconsumer()
assert ret == f"127.0.0.1:{port}" assert ret == f"127.0.0.1:{port}"
@ -285,12 +326,16 @@ def test_updateconsumerlist(httpserver):
["10.69.42.1", "10.10.10.10", "10.20.30.40"]) ["10.69.42.1", "10.10.10.10", "10.20.30.40"])
url = httpserver.url_for("/") url = httpserver.url_for("/")
port = re.match(r"\W*http[^:]*\D*(\d+)", url).group(1) port = re.match(r"\W*http[^:]*\D*(\d+)", url).group(1)
redisconnect = redisconnector.RedisConnector()
consumerinfo = consumerinformation.ConsumerInformation(redisconnector=redisconnect)
comm = communicator.Communicator( comm = communicator.Communicator(
currentconsumer=f"127.0.0.1:{port}", currentconsumer=f"127.0.0.1:{port}",
uuid=generateduuid) uuid=generateduuid, consumerinformation=consumerinfo)
consumerlocator.KNOWNCONSUMER = f"127.0.0.1:{port}" consumerlocator.KNOWNCONSUMER = f"127.0.0.1:{port}"
locator = consumerlocator.ConsumerLocator( locator = consumerlocator.ConsumerLocator(
uuid=generateduuid, communicator=comm, communicator=comm,
redisconnector=redisconnector.RedisConnector()) redisconnector=redisconnector.RedisConnector())
redisconnect.currentconsumer = {"Host": f"127.0.0.1:{port}", "State": True, "LastOk": "1589479202"}
redisconnect.consumerlist = [{"Host": f"127.0.0.1:{port}", "State": True, "LastOk": "1589479202"}]
ret = locator.updateconsumerlist() ret = locator.updateconsumerlist()
assert ret is None assert ret is None