This repository has been archived on 2020-09-24. You can view files and clone it, but cannot push or open issues or pull requests.
producer/consumerlocator.py

138 lines
5.1 KiB
Python

#!/usr/bin/env python
"""
Consumer locator module, that manages the list of consumers.
"""
import datetime
import logging
import os
from typing import Optional

from communicator import Communicator
from redisconnector import RedisConnector
# Module metadata.
__author__ = "@dscharnitzky"
__copyright__ = "Copyright 2020, GoldenPogácsa Team"
__module_name__ = "consumerlocator"
__version__text__ = "1"

# Address of the consumer that is known at startup. It is read from the PRODUCER_KNOWNCONSUMER
# environment variable and falls back to the literal below when the variable is not set.
KNOWNCONSUMER = os.environ.get("PRODUCER_KNOWNCONSUMER", "10.69.42.1")

# Logger of this module, named after the module itself.
LOGGER = logging.getLogger(__name__)
class ConsumerLocator:
    """
    Component responsible for managing the list of consumers.

    Requires an instance of :class:`communicator.Communicator` for the low level communication and a
    :class:`redisconnector.RedisConnector` whose ``consumerlist`` and ``currentconsumer`` attributes hold
    the state managed by this class.

    Every consumer is represented by a dict with three keys: ``"Host"`` (the address of the consumer),
    ``"State"`` (result of the last availability check) and ``"LastOk"`` (POSIX timestamp of the last
    successful availability check).
    """

    #: Number of seconds an unavailable consumer is kept in the list after its last successful check.
    CONSUMER_TIMEOUT_SECONDS = 15

    def __init__(self, communicator: "Communicator", redisconnector: "RedisConnector"):
        """**Constructor:**
        Initializes the object.

        Gets the known consumer's IP address from the PRODUCER_KNOWNCONSUMER envar, stores it as the only
        element of the consumer list and selects it as the current consumer.

        :param communicator: the :class:`communicator.Communicator` instance that will be used for the low
            level communication.
        :param redisconnector: the :class:`redisconnector.RedisConnector` instance that stores the consumer
            list and the current consumer.
        """
        self.red = redisconnector
        self.red.consumerlist = [
            {"Host": KNOWNCONSUMER, "State": True, "LastOk": datetime.datetime.now().timestamp()}
        ]
        self.red.currentconsumer = self.red.consumerlist[0]
        self.communicator = communicator

    def learnconsumerlist(self) -> None:
        """Learns the list of consumers from the current consumer.

        Calls :func:`communicator.Communicator.discoveravailableconsumers`, adds the learned consumers to the
        list if they are not present yet, and then calls
        :func:`consumerlocator.ConsumerLocator.updateconsumerlist`.
        Does nothing when the discovery returns nothing.

        :returns: None
        """
        received_hosts = self.communicator.discoveravailableconsumers()
        if not received_hosts:
            return

        consumer_list = self.red.consumerlist
        for host in received_hosts:
            # The list being extended is the one that is checked, so a host that is repeated in the
            # received list is only added once.
            if any(consumer["Host"] == host for consumer in consumer_list):
                continue
            LOGGER.info(f"Learned about new consumer at {host}")
            consumer_list.append(
                {"Host": host, "State": True, "LastOk": datetime.datetime.now().timestamp()}
            )

        self.red.consumerlist = consumer_list
        self.updateconsumerlist()

    def updateconsumerlist(self) -> None:
        """Updates the consumer list based on their availability.

        Marks for each consumer if it is available or not. If a consumer has not been available for more
        than :attr:`CONSUMER_TIMEOUT_SECONDS` seconds, then it is deleted from the list.

        :return: None
        """
        kept = []
        for consumer in self.red.consumerlist:
            available = self.communicator.checkconsumer(consumer["Host"])
            # Taken after the check, so the time spent waiting for the consumer is accounted for.
            now = datetime.datetime.now().timestamp()
            if available:
                consumer["LastOk"] = now
                consumer["State"] = True
            else:
                consumer["State"] = False
                # POSIX timestamps are compared directly: subtracting naive local datetimes would be off
                # by the UTC offset change whenever a daylight saving transition falls in between.
                if now - consumer["LastOk"] > self.CONSUMER_TIMEOUT_SECONDS:
                    continue
            kept.append(consumer)

        self.red.consumerlist = kept

    def updateconsumer(self) -> Optional[str]:
        """If the current consumer is not available, checks all the consumers in the list and updates the
        current one.

        Calls :func:`consumerlocator.ConsumerLocator.checkcurrentconsumer` and if needed
        :func:`consumerlocator.ConsumerLocator.updateconsumerlist`; the first consumer of the list that is
        available becomes the current one. After a successful fallback the consumer list is learned again
        with :func:`consumerlocator.ConsumerLocator.learnconsumerlist`. Sets the
        :class:`communicator.Communicator` current instance with
        :func:`communicator.Communicator.set_currentconsumer`.

        :return: the host of the current consumer or None if there are no available consumers at the moment.
        """
        if not self.checkcurrentconsumer():
            self.updateconsumerlist()
            fallback = next(
                (consumer for consumer in self.red.consumerlist if consumer["State"]), None
            )
            self.red.currentconsumer = fallback

            if fallback is not None:
                LOGGER.warning(f"Falling back to consumer at {fallback['Host']}")
                self.learnconsumerlist()

        current = self.red.currentconsumer
        if current is None:
            return None

        self.communicator.set_currentconsumer(current["Host"])
        return current["Host"]

    def getcurrentconsumer(self) -> Optional[str]:
        """Returns the currently selected consumer's IP address.

        :return: the host of the current consumer, or None if no consumer is selected at the moment
        """
        current = self.red.currentconsumer
        # updateconsumer() stores None when no consumer is available; that must not raise here.
        if current is None:
            return None
        return current["Host"]

    def checkcurrentconsumer(self) -> bool:
        """Check the current consumer's health.

        :return: True if OK, False if fail or if there is no current consumer
        """
        # Read once, so that the None check and the lookup work on the same value.
        current = self.red.currentconsumer
        if current is None:
            return False
        return self.communicator.checkconsumer(current["Host"])