#!/usr/bin/env python
"""
Unit tests for the producer module.
"""

import re

from pytest_redis import factories

import consumerlocator
import communicator
import messagesender
import redisconnector

__author__ = "@tormakris"
__copyright__ = "Copyright 2020, GoldenPogácsa Team"
__module_name__ = "test"
__version__text__ = "1"

# UUID handed to every Communicator / ConsumerLocator built by these tests.
generateduuid = 'c959ad81-58f9-4445-aab4-8f3d68aee1ad'

# Redis client fixture requested by the ConsumerLocator tests below.
redis_db = factories.redisdb(port=6379)

# Port of an absolute URL: the digits following the LAST ":" of the authority
# part (the text between "://" and the first "/").  Anchoring on the last
# colon keeps digits inside the host name (e.g. "127.0.0.1") from being
# mistaken for the port.
_URL_PORT = re.compile(r"^\w+://[^/]*:(\d+)(?:/|$)")


def _server_port(httpserver):
    """
    Return the port of the test HTTP server as a string.

    :param httpserver: simple HTTP server; only ``url_for`` is used
    :return: the port taken from ``httpserver.url_for("/")``
    :raises ValueError: if that URL carries no explicit port
    """
    url = httpserver.url_for("/")
    match = _URL_PORT.match(url)
    if match is None:
        raise ValueError(f"no port found in HTTP server URL {url!r}")
    return match.group(1)


def test_generate_string(mocker):
    """
    Tests :func:`messagesender.MessageSender.randomstring`.

    Requests a string of length 32 and checks its type and length.

    :param mocker: patches the :class:`communicator.Communicator`.
    :return: None
    """
    mocker.patch('communicator.Communicator')
    comm = communicator.Communicator(
        currentconsumer="localhost",
        uuid=generateduuid)
    mess = messagesender.MessageSender(communicator=comm)

    msg = mess.randomstring(stringlength=32)

    assert isinstance(msg, str)
    assert len(msg) == 32


def test_sendmessage(httpserver):
    """
    Tests :func:`communicator.Communicator.sendmessage`.

    Registers a one-shot ``POST /log`` handler expecting the JSON body built
    from the test UUID and the message, then checks that sending returns None.

    :param httpserver: simple HTTP server
    :return: None
    """
    httpserver.expect_oneshot_request(
        uri="/log",
        method='POST',
        data='{"uuid": "c959ad81-58f9-4445-aab4-8f3d68aee1ad", '
             '"message": "SENDING"}').respond_with_json({"test": "ok"})
    port = _server_port(httpserver)
    comm = communicator.Communicator(
        currentconsumer=f"127.0.0.1:{port}",
        uuid=generateduuid)
    mess = "SENDING"

    ret = comm.sendmessage(message=mess)

    assert ret is None


def test_send_message(mocker):
    """
    Tests :func:`messagesender.MessageSender.sendmessage`.

    :param mocker: patches the :class:`communicator.Communicator`.
    :return: None
    """
    mocker.patch('communicator.Communicator')
    comm = communicator.Communicator(
        currentconsumer="127.0.0.1",
        uuid=generateduuid)
    mess = messagesender.MessageSender(communicator=comm)
    messa = "SENDING"

    msg = mess.sendmessage(message=messa)

    assert msg is None


def test_discoveravailableconsumers(httpserver):
    """
    Tests :func:`communicator.Communicator.discoveravailableconsumers`.

    The server answers ``GET /consumers`` with a list of three addresses and
    the test expects exactly that list back.

    :param httpserver: simple HTTP server
    :return: None
    """
    httpserver.expect_oneshot_request(
        uri="/consumers",
        method='GET',
        data="").respond_with_json(
            ["10.69.42.1", "10.10.10.10", "10.20.30.40"])
    port = _server_port(httpserver)
    comm = communicator.Communicator(
        currentconsumer=f"127.0.0.1:{port}",
        uuid=generateduuid)

    ret = comm.discoveravailableconsumers()

    assert isinstance(ret, list)
    assert ret == ["10.69.42.1", "10.10.10.10", "10.20.30.40"]


def test_isconsumeravailable(httpserver):
    """
    Tests :func:`communicator.Communicator.isconsumeravailable`.

    Only one one-shot ``GET /consumers`` handler is registered, so the first
    call is expected to succeed and the second to report unavailability.

    :param httpserver: simple HTTP server
    :return: None
    """
    httpserver.expect_oneshot_request(
        uri="/consumers",
        method='GET',
        data="").respond_with_json(
            ["10.69.42.1", "10.10.10.10", "10.20.30.40"])
    port = _server_port(httpserver)
    comm = communicator.Communicator(
        currentconsumer=f"127.0.0.1:{port}",
        uuid=generateduuid)

    ret = comm.isconsumeravailable()
    assert isinstance(ret, bool)
    assert ret

    # Second call: the single one-shot handler was used by the call above.
    ret2 = comm.isconsumeravailable()
    assert isinstance(ret2, bool)
    assert ret2 is False

    # Address that is not the test server (presumably nothing listens there).
    comm2 = communicator.Communicator(
        currentconsumer="127.0.0.1:69",
        uuid=generateduuid)
    ret3 = comm2.isconsumeravailable()
    assert isinstance(ret3, bool)
    assert ret3 is False


def test_checkconsumer(httpserver):
    """
    Tests :func:`communicator.Communicator.checkconsumer`.

    Only one one-shot ``GET /consumers`` handler is registered, so the first
    check is expected to succeed and every later check to fail, including one
    made through a second communicator instance.

    :param httpserver: simple HTTP server
    :return: None
    """
    httpserver.expect_oneshot_request(
        uri="/consumers",
        method='GET',
        data="").respond_with_json(
            ["10.69.42.1", "10.10.10.10", "10.20.30.40"])
    port = _server_port(httpserver)
    comm = communicator.Communicator(
        currentconsumer="127.0.0.1",
        uuid=generateduuid)

    ret = comm.checkconsumer(f"127.0.0.1:{port}")
    assert isinstance(ret, bool)
    assert ret

    # Second check: the single one-shot handler was used by the call above.
    ret2 = comm.checkconsumer(f"127.0.0.1:{port}")
    assert isinstance(ret2, bool)
    assert ret2 is False

    comm2 = communicator.Communicator(
        currentconsumer="127.0.0.1",
        uuid=generateduuid)
    ret3 = comm2.checkconsumer(f"127.0.0.1:{port}")
    assert isinstance(ret3, bool)
    assert ret3 is False


def test_setcurrentconsumer():
    """
    Tests :func:`communicator.Communicator.set_currentconsumer`.

    :return: None
    """
    comm = communicator.Communicator(
        currentconsumer="127.0.0.1",
        uuid=generateduuid)

    comm.set_currentconsumer("10.69.42.1")

    # NOTE(review): "currenctconsumer" (sic) is the attribute name this test
    # reads; presumably it mirrors the spelling used by Communicator itself.
    assert comm.currenctconsumer == "10.69.42.1"


def test_learnconsumerlist(httpserver, redis_db):
    """
    Tests :func:`consumerlocator.ConsumerLocator.learnconsumerlist`.

    :param httpserver: simple HTTP server
    :param redis_db: mock redis
    :return: None
    """
    # Not one-shot: the handler stays registered for any number of requests.
    httpserver.expect_request(
        uri="/consumers",
        method='GET',
        data="").respond_with_json(
            ["10.69.42.1", "10.10.10.10", "10.20.30.40"])
    port = _server_port(httpserver)
    comm = communicator.Communicator(
        currentconsumer=f"127.0.0.1:{port}",
        uuid=generateduuid)
    # Point the module-level known consumer at the test server.
    consumerlocator.KNOWNCONSUMER = f"127.0.0.1:{port}"
    locator = consumerlocator.ConsumerLocator(
        uuid=generateduuid,
        communicator=comm,
        redisconnector=redisconnector.RedisConnector())

    ret = locator.learnconsumerlist()

    assert ret is None


def test_getcurrentconsumer(mocker, redis_db):
    """
    Tests :func:`consumerlocator.ConsumerLocator.getcurrentconsumer`.

    A freshly built locator is expected to report
    ``consumerlocator.KNOWNCONSUMER`` as its current consumer.

    :param mocker: patches the :class:`communicator.Communicator`.
    :param redis_db: mock redis
    :return: None
    """
    mocker.patch('communicator.Communicator')
    comm = communicator.Communicator(
        currentconsumer="127.0.0.1",
        uuid=generateduuid)
    locator = consumerlocator.ConsumerLocator(
        uuid=generateduuid,
        communicator=comm,
        redisconnector=redisconnector.RedisConnector())

    assert locator.getcurrentconsumer() == consumerlocator.KNOWNCONSUMER


def test_checkcurrentconsumer(httpserver, redis_db):
    """
    Tests :func:`consumerlocator.ConsumerLocator.checkcurrentconsumer`.

    :param httpserver: simple HTTP server
    :param redis_db: mock redis
    :return: None
    """
    httpserver.expect_oneshot_request(
        uri="/consumers",
        method='GET',
        data="").respond_with_json(
            ["10.69.42.1", "10.10.10.10", "10.20.30.40"])
    port = _server_port(httpserver)
    comm = communicator.Communicator(
        currentconsumer=f"127.0.0.1:{port}",
        uuid=generateduuid)
    # Point the module-level known consumer at the test server.
    consumerlocator.KNOWNCONSUMER = f"127.0.0.1:{port}"
    locator = consumerlocator.ConsumerLocator(
        uuid=generateduuid,
        communicator=comm,
        redisconnector=redisconnector.RedisConnector())

    ret = locator.checkcurrentconsumer()

    assert ret is True


def test_updateconsumer(httpserver, redis_db):
    """
    Tests :func:`consumerlocator.ConsumerLocator.updateconsumer`.

    The locator must start with a current consumer, and updating is expected
    to return the address of the test server.

    :param httpserver: simple HTTP server
    :param redis_db: mock redis
    :return: None
    """
    httpserver.expect_oneshot_request(
        uri="/consumers",
        method='GET',
        data="").respond_with_json(
            ["10.69.42.1", "10.10.10.10", "10.20.30.40"])
    port = _server_port(httpserver)
    comm = communicator.Communicator(
        currentconsumer=f"127.0.0.1:{port}",
        uuid=generateduuid)
    # Point the module-level known consumer at the test server.
    consumerlocator.KNOWNCONSUMER = f"127.0.0.1:{port}"
    locator = consumerlocator.ConsumerLocator(
        uuid=generateduuid,
        communicator=comm,
        redisconnector=redisconnector.RedisConnector())
    assert locator.currentconsumer is not None

    ret = locator.updateconsumer()

    assert ret == f"127.0.0.1:{port}"


def test_updateconsumerlist(httpserver, redis_db):
    """
    Tests :func:`consumerlocator.ConsumerLocator.updateconsumerlist`.

    :param httpserver: simple HTTP server
    :param redis_db: mock redis
    :return: None
    """
    httpserver.expect_oneshot_request(
        uri="/consumers",
        method='GET',
        data="").respond_with_json(
            ["10.69.42.1", "10.10.10.10", "10.20.30.40"])
    port = _server_port(httpserver)
    comm = communicator.Communicator(
        currentconsumer=f"127.0.0.1:{port}",
        uuid=generateduuid)
    # Point the module-level known consumer at the test server.
    consumerlocator.KNOWNCONSUMER = f"127.0.0.1:{port}"
    locator = consumerlocator.ConsumerLocator(
        uuid=generateduuid,
        communicator=comm,
        redisconnector=redisconnector.RedisConnector())

    ret = locator.updateconsumerlist()

    assert ret is None