This repository has been archived on 2020-09-24. You can view files and clone it, but cannot push or open issues or pull requests.
producer/test.py

342 lines
13 KiB
Python

#!/usr/bin/env python
"""
Unit tests for the producer module.
"""
import re
from urllib.parse import urlsplit

from pytest_redis import factories

import communicator
import consumerinformation
import consumerlocator
import messagesender
import redisconnector
# Module metadata.
__author__ = "@tormakris"
__copyright__ = "Copyright 2020, GoldenPogácsa Team"
__module_name__ = "test"
__version__text__ = "1"
# UUID passed to every Communicator / MessageSender built by the tests below;
# the same value appears in the request body expected by test_sendmessage.
generateduuid = 'c959ad81-58f9-4445-aab4-8f3d68aee1ad'
# Fixtures created through the pytest-redis factories.
# NOTE(review): none of the tests visible in this file request `redis_proc`
# or `redis_db` by name — confirm whether they are still needed.
redis_proc = factories.redis_proc(host='cache', port=6379)
redis_db = factories.redisdb('redis_nooproc')
def test_generate_string(mocker):
    """
    Tests :func:`messagesender.MessageSender.randomstring`.

    :class:`communicator.Communicator` is patched before it is instantiated,
    so the sender under test is built around a mock communicator.

    :param mocker: patches the :class:`communicator.Communicator`.
    :return: None
    """
    wanted_length = 32
    mocker.patch('communicator.Communicator')
    connector = redisconnector.RedisConnector()
    info = consumerinformation.ConsumerInformation(redisconnector=connector)
    mocked_comm = communicator.Communicator(
        currentconsumer="localhost",
        uuid=generateduuid,
        consumerinformation=info,
    )
    sender = messagesender.MessageSender(
        communicator=mocked_comm,
        uuid=generateduuid,
    )
    generated = sender.randomstring(stringlength=wanted_length)
    # The result must be a string of exactly the requested length.
    assert isinstance(generated, str)
    assert len(generated) == wanted_length
def test_sendmessage(httpserver):
    """
    Tests :func:`communicator.Communicator.sendmessage`.

    Registers a one-shot ``POST /log`` expectation on the local HTTP server,
    points a communicator at that server and checks that sending a message
    returns ``None``.

    :param httpserver: simple HTTP server
    :return: None
    """
    httpserver.expect_oneshot_request(
        uri="/log",
        method='POST',
        data="{\"uuid\": \"c959ad81-58f9-4445-aab4-8f3d68aee1ad\", \"message\": \"SENDING\"}").respond_with_json(
        {
            "test": "ok"})
    # Let the URL parser extract the port. A digit-matching regex picks up the
    # first run of digits after the scheme, which is an address octet rather
    # than the port whenever the server URL carries a numeric host.
    port = urlsplit(httpserver.url_for("/")).port
    consumer = f"127.0.0.1:{port}"
    redisconnect = redisconnector.RedisConnector()
    consumerinfo = consumerinformation.ConsumerInformation(redisconnector=redisconnect)
    redisconnect.currentconsumer = {"Host": consumer, "State": True, "LastOk": "1589479202"}
    redisconnect.consumerlist = [{"Host": consumer, "State": True, "LastOk": "1589479202"}]
    comm = communicator.Communicator(
        currentconsumer=consumer,
        uuid=generateduuid, consumerinformation=consumerinfo)
    mess = "SENDING"
    ret = comm.sendmessage(message=mess)
    assert ret is None
def test_send_message(mocker):
    """
    Tests :func:`messagesender.MessageSender.sendmessage`.

    :class:`communicator.Communicator` is patched before it is instantiated,
    so the sender under test talks to a mock communicator.

    :param mocker: patches the :class:`communicator.Communicator`.
    :return: None
    """
    mocker.patch('communicator.Communicator')
    redisconnect = redisconnector.RedisConnector()
    consumerinfo = consumerinformation.ConsumerInformation(redisconnector=redisconnect)
    # Plain string literals: nothing is interpolated into the host here.
    redisconnect.currentconsumer = {"Host": "127.0.0.1", "State": True, "LastOk": "1589479202"}
    redisconnect.consumerlist = [{"Host": "127.0.0.1", "State": True, "LastOk": "1589479202"}]
    comm = communicator.Communicator(
        currentconsumer="127.0.0.1",
        uuid=generateduuid, consumerinformation=consumerinfo)
    mess = messagesender.MessageSender(communicator=comm, uuid=generateduuid)
    messa = "SENDING"
    msg = mess.sendmessage(message=messa)
    assert msg is None
def test_discoveravailableconsumers(httpserver):
    """
    Tests :func:`communicator.Communicator.discoveravailableconsumers`

    The local HTTP server answers one ``GET /consumers`` request with a JSON
    list of three addresses; the communicator is expected to return that
    same list.

    :param httpserver: simple HTTP server
    :return: None
    """
    httpserver.expect_oneshot_request(
        uri="/consumers",
        method='GET',
        data="").respond_with_json(
        ["10.69.42.1", "10.10.10.10", "10.20.30.40"])
    # Let the URL parser extract the port. A digit-matching regex picks up the
    # first run of digits after the scheme, which is an address octet rather
    # than the port whenever the server URL carries a numeric host.
    port = urlsplit(httpserver.url_for("/")).port
    consumer = f"127.0.0.1:{port}"
    redisconnect = redisconnector.RedisConnector()
    consumerinfo = consumerinformation.ConsumerInformation(redisconnector=redisconnect)
    redisconnect.currentconsumer = {"Host": consumer, "State": True, "LastOk": "1589479202"}
    redisconnect.consumerlist = [{"Host": consumer, "State": True, "LastOk": "1589479202"}]
    comm = communicator.Communicator(
        currentconsumer=consumer,
        uuid=generateduuid, consumerinformation=consumerinfo)
    ret = comm.discoveravailableconsumers()
    assert isinstance(ret, list)
    assert ret == ["10.69.42.1", "10.10.10.10", "10.20.30.40"]
def test_isconsumeravailable(httpserver):
    """
    Tests :func:`communicator.Communicator.isconsumeravailable`.

    Three calls are checked: the first against the local HTTP server with a
    one-shot ``GET /consumers`` expectation (expected ``True``), a second
    identical call (expected ``False``), and one through a communicator
    pointed at ``127.0.0.1:69`` (expected ``False``).

    :param httpserver: simple HTTP server
    :return: None
    """
    httpserver.expect_oneshot_request(
        uri="/consumers",
        method='GET',
        data="").respond_with_json(
        ["10.69.42.1", "10.10.10.10", "10.20.30.40"])
    # Let the URL parser extract the port. A digit-matching regex picks up the
    # first run of digits after the scheme, which is an address octet rather
    # than the port whenever the server URL carries a numeric host.
    port = urlsplit(httpserver.url_for("/")).port
    consumer = f"127.0.0.1:{port}"
    redisconnect = redisconnector.RedisConnector()
    consumerinfo = consumerinformation.ConsumerInformation(redisconnector=redisconnect)
    redisconnect.currentconsumer = {"Host": consumer, "State": True, "LastOk": "1589479202"}
    redisconnect.consumerlist = [{"Host": consumer, "State": True, "LastOk": "1589479202"}]
    comm = communicator.Communicator(
        currentconsumer=consumer,
        uuid=generateduuid, consumerinformation=consumerinfo)
    ret = comm.isconsumeravailable()
    assert isinstance(ret, bool)
    assert ret
    # Only one request was registered (one-shot), so a repeated call is
    # expected to report the consumer as unavailable.
    ret2 = comm.isconsumeravailable()
    assert isinstance(ret2, bool)
    assert not ret2
    # presumably nothing listens on 127.0.0.1:69 in the test environment
    comm2 = communicator.Communicator(
        currentconsumer="127.0.0.1:69",
        uuid=generateduuid, consumerinformation=consumerinfo)
    ret3 = comm2.isconsumeravailable()
    assert isinstance(ret3, bool)
    assert not ret3
def test_checkconsumer(httpserver):
    """
    Tests :func:`communicator.Communicator.checkconsumer`.

    The same consumer address is checked three times (twice through one
    communicator, once through a second one). Only the first check is
    expected to succeed, since a single one-shot ``GET /consumers``
    expectation is registered on the local HTTP server.

    :param httpserver: simple HTTP server
    :return: None
    """
    httpserver.expect_oneshot_request(
        uri="/consumers",
        method='GET',
        data="").respond_with_json(
        ["10.69.42.1", "10.10.10.10", "10.20.30.40"])
    # Let the URL parser extract the port. A digit-matching regex picks up the
    # first run of digits after the scheme, which is an address octet rather
    # than the port whenever the server URL carries a numeric host.
    port = urlsplit(httpserver.url_for("/")).port
    consumer = f"127.0.0.1:{port}"
    redisconnect = redisconnector.RedisConnector()
    consumerinfo = consumerinformation.ConsumerInformation(redisconnector=redisconnect)
    redisconnect.currentconsumer = {"Host": consumer, "State": True, "LastOk": "1589479202"}
    redisconnect.consumerlist = [{"Host": consumer, "State": True, "LastOk": "1589479202"}]
    comm = communicator.Communicator(
        currentconsumer="127.0.0.1",
        uuid=generateduuid, consumerinformation=consumerinfo)
    ret = comm.checkconsumer(consumer)
    assert isinstance(ret, bool)
    assert ret
    # The one-shot expectation has been used by the call above.
    ret2 = comm.checkconsumer(consumer)
    assert isinstance(ret2, bool)
    assert not ret2
    comm2 = communicator.Communicator(
        currentconsumer="127.0.0.1",
        uuid=generateduuid, consumerinformation=consumerinfo)
    ret3 = comm2.checkconsumer(consumer)
    assert isinstance(ret3, bool)
    assert not ret3
def test_setcurrentconsumer():
    """
    Tests :func:`communicator.Communicator.set_currentconsumer`

    Builds a communicator for ``127.0.0.1``, switches it to another address
    and checks that the new address is stored on the instance.

    :return: None
    """
    new_consumer = "10.69.42.1"
    connector = redisconnector.RedisConnector()
    info = consumerinformation.ConsumerInformation(redisconnector=connector)
    under_test = communicator.Communicator(
        currentconsumer="127.0.0.1",
        uuid=generateduuid,
        consumerinformation=info,
    )
    under_test.set_currentconsumer(new_consumer)
    # NOTE(review): the attribute is spelled ``currenctconsumer``; presumably
    # this matches the spelling used inside communicator.Communicator, so it
    # must not be "corrected" in this test alone.
    assert under_test.currenctconsumer == new_consumer
def test_learnconsumerlist(httpserver):
    """
    Tests :func:`consumerlocator.ConsumerLocator.learnconsumerlist`

    The local HTTP server answers ``GET /consumers`` (not one-shot) with a
    JSON list of three addresses; the call is expected to return ``None``.

    :param httpserver: simple HTTP server
    :return: None
    """
    httpserver.expect_request(
        uri="/consumers",
        method='GET',
        data="").respond_with_json(
        ["10.69.42.1", "10.10.10.10", "10.20.30.40"])
    # Let the URL parser extract the port. A digit-matching regex picks up the
    # first run of digits after the scheme, which is an address octet rather
    # than the port whenever the server URL carries a numeric host.
    port = urlsplit(httpserver.url_for("/")).port
    consumer = f"127.0.0.1:{port}"
    redisconnect = redisconnector.RedisConnector()
    consumerinfo = consumerinformation.ConsumerInformation(redisconnector=redisconnect)
    redisconnect.currentconsumer = {"Host": consumer, "State": True, "LastOk": "1589479202"}
    redisconnect.consumerlist = [{"Host": consumer, "State": True, "LastOk": "1589479202"}]
    comm = communicator.Communicator(
        currentconsumer=consumer,
        uuid=generateduuid, consumerinformation=consumerinfo)
    # NOTE(review): this rebinds a module-level name and nothing in this test
    # restores it afterwards.
    consumerlocator.KNOWNCONSUMER = consumer
    locator = consumerlocator.ConsumerLocator(
        communicator=comm,
        redisconnector=redisconnector.RedisConnector())
    ret = locator.learnconsumerlist()
    assert ret is None
def test_getcurrentconsumer(mocker):
    """
    Tests :func:`consumerlocator.ConsumerLocator.getcurrentconsumer`

    With :class:`communicator.Communicator` patched, a freshly built locator
    is expected to report ``consumerlocator.KNOWNCONSUMER`` as its current
    consumer.

    :param mocker: patches the :class:`communicator.Communicator`.
    :return: None
    """
    mocker.patch('communicator.Communicator')
    info_connector = redisconnector.RedisConnector()
    info = consumerinformation.ConsumerInformation(redisconnector=info_connector)
    mocked_comm = communicator.Communicator(
        currentconsumer="127.0.0.1",
        uuid=generateduuid,
        consumerinformation=info,
    )
    # The locator receives its own connector instance.
    locator_connector = redisconnector.RedisConnector()
    locator = consumerlocator.ConsumerLocator(
        communicator=mocked_comm,
        redisconnector=locator_connector,
    )
    current = locator.getcurrentconsumer()
    assert current == consumerlocator.KNOWNCONSUMER
def test_checkcurrentconsumer(httpserver):
    """
    Tests :func:`consumerlocator.ConsumerLocator.checkcurrentconsumer`

    The local HTTP server answers one ``GET /consumers`` request and is
    registered as the known/current consumer; the check is expected to
    return ``True``.

    :param httpserver: simple HTTP server
    :return: None
    """
    httpserver.expect_oneshot_request(
        uri="/consumers",
        method='GET',
        data="").respond_with_json(
        ["10.69.42.1", "10.10.10.10", "10.20.30.40"])
    # Let the URL parser extract the port. A digit-matching regex picks up the
    # first run of digits after the scheme, which is an address octet rather
    # than the port whenever the server URL carries a numeric host.
    port = urlsplit(httpserver.url_for("/")).port
    consumer = f"127.0.0.1:{port}"
    redisconnect = redisconnector.RedisConnector()
    consumerinfo = consumerinformation.ConsumerInformation(redisconnector=redisconnect)
    comm = communicator.Communicator(
        currentconsumer=consumer,
        uuid=generateduuid, consumerinformation=consumerinfo)
    # NOTE(review): this rebinds a module-level name and nothing in this test
    # restores it afterwards.
    consumerlocator.KNOWNCONSUMER = consumer
    locator = consumerlocator.ConsumerLocator(
        communicator=comm,
        redisconnector=redisconnector.RedisConnector())
    redisconnect.currentconsumer = {"Host": consumer, "State": True, "LastOk": "1589479202"}
    redisconnect.consumerlist = [{"Host": consumer, "State": True, "LastOk": "1589479202"}]
    ret = locator.checkcurrentconsumer()
    # Identity check instead of ``== True`` so that only a real boolean True
    # satisfies the assertion.
    assert ret is True
def test_updateconsumer(httpserver):
    """
    Tests :func:`consumerlocator.ConsumerLocator.updateconsumer`

    The local HTTP server answers one ``GET /consumers`` request and is
    registered as the known/current consumer; ``updateconsumer`` is expected
    to return that consumer's ``host:port`` string.

    :param httpserver: simple HTTP server
    :return: None
    """
    httpserver.expect_oneshot_request(
        uri="/consumers",
        method='GET',
        data="").respond_with_json(
        ["10.69.42.1", "10.10.10.10", "10.20.30.40"])
    # Let the URL parser extract the port. A digit-matching regex picks up the
    # first run of digits after the scheme, which is an address octet rather
    # than the port whenever the server URL carries a numeric host.
    port = urlsplit(httpserver.url_for("/")).port
    consumer = f"127.0.0.1:{port}"
    redisconnect = redisconnector.RedisConnector()
    consumerinfo = consumerinformation.ConsumerInformation(redisconnector=redisconnect)
    comm = communicator.Communicator(
        currentconsumer=consumer,
        uuid=generateduuid, consumerinformation=consumerinfo)
    # NOTE(review): this rebinds a module-level name and nothing in this test
    # restores it afterwards.
    consumerlocator.KNOWNCONSUMER = consumer
    redisconn = redisconnector.RedisConnector()
    locator = consumerlocator.ConsumerLocator(
        communicator=comm,
        redisconnector=redisconn)
    redisconnect.currentconsumer = {"Host": consumer, "State": True, "LastOk": "1589479202"}
    redisconnect.consumerlist = [{"Host": consumer, "State": True, "LastOk": "1589479202"}]
    # The value was written through `redisconnect` but is read back through
    # the second connector, `redisconn` — presumably both share one backing
    # store; verify against redisconnector.RedisConnector.
    assert redisconn.currentconsumer is not None
    ret = locator.updateconsumer()
    assert ret == consumer
def test_updateconsumerlist(httpserver):
    """
    Tests :func:`consumerlocator.ConsumerLocator.updateconsumerlist`

    The local HTTP server answers one ``GET /consumers`` request and is
    registered as the known/current consumer; the call is expected to
    return ``None``.

    :param httpserver: simple HTTP server
    :return: None
    """
    httpserver.expect_oneshot_request(
        uri="/consumers",
        method='GET',
        data="").respond_with_json(
        ["10.69.42.1", "10.10.10.10", "10.20.30.40"])
    # Let the URL parser extract the port. A digit-matching regex picks up the
    # first run of digits after the scheme, which is an address octet rather
    # than the port whenever the server URL carries a numeric host.
    port = urlsplit(httpserver.url_for("/")).port
    consumer = f"127.0.0.1:{port}"
    redisconnect = redisconnector.RedisConnector()
    consumerinfo = consumerinformation.ConsumerInformation(redisconnector=redisconnect)
    comm = communicator.Communicator(
        currentconsumer=consumer,
        uuid=generateduuid, consumerinformation=consumerinfo)
    # NOTE(review): this rebinds a module-level name and nothing in this test
    # restores it afterwards.
    consumerlocator.KNOWNCONSUMER = consumer
    locator = consumerlocator.ConsumerLocator(
        communicator=comm,
        redisconnector=redisconnector.RedisConnector())
    redisconnect.currentconsumer = {"Host": consumer, "State": True, "LastOk": "1589479202"}
    redisconnect.consumerlist = [{"Host": consumer, "State": True, "LastOk": "1589479202"}]
    ret = locator.updateconsumerlist()
    assert ret is None