This repository has been archived on 2020-09-24. You can view files and clone it, but cannot push or open issues or pull requests.
producer/integtest.py

88 lines
2.8 KiB
Python
Raw Permalink Normal View History

2020-05-08 19:13:00 +02:00
#!/usr/bin/env python
"""
Integration test for the producer module.
"""
2020-05-08 20:57:23 +02:00
import re
2020-05-14 20:09:41 +02:00
from consumerinformation import ConsumerInformation
2020-05-08 19:21:38 +02:00
from communicator import Communicator
from consumerlocator import ConsumerLocator
from messagesender import MessageSender
from redisconnector import RedisConnector
2020-05-08 19:13:00 +02:00
from pytest_redis import factories
__author__ = "@dscharnitzky"
__copyright__ = "Copyright 2020, GoldenPogácsa Team"
__module_name__ = "integtest"
__version__text__ = "1"
generateduuid = '2fbff1f2-27e7-44c8-88d9-7348cee8c1c3'
redis_proc = factories.redis_proc(host='cache', port=6379)
redis_db = factories.redisdb('redis_nooproc')
2020-05-08 20:57:23 +02:00
answer = ""
2020-05-14 20:09:41 +02:00
2020-05-08 20:57:23 +02:00
def answermethod(Request):
global answer
answer = Request
return ""
2020-05-08 19:13:00 +02:00
2020-05-14 20:09:41 +02:00
2020-05-08 20:57:23 +02:00
def test_integration(httpserver):
2020-05-08 19:13:00 +02:00
"""
Tests the whole system.
2020-05-08 20:57:23 +02:00
:param httpserver: simple HTTP server
2020-05-08 19:13:00 +02:00
"""
2020-05-08 20:57:23 +02:00
#Inint
httpserver.expect_request(
uri="/consumers",
method='GET',
data="").respond_with_json(
["localhost", "localhost", "1.2.3.4"])
httpserver.expect_request(
uri="/log",
method='POST').respond_with_handler(answermethod)
url = httpserver.url_for("/")
port = re.match(r"\W*http[^:]*\D*(\d+)", url).group(1)
2020-05-14 20:09:41 +02:00
redisconnector = RedisConnector()
consumerinfomation = ConsumerInformation(redisconnector=redisconnector)
2020-05-08 20:57:23 +02:00
comm = Communicator(
currentconsumer=f"127.0.0.1:{port}",
2020-05-14 20:09:41 +02:00
uuid=generateduuid, consumerinformation=consumerinfomation)
2020-05-08 20:57:23 +02:00
messagesender = MessageSender(communicator=comm, uuid=generateduuid)
2020-05-14 20:09:41 +02:00
consumerlocator = ConsumerLocator(communicator=comm,
redisconnector=redisconnector)
2020-05-08 20:57:23 +02:00
#First test method
consumerlocator.learnconsumerlist()
conslist = consumerlocator.red.consumerlist
assert len(conslist) == 3
#Mock in port numbers
consrepllist = []
for consumer in conslist:
newconsumer = {"Host": consumer["Host"]+":"+port, "State": consumer["State"], "LastOk": consumer["LastOk"]}
consrepllist.append(newconsumer)
consumerlocator.red.consumerlist = consrepllist
conslist = consumerlocator.red.consumerlist
#Second test method call
consumerlocator.updateconsumerlist()
conslist = consumerlocator.red.consumerlist
error = False
for consumer in conslist:
if consumer["Host"] == "localhost:"+port and consumer["State"] is True:
pass
elif consumer["Host"] == "1.2.3.4:"+port and consumer["State"] is False:
pass
elif consumer["Host"] == "10.69.42.1:"+port and consumer["State"] is False:
pass
else:
error = True
assert error is False
#Third test method call
consumerlocator.updateconsumer()
assert consumerlocator.red.currentconsumer["Host"] == "localhost:"+port
messagesender.sendmessage()
assert answer != ""