introduce type hinting to consumerlocator
All checks were successful
continuous-integration/drone/push Build is passing

This commit is contained in:
Torma Kristóf 2020-03-30 17:10:48 +02:00
parent 62f059b461
commit 94edf03cf8
Signed by: tormakris
GPG Key ID: DC83C4F2C41B1047
2 changed files with 9 additions and 15 deletions

3
app.py
View File

@ -25,11 +25,12 @@ if __name__ == "__main__":
LOGGER.info("Producer started")
generateduuid = str(uuid)
LOGGER.debug(f"My uuid is {generateduuid}")
consumerlocator = ConsumerLocator()
consumerlocator = ConsumerLocator(communicator=Communicator())
communicator = Communicator(consumerlocator=consumerlocator,uuid=uuid)
messagesender = MessageSender(communicator=communicator)
while True:
LOGGER.info("Sending message to consumer")
messagesender.sendmessage()
time.sleep(random.random())

View File

@ -1,7 +1,7 @@
#!/usr/bin/env python
import datetime
import communicator
from communicator import Communicator
import os
"""
@ -20,22 +20,15 @@ class ConsumerLocator:
Manages the list of consumers.
"""
def __init__(self):
def __init__(self, communicator: Communicator):
"""
Initialize class.
"""
os.environ["KnownConsumer"] = "10.69.42.2" # TODO remove
self.consumerlist = [{"Host": os.environ["KnownConsumer"], "State": True, "LastOk": datetime.datetime.now()}]
self.currentconsumer = self.consumerlist[0]
self.communicator = communicator
def initcommunicator(self, comm: communicator.Communicator):
"""
Initialize the reference to the communicator
:param comm: is the communicator
"""
self.communicator = comm
def learnconsumerlist(self):
def learnconsumerlist(self) -> None:
""""
Learns the list of consumers.
"""
@ -53,7 +46,7 @@ class ConsumerLocator:
self.updateconsumerlist()
def updateconsumerlist(self):
def updateconsumerlist(self) -> None:
"""
Updates the consumer list based on their availability.
"""
@ -93,14 +86,14 @@ class ConsumerLocator:
else:
return None
def getcurrentconsumer(self):
def getcurrentconsumer(self) -> str:
"""
Returns the currently selected consumer.
:return: the current consumer
"""
return self.currentconsumer["Host"]
def checkcurrentconsumer(self):
def checkcurrentconsumer(self) -> bool:
"""
Check the consumers health.
:return: True if OK, False if fail