232 lines
7.1 KiB
Python
232 lines
7.1 KiB
Python
#!/usr/bin/env python
|
|
|
|
import os
|
|
import re
|
|
import consumerlocator
|
|
import communicator
|
|
import messagesender
|
|
|
|
"""
Unit tests for the producer module.
"""
|
|
|
|
# Module metadata.
__author__ = "@tormakris"
__copyright__ = "Copyright 2020, GoldenPogácsa Team"
__module_name__ = "test"
__version__text__ = "1"

# Fixed UUID handed to every Communicator / ConsumerLocator built in the tests.
generateduuid = "c959ad81-58f9-4445-aab4-8f3d68aee1ad"

# Set once at import time. test_getcurrentconsumer asserts this same address;
# presumably ConsumerLocator reads this variable -- verify against the module.
os.environ["PRODUCER_KNOWNCONSUMER"] = "10.1.2.3"
|
|
|
|
|
|
def test_generate_string(mocker):
    """Check that ``MessageSender.randomstring`` yields a 32-character ``str``.

    ``communicator.Communicator`` is patched through the ``mocker`` fixture
    before it is instantiated, so the sender is handed a mock object.
    """
    mocker.patch('communicator.Communicator')
    fake_comm = communicator.Communicator(
        currentconsumer="localhost", uuid=generateduuid)
    sender = messagesender.MessageSender(communicator=fake_comm)

    generated = sender.randomstring(stringlength=32)

    assert isinstance(generated, str)
    assert len(generated) == 32
|
|
|
|
|
|
def test_sendmessage(httpserver):
    """Send one message through a ``Communicator`` aimed at the test server.

    The ``httpserver`` fixture is primed with a single expected ``POST /log``
    request whose body carries the uuid and the message text. The test only
    asserts that ``sendmessage`` returns ``None``.
    """
    expected_body = "{\"uuid\": \"c959ad81-58f9-4445-aab4-8f3d68aee1ad\", \"message\": \"SENDING\"}"
    httpserver.expect_oneshot_request(
        uri="/log", method='POST', data=expected_body,
    ).respond_with_json({"test": "ok"})

    # Extract the port of the fixture server from its base URL.
    base_url = httpserver.url_for("/")
    server_port = re.match(r"\W*http[^:]*\D*(\d+)", base_url).group(1)

    client = communicator.Communicator(
        currentconsumer=f"127.0.0.1:{server_port}", uuid=generateduuid)
    payload = "SENDING"

    result = client.sendmessage(message=payload)

    assert result is None
|
|
|
|
|
|
def test_send_message(mocker):
    """Check that ``MessageSender.sendmessage`` returns ``None``.

    ``communicator.Communicator`` is patched via ``mocker`` first, so the
    sender works against a mock instead of a real communicator.
    """
    mocker.patch('communicator.Communicator')
    fake_comm = communicator.Communicator(
        currentconsumer="127.0.0.1", uuid=generateduuid)
    sender = messagesender.MessageSender(communicator=fake_comm)
    text = "SENDING"

    outcome = sender.sendmessage(message=text)

    assert outcome is None
|
|
|
|
|
|
def test_discoveravailableconsumers(httpserver):
    """Check that ``discoveravailableconsumers`` returns the served list.

    The fixture server answers one ``GET /consumer`` request with a JSON list
    of three addresses; the call must return exactly that list.
    """
    httpserver.expect_oneshot_request(
        uri="/consumer", method='GET', data="",
    ).respond_with_json(["10.69.42.1", "10.10.10.10", "10.20.30.40"])

    # Extract the port of the fixture server from its base URL.
    base_url = httpserver.url_for("/")
    server_port = re.match(r"\W*http[^:]*\D*(\d+)", base_url).group(1)

    client = communicator.Communicator(
        currentconsumer=f"127.0.0.1:{server_port}", uuid=generateduuid)

    consumers = client.discoveravailableconsumers()

    assert isinstance(consumers, list)
    assert consumers == ["10.69.42.1", "10.10.10.10", "10.20.30.40"]
|
|
|
|
|
|
def test_isconsumeravailable(httpserver):
    """Exercise ``Communicator.isconsumeravailable`` in three situations.

    1. The fixture server has one pending ``GET /consumer`` expectation:
       the call must return ``True``.
    2. The same call is repeated: it must return ``False`` (presumably
       because the one-shot expectation has already been consumed).
    3. A communicator pointed at ``127.0.0.1:69`` must return ``False``.
    """
    httpserver.expect_oneshot_request(
        uri="/consumer", method='GET', data="",
    ).respond_with_json(["10.69.42.1", "10.10.10.10", "10.20.30.40"])

    # Extract the port of the fixture server from its base URL.
    url = httpserver.url_for("/")
    port = re.match(r"\W*http[^:]*\D*(\d+)", url).group(1)

    comm = communicator.Communicator(
        currentconsumer=f"127.0.0.1:{port}", uuid=generateduuid)

    first = comm.isconsumeravailable()
    assert isinstance(first, bool)
    assert first

    # Identity check instead of ``== False``: the value is already known to
    # be a bool, and PEP 8 discourages equality comparison with singletons.
    second = comm.isconsumeravailable()
    assert isinstance(second, bool)
    assert second is False

    unreachable = communicator.Communicator(
        currentconsumer="127.0.0.1:69", uuid=generateduuid)

    third = unreachable.isconsumeravailable()
    assert isinstance(third, bool)
    assert third is False
|
|
|
|
|
|
def test_checkconsumer(httpserver):
    """Exercise ``Communicator.checkconsumer`` against the fixture server.

    The communicators are created with ``currentconsumer="127.0.0.1"`` and the
    fixture server's address is passed explicitly to ``checkconsumer``.
    With one pending ``GET /consumer`` expectation the first check must be
    ``True``; the repeated check and a check from a second communicator must
    both be ``False`` (presumably because the one-shot expectation is gone).
    """
    httpserver.expect_oneshot_request(
        uri="/consumer", method='GET', data="",
    ).respond_with_json(["10.69.42.1", "10.10.10.10", "10.20.30.40"])

    # Extract the port of the fixture server from its base URL.
    url = httpserver.url_for("/")
    port = re.match(r"\W*http[^:]*\D*(\d+)", url).group(1)

    comm = communicator.Communicator(
        currentconsumer="127.0.0.1", uuid=generateduuid)

    first = comm.checkconsumer(f"127.0.0.1:{port}")
    assert isinstance(first, bool)
    assert first

    # Identity check instead of ``== False``: the value is already known to
    # be a bool, and PEP 8 discourages equality comparison with singletons.
    second = comm.checkconsumer(f"127.0.0.1:{port}")
    assert isinstance(second, bool)
    assert second is False

    comm2 = communicator.Communicator(
        currentconsumer="127.0.0.1", uuid=generateduuid)

    third = comm2.checkconsumer(f"127.0.0.1:{port}")
    assert isinstance(third, bool)
    assert third is False
|
|
|
|
|
|
def test_setcurrentconsumer():
    """Check that ``set_currentconsumer`` stores the new consumer address."""
    client = communicator.Communicator(
        currentconsumer="127.0.0.1", uuid=generateduuid)

    client.set_currentconsumer("10.69.42.1")

    # NOTE(review): the attribute is spelled ``currenctconsumer`` here;
    # presumably this matches the Communicator class -- confirm before renaming.
    assert client.currenctconsumer == "10.69.42.1"
|
|
|
|
|
|
def test_learnconsumerlist(httpserver):
    """Check that ``ConsumerLocator.learnconsumerlist`` returns ``None``.

    The fixture server is primed with one ``GET /consumer`` expectation that
    answers with a JSON list of three addresses.
    """
    httpserver.expect_oneshot_request(
        uri="/consumer", method='GET', data="",
    ).respond_with_json(["10.69.42.1", "10.10.10.10", "10.20.30.40"])

    # Extract the port of the fixture server from its base URL.
    base_url = httpserver.url_for("/")
    server_port = re.match(r"\W*http[^:]*\D*(\d+)", base_url).group(1)

    client = communicator.Communicator(
        currentconsumer=f"127.0.0.1:{server_port}", uuid=generateduuid)
    finder = consumerlocator.ConsumerLocator(
        uuid=generateduuid, communicator=client)

    outcome = finder.learnconsumerlist()

    assert outcome is None
|
|
|
|
|
|
def test_getcurrentconsumer(mocker):
    """Check that ``getcurrentconsumer`` reports ``"10.1.2.3"``.

    ``communicator.Communicator`` is patched via ``mocker`` before use. The
    expected address equals the value this module stores in the
    ``PRODUCER_KNOWNCONSUMER`` environment variable.
    """
    mocker.patch('communicator.Communicator')
    fake_comm = communicator.Communicator(
        currentconsumer="127.0.0.1", uuid=generateduuid)
    finder = consumerlocator.ConsumerLocator(
        uuid=generateduuid, communicator=fake_comm)

    current = finder.getcurrentconsumer()

    assert current == "10.1.2.3"
|
|
|
|
|
|
def test_checkcurrentconsumer(httpserver):
    """Check that ``ConsumerLocator.checkcurrentconsumer`` returns ``False``.

    The fixture server is primed with one ``GET /consumer`` expectation and
    the communicator handed to the locator points at that server.
    """
    httpserver.expect_oneshot_request(
        uri="/consumer", method='GET', data="",
    ).respond_with_json(["10.69.42.1", "10.10.10.10", "10.20.30.40"])

    # Extract the port of the fixture server from its base URL.
    url = httpserver.url_for("/")
    port = re.match(r"\W*http[^:]*\D*(\d+)", url).group(1)

    comm = communicator.Communicator(
        currentconsumer=f"127.0.0.1:{port}", uuid=generateduuid)
    locator = consumerlocator.ConsumerLocator(
        uuid=generateduuid, communicator=comm)

    ret = locator.checkcurrentconsumer()

    # Identity check instead of ``== False`` (PEP 8: compare singletons with
    # ``is``); this also rejects falsy non-bool values such as ``0``.
    assert ret is False
|
|
|
|
|
|
def test_updateconsumer(httpserver):
    """Exercise ``ConsumerLocator.updateconsumer`` with two server replies.

    First the fixture server answers ``GET /consumer`` with three addresses
    and ``updateconsumer`` must return ``"10.69.42.1"``. Then a fresh one-shot
    expectation answers with an empty list and a newly built locator must
    return ``None``.
    """
    # --- non-empty consumer list ---
    httpserver.expect_oneshot_request(
        uri="/consumer", method='GET', data="",
    ).respond_with_json(["10.69.42.1", "10.10.10.10", "10.20.30.40"])

    first_url = httpserver.url_for("/")
    first_port = re.match(r"\W*http[^:]*\D*(\d+)", first_url).group(1)

    first_client = communicator.Communicator(
        currentconsumer=f"127.0.0.1:{first_port}", uuid=generateduuid)
    first_finder = consumerlocator.ConsumerLocator(
        uuid=generateduuid, communicator=first_client)

    chosen = first_finder.updateconsumer()
    assert chosen == "10.69.42.1"

    # --- empty consumer list ---
    httpserver.expect_oneshot_request(
        uri="/consumer", method='GET', data="",
    ).respond_with_json([])

    second_url = httpserver.url_for("/")
    second_port = re.match(r"\W*http[^:]*\D*(\d+)", second_url).group(1)

    second_client = communicator.Communicator(
        currentconsumer=f"127.0.0.1:{second_port}", uuid=generateduuid)
    second_finder = consumerlocator.ConsumerLocator(
        uuid=generateduuid, communicator=second_client)

    nothing = second_finder.updateconsumer()
    assert nothing is None
|
|
|
|
|
|
def test_updateconsumerlist(httpserver):
    """Check that ``ConsumerLocator.updateconsumerlist`` returns ``None``.

    The fixture server is primed with one ``GET /consumer`` expectation that
    answers with a JSON list of three addresses.
    """
    httpserver.expect_oneshot_request(
        uri="/consumer", method='GET', data="",
    ).respond_with_json(["10.69.42.1", "10.10.10.10", "10.20.30.40"])

    # Extract the port of the fixture server from its base URL.
    base_url = httpserver.url_for("/")
    server_port = re.match(r"\W*http[^:]*\D*(\d+)", base_url).group(1)

    client = communicator.Communicator(
        currentconsumer=f"127.0.0.1:{server_port}", uuid=generateduuid)
    finder = consumerlocator.ConsumerLocator(
        uuid=generateduuid, communicator=client)

    outcome = finder.updateconsumerlist()

    assert outcome is None
|