This commit is contained in:
commit
c5a3fb2d04
38
.drone.yml
38
.drone.yml
@ -15,26 +15,24 @@ steps:
|
||||
|
||||
- name: unit_test
|
||||
image: python:3.8
|
||||
environment:
|
||||
PRODUCER_REDIS: cache
|
||||
commands:
|
||||
- pip3 install -r requirements.txt
|
||||
- pytest test.py
|
||||
|
||||
- name: build
|
||||
image: docker:stable-dind
|
||||
volumes:
|
||||
- name: dockersock
|
||||
path: /var/run
|
||||
environment:
|
||||
DOCKER_USERNAME:
|
||||
- name: build-app
|
||||
image: banzaicloud/drone-kaniko
|
||||
settings:
|
||||
registry: registry.kmlabz.com
|
||||
repo: goldenpogacsa/${DRONE_REPO_NAME}
|
||||
username:
|
||||
from_secret: DOCKER_USERNAME
|
||||
DOCKER_PASSWORD:
|
||||
password:
|
||||
from_secret: DOCKER_PASSWORD
|
||||
commands:
|
||||
- echo "$DOCKER_PASSWORD" | docker login -u "$DOCKER_USERNAME" --password-stdin
|
||||
- docker build -t="$DOCKER_USERNAME/producer" .
|
||||
- docker build -t="$DOCKER_USERNAME/producer:$DRONE_BUILD_NUMBER" .
|
||||
- docker push "$DOCKER_USERNAME/producer"
|
||||
- docker push "$DOCKER_USERNAME/producer:$DRONE_BUILD_NUMBER"
|
||||
tags:
|
||||
- latest
|
||||
- ${DRONE_BUILD_NUMBER}
|
||||
|
||||
- name: make_docs
|
||||
image: python:3.8
|
||||
@ -56,11 +54,11 @@ steps:
|
||||
from_secret: DOCKER_PASSWORD
|
||||
commands:
|
||||
- cd docs
|
||||
- echo "$DOCKER_PASSWORD" | docker login -u "$DOCKER_USERNAME" --password-stdin
|
||||
- docker build -t="$DOCKER_USERNAME/producer-docs" .
|
||||
- docker build -t="$DOCKER_USERNAME/producer-docs:$DRONE_BUILD_NUMBER" .
|
||||
- docker push "$DOCKER_USERNAME/producer-docs"
|
||||
- docker push "$DOCKER_USERNAME/producer-docs:$DRONE_BUILD_NUMBER"
|
||||
- echo "$DOCKER_PASSWORD" | docker login -u "$DOCKER_USERNAME" --password-stdin registry.kmlabz.com
|
||||
- docker build -t="registry.kmlabz.com/goldenpogacsa/producer-docs" .
|
||||
- docker build -t="registry.kmlabz.com/goldenpogacsa/producer-docs:$DRONE_BUILD_NUMBER" .
|
||||
- docker push "registry.kmlabz.com/goldenpogacsa/producer-docs"
|
||||
- docker push "registry.kmlabz.com/goldenpogacsa/producer-docs:$DRONE_BUILD_NUMBER"
|
||||
|
||||
- name: slack
|
||||
image: plugins/slack
|
||||
@ -79,6 +77,8 @@ services:
|
||||
volumes:
|
||||
- name: dockersock
|
||||
path: /var/run
|
||||
- name: cache
|
||||
image: redis
|
||||
|
||||
volumes:
|
||||
- name: dockersock
|
||||
|
@ -2,6 +2,13 @@
|
||||
P2P Producer
|
||||
============
|
||||
|
||||
This repository contains the Producer part of the project. The module manages the list of consumers and
|
||||
sends data to the currently active one. If that becomes unavailable, it chooses an other one to the data to.
|
||||
|
||||
Implementation is done in python, the code is put into Docker images (as the consumers). To run the full project
|
||||
clone the main repository and run
|
||||
*docker-compose up.*
|
||||
|
||||
Produced by GoldenPogácsa Inc.
|
||||
|
||||
TODO
|
||||
|
2
app.py
2
app.py
@ -43,9 +43,9 @@ if __name__ == "__main__":
|
||||
messagesender = MessageSender(communicator=communicator, uuid=generateduuid)
|
||||
consumerlocator = ConsumerLocator(uuid=generateduuid, communicator=communicator,
|
||||
redisconnector=RedisConnector())
|
||||
consumerlocator.learnconsumerlist()
|
||||
|
||||
while True:
|
||||
consumerlocator.learnconsumerlist()
|
||||
LOGGER.info(f"Updating consumer list of {generateduuid}")
|
||||
consumerlocator.updateconsumer()
|
||||
LOGGER.info("Sending message to consumer")
|
||||
|
@ -6,6 +6,7 @@ Communicator module
|
||||
|
||||
import logging
|
||||
import requests
|
||||
import requests.exceptions
|
||||
|
||||
__author__ = "@tormakris"
|
||||
__copyright__ = "Copyright 2020, GoldenPogácsa Team"
|
||||
@ -15,6 +16,7 @@ __version__text__ = "1"
|
||||
logging.basicConfig(level=logging.INFO)
|
||||
LOGGER = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class Communicator:
|
||||
"""
|
||||
Class handling low level communication with consumers.
|
||||
@ -38,8 +40,11 @@ class Communicator:
|
||||
"""
|
||||
currentconsumer=self.currenctconsumer
|
||||
LOGGER.info(f"Sending message to {currentconsumer}")
|
||||
postresponse=requests.post(f'http://{currentconsumer}/log', json={'uuid': self.uuid, 'message': message})
|
||||
try:
|
||||
postresponse=requests.post(f'http://{currentconsumer}/log', json={'uuid': self.uuid, 'message': message}, timeout=5)
|
||||
LOGGER.debug(f"Message status code is:{postresponse.status_code}")
|
||||
except (requests.exceptions.Timeout, requests.exceptions.ConnectionError) as e:
|
||||
LOGGER.exception(e) # Fun fact: ez azt jelenti, hogy elveszett az üzenet... ide valami retry kellene inkább más consumerek felé...
|
||||
|
||||
def discoveravailableconsumers(self) -> list:
|
||||
"""Get the list of available consumer from the current primary consumer. Logs the received list.
|
||||
@ -48,12 +53,13 @@ class Communicator:
|
||||
"""
|
||||
try:
|
||||
currentconsumer = self.currenctconsumer
|
||||
response = requests.get(f'http://{currentconsumer}/consumers')
|
||||
response = requests.get(f'http://{currentconsumer}/consumers', timeout=5)
|
||||
json = response.json()
|
||||
LOGGER.info(f"List of currently available consumers: {json}")
|
||||
return json
|
||||
except Exception as e:
|
||||
LOGGER.exception(e)
|
||||
LOGGER.error("Could not query available consumer list!")
|
||||
#LOGGER.exception(e)
|
||||
return []
|
||||
|
||||
def isconsumeravailable(self) -> bool:
|
||||
@ -63,10 +69,10 @@ class Communicator:
|
||||
"""
|
||||
currentconsumer = self.currenctconsumer
|
||||
try:
|
||||
response = requests.get(f'http://{currentconsumer}/consumers')
|
||||
response = requests.get(f'http://{currentconsumer}/consumers', timeout=5)
|
||||
isavailable = response.status_code == 200
|
||||
except Exception as e:
|
||||
LOGGER.exception(e)
|
||||
#LOGGER.exception(e)
|
||||
isavailable = False
|
||||
LOGGER.info(f"Current consumer availability: {isavailable}")
|
||||
return isavailable
|
||||
@ -78,10 +84,10 @@ class Communicator:
|
||||
:return: True if available, False otherwise
|
||||
"""
|
||||
try:
|
||||
response = requests.get(f'http://{consumer}/consumers')
|
||||
response = requests.get(f'http://{consumer}/consumers', timeout=5)
|
||||
isavailable = response.status_code == 200
|
||||
except Exception as e:
|
||||
LOGGER.exception(e)
|
||||
#LOGGER.exception(e)
|
||||
isavailable = False
|
||||
LOGGER.info(f"Consumer {consumer} availability: {isavailable}")
|
||||
return isavailable
|
||||
|
@ -8,6 +8,7 @@ import datetime
|
||||
from communicator import Communicator
|
||||
import os
|
||||
from redisconnector import RedisConnector
|
||||
import logging
|
||||
|
||||
__author__ = "@dscharnitzky"
|
||||
__copyright__ = "Copyright 2020, GoldenPogácsa Team"
|
||||
@ -15,6 +16,7 @@ __module_name__ = "consumerlocator"
|
||||
__version__text__ = "1"
|
||||
|
||||
KNOWNCONSUMER = os.getenv("PRODUCER_KNOWNCONSUMER", '10.69.42.1')
|
||||
LOGGER = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class ConsumerLocator:
|
||||
@ -33,7 +35,7 @@ class ConsumerLocator:
|
||||
:param communicator: the :class:'communicator.Communicator' instance that will be used for the low level communication.
|
||||
"""
|
||||
self.red = redisconnector
|
||||
self.red.consumerlist = [{"Host": KNOWNCONSUMER, "State": True, "LastOk": datetime.datetime.now()}]
|
||||
self.red.consumerlist = [{"Host": KNOWNCONSUMER, "State": True, "LastOk": datetime.datetime.now().timestamp()}]
|
||||
self.red.currentconsumer = self.red.consumerlist[0]
|
||||
self.communicator = communicator
|
||||
|
||||
@ -46,8 +48,12 @@ class ConsumerLocator:
|
||||
:returns: None
|
||||
"""
|
||||
recievedconsumerlist = self.communicator.discoveravailableconsumers()
|
||||
if recievedconsumerlist is None:
|
||||
if not recievedconsumerlist:
|
||||
return
|
||||
|
||||
|
||||
consumer_list = self.red.consumerlist
|
||||
|
||||
for recconsumer in recievedconsumerlist:
|
||||
contains = False
|
||||
for consumer in self.red.consumerlist:
|
||||
@ -55,8 +61,10 @@ class ConsumerLocator:
|
||||
contains = True
|
||||
|
||||
if not contains:
|
||||
self.red.consumerlist.append({"Host": recconsumer, "State": True, "LastOk": datetime.datetime.now()})
|
||||
LOGGER.info(f"Learned about new consumer at {recconsumer}")
|
||||
consumer_list.append({"Host": recconsumer, "State": True, "LastOk": datetime.datetime.now().timestamp()})
|
||||
|
||||
self.red.consumerlist = consumer_list
|
||||
self.updateconsumerlist()
|
||||
|
||||
def updateconsumerlist(self) -> None:
|
||||
@ -68,16 +76,20 @@ class ConsumerLocator:
|
||||
:return: None
|
||||
"""
|
||||
removelist = []
|
||||
for consumer in self.red.consumerlist:
|
||||
consumer_list = self.red.consumerlist
|
||||
|
||||
for consumer in consumer_list:
|
||||
if not self.communicator.checkconsumer(consumer["Host"]):
|
||||
consumer["State"] = False
|
||||
if datetime.datetime.now() - consumer["LastOk"] > datetime.timedelta(hours=1):
|
||||
if datetime.datetime.now() - datetime.datetime.fromtimestamp(consumer["LastOk"]) > datetime.timedelta(hours=1):
|
||||
removelist.append(consumer)
|
||||
else:
|
||||
consumer["LastOk"] = datetime.datetime.now()
|
||||
consumer["LastOk"] = datetime.datetime.now().timestamp()
|
||||
consumer["State"] = True
|
||||
for rem in removelist:
|
||||
self.red.consumerlist.remove(rem)
|
||||
consumer_list.remove(rem)
|
||||
|
||||
self.red.consumerlist = consumer_list
|
||||
|
||||
def updateconsumer(self):
|
||||
"""If the current consumer is not available, checks all the consumers in the list and updates the current one.
|
||||
@ -100,6 +112,7 @@ class ConsumerLocator:
|
||||
|
||||
self.red.currentconsumer = newcurrentconsumer
|
||||
if self.red.currentconsumer is not None:
|
||||
LOGGER.warning(f"Falling back to consumer at {newcurrentconsumer['Host']}")
|
||||
self.learnconsumerlist()
|
||||
|
||||
if self.red.currentconsumer is not None:
|
||||
|
@ -48,7 +48,7 @@ class RedisConnector:
|
||||
Gets currently active consumer.
|
||||
:return:
|
||||
"""
|
||||
return self.redisconnection.get('currentConsumer')
|
||||
return json.loads(self.redisconnection.get('currentConsumer'))
|
||||
|
||||
def set_currentconsumer(self, currentconsumer):
|
||||
"""
|
||||
@ -56,7 +56,8 @@ class RedisConnector:
|
||||
:param currentconsumer:
|
||||
:return:
|
||||
"""
|
||||
self.redisconnection.set('currentConsumer', currentconsumer)
|
||||
json_dict = json.dumps(currentconsumer)
|
||||
self.redisconnection.set('currentConsumer', json_dict)
|
||||
|
||||
consumerlist = property(get_consumerlist, set_consumerlist)
|
||||
currentconsumer = property(get_currentconsumer, set_currentconsumer)
|
||||
|
@ -3,4 +3,5 @@ requests
|
||||
pytest
|
||||
pytest-mock
|
||||
pytest-httpserver
|
||||
pytest-redis
|
||||
redis
|
9
test.py
9
test.py
@ -9,7 +9,7 @@ import consumerlocator
|
||||
import communicator
|
||||
import messagesender
|
||||
import redisconnector
|
||||
|
||||
from pytest_redis import factories
|
||||
|
||||
__author__ = "@tormakris"
|
||||
__copyright__ = "Copyright 2020, GoldenPogácsa Team"
|
||||
@ -17,6 +17,8 @@ __module_name__ = "test"
|
||||
__version__text__ = "1"
|
||||
|
||||
generateduuid = 'c959ad81-58f9-4445-aab4-8f3d68aee1ad'
|
||||
redis_proc = factories.redis_proc(host='cache', port=6379)
|
||||
redis_db = factories.redisdb('redis_nooproc')
|
||||
|
||||
|
||||
def test_generate_string(mocker):
|
||||
@ -263,10 +265,11 @@ def test_updateconsumer(httpserver):
|
||||
currentconsumer=f"127.0.0.1:{port}",
|
||||
uuid=generateduuid)
|
||||
consumerlocator.KNOWNCONSUMER = f"127.0.0.1:{port}"
|
||||
redisconn = redisconnector.RedisConnector()
|
||||
locator = consumerlocator.ConsumerLocator(
|
||||
uuid=generateduuid, communicator=comm,
|
||||
redisconnector=redisconnector.RedisConnector())
|
||||
assert locator.currentconsumer is not None
|
||||
redisconnector=redisconn)
|
||||
assert redisconn.currentconsumer is not None
|
||||
ret = locator.updateconsumer()
|
||||
assert ret == f"127.0.0.1:{port}"
|
||||
|
||||
|
Reference in New Issue
Block a user