from redis_super_storage import RedisSuperStorage from communicators import ConsumerCommunicator, ProducerCommunicator from ip_watchdog import IPWatchdog import pytest import redis import json import socket import requests import requests.exceptions import logging import os import app REDIS_URL = "redis://localhost/0" REDIS_TIMEOUT = 2 CURRENT_HOSTNAME = "testenv.local" CURRENT_IPADDR = "192.168.1.50" LOCAL_UUID = "testuuid1" os.environ["LOCAL_UUID"] = LOCAL_UUID os.environ["INITIAL_SERVERS"] = "127.0.0.1,192.168.0.1,172.20.0.2" @pytest.fixture def redis_super_storage_instance(mocker): mocker.patch("redis.from_url") yield RedisSuperStorage(REDIS_URL, REDIS_TIMEOUT) @pytest.fixture def ip_watchdog_instance(mocker, redis_super_storage_instance): mocker.patch("socket.gethostname", side_effect=lambda: CURRENT_HOSTNAME) mocker.patch("socket.gethostbyname", side_effect=lambda a: CURRENT_IPADDR) yield IPWatchdog(redis_super_storage_instance) @pytest.fixture def consumer_communicator_instance(redis_super_storage_instance): yield ConsumerCommunicator(redis_super_storage_instance) @pytest.fixture def producer_communicator_instance(redis_super_storage_instance): yield ProducerCommunicator(redis_super_storage_instance) # ======================================== # RedisSuperStorage # ======================================== # __init__ def test_rst_instance_creation(mocker): mocker.patch("redis.from_url") rst = RedisSuperStorage("test", 2) redis.from_url.assert_called_once_with("test") assert rst._timeout == 2 # ip get/set def test_rst_ip_getter_get_none(redis_super_storage_instance): redis_super_storage_instance.r.get.side_effect = lambda a: None ip = redis_super_storage_instance.current_ip assert ip is None redis_super_storage_instance.r.get.assert_called_once_with('current_ip') def test_rst_ip_getter_get_ip(redis_super_storage_instance): redis_super_storage_instance.r.get.side_effect = lambda a: b"127.0.0.1" ip = redis_super_storage_instance.current_ip assert ip == "127.0.0.1" redis_super_storage_instance.r.get.assert_called_once_with('current_ip') def test_rst_ip_getter_set_ip(redis_super_storage_instance): redis_super_storage_instance.current_ip = "127.0.0.1" redis_super_storage_instance.r.set.assert_called_once_with('current_ip', b"127.0.0.1") # update consumer def test_rst_update_consumer(mocker, redis_super_storage_instance): mocker.patch("time.time", side_effect=lambda: 123) redis_super_storage_instance.update_consumer("testuuid", "127.0.0.1") cust_key = "consumer_testuuid" info = { "uuid": "testuuid", "ip": "127.0.0.1", "last_seen": 123 } redis_super_storage_instance.r.set.assert_called_once_with(cust_key, json.dumps(info).encode('utf-8')) redis_super_storage_instance.r.expire.assert_called_once_with(cust_key, REDIS_TIMEOUT) # producer list def test_rst_get_producer_list(redis_super_storage_instance): redis_super_storage_instance.r.keys.side_effect = lambda a: [ b"producer_uuid1", b"producer_uuid2", b"producer_uuid3" ] data = { b"producer_uuid1": b"127.0.0.1", b"producer_uuid2": b"127.0.0.2", b"producer_uuid3": b"127.0.0.3", } redis_super_storage_instance.r.get.side_effect = lambda a: data[a] lst = redis_super_storage_instance.get_producer_list() assert lst == { "producer_uuid1": "127.0.0.1", "producer_uuid2": "127.0.0.2", "producer_uuid3": "127.0.0.3", } redis_super_storage_instance.r.keys.assert_called_once_with("producer_*") def test_rst_get_producer_expire_while_get(redis_super_storage_instance): redis_super_storage_instance.r.keys.side_effect = lambda a: [b"producer_uuid1"] redis_super_storage_instance.r.get.side_effect = lambda a: None lst = redis_super_storage_instance.get_producer_list() assert isinstance(lst, dict) assert lst == {} redis_super_storage_instance.r.keys.assert_called_once_with("producer_*") # get_consumer_list def test_rst_get_consumer_list(redis_super_storage_instance): redis_super_storage_instance.r.keys.side_effect = lambda a: [ b"consumer_uuid1", b"consumer_uuid2", b"consumer_uuid3" ] data = { b"consumer_uuid1": json.dumps({ "uuid": "consumer_uuid1", "ip": "127.0.0.1", "last_seen": 123 }).encode("utf-8"), b"consumer_uuid2": json.dumps({ "uuid": "consumer_uuid2", "ip": "127.0.0.2", "last_seen": 1234 }).encode("utf-8"), b"consumer_uuid3": json.dumps({ "uuid": "consumer_uuid3", "ip": "127.0.0.3", "last_seen": 1235 }).encode("utf-8") } redis_super_storage_instance.r.get.side_effect = lambda a: data[a] lst = redis_super_storage_instance.get_consumer_list() assert lst == { "consumer_uuid1": { "uuid": "consumer_uuid1", "ip": "127.0.0.1", "last_seen": 123 }, "consumer_uuid2": { "uuid": "consumer_uuid2", "ip": "127.0.0.2", "last_seen": 1234 }, "consumer_uuid3": { "uuid": "consumer_uuid3", "ip": "127.0.0.3", "last_seen": 1235 } } redis_super_storage_instance.r.keys.assert_called_once_with("consumer_*") assert redis_super_storage_instance.r.get.call_count == 3 def test_rst_get_consumer_list_expire_while_get(redis_super_storage_instance): redis_super_storage_instance.r.keys.side_effect = lambda a: [b"consumer_uuid1"] redis_super_storage_instance.r.get.side_effect = lambda a: None lst = redis_super_storage_instance.get_consumer_list() assert isinstance(lst, dict) assert lst == {} redis_super_storage_instance.r.keys.assert_called_once_with("consumer_*") redis_super_storage_instance.r.get.assert_called_once_with(b"consumer_uuid1") # ======================================== # IPWatchdog # ======================================== # __init__ def test_ipw_instantiate(mocker, redis_super_storage_instance): mocker.patch("socket.gethostname", side_effect=lambda: "test") ipw = IPWatchdog(redis_super_storage_instance) assert ipw._host_name == "test" assert ipw._redis_store == redis_super_storage_instance socket.gethostname.assert_called_once() def test_ipw_is_changed_false(mocker, ip_watchdog_instance): ip_watchdog_instance._redis_store.r.get.side_effect = lambda a: CURRENT_IPADDR.encode("utf-8") changed, ip = ip_watchdog_instance.ip_changed() assert not changed assert ip == CURRENT_IPADDR ip_watchdog_instance._redis_store.r.get.assert_called_once_with("current_ip") assert not ip_watchdog_instance._redis_store.r.set.called socket.gethostbyname.assert_called_once_with(CURRENT_HOSTNAME) def test_ipw_is_changed_true(mocker, ip_watchdog_instance): mocker.patch("socket.gethostbyname", side_effect=lambda a: "192.168.2.123") ip_watchdog_instance._redis_store.r.get.side_effect = lambda a: CURRENT_IPADDR.encode("utf-8") changed, ip = ip_watchdog_instance.ip_changed() assert changed assert ip == "192.168.2.123" ip_watchdog_instance._redis_store.r.get.assert_called_once_with("current_ip") assert ip_watchdog_instance._redis_store.r.set.called_once_with("current_ip", b"192.168.2.123") socket.gethostbyname.assert_called_once_with(CURRENT_HOSTNAME) # ======================================== # Communicators # ======================================== def test_cc_instantiate(redis_super_storage_instance): cc = ConsumerCommunicator(redis_super_storage_instance) assert cc._redis_store == redis_super_storage_instance assert isinstance(cc._session, requests.Session) def test_pc_instantiate(redis_super_storage_instance): pc = ProducerCommunicator(redis_super_storage_instance) assert pc._redis_store == redis_super_storage_instance assert isinstance(pc._session, requests.Session) # producer communicator def test_pc_push_ip_update(requests_mock, producer_communicator_instance): producer_communicator_instance._redis_store.r.keys.side_effect = lambda a: [ b"producer_uuid1", b"producer_uuid2", b"producer_uuid3" ] data = { b"producer_uuid1": b"127.0.0.1", b"producer_uuid2": b"127.0.0.2", b"producer_uuid3": b"127.0.0.3", } producer_communicator_instance._redis_store.r.get.side_effect = lambda a: data[a] first = requests_mock.post("http://127.0.0.1/ip") second = requests_mock.post("http://127.0.0.2/ip") third = requests_mock.post("http://127.0.0.3/ip") producer_communicator_instance.push_ip_update(CURRENT_IPADDR) assert first.call_count == 1 assert second.call_count == 1 assert third.call_count == 1 assert first.last_request.json() == second.last_request.json() == third.last_request.json() assert first.last_request.json()['ip'] == CURRENT_IPADDR assert first.last_request.json()['uuid'] == LOCAL_UUID def test_pc_push_ip_update_error_logged(mocker, requests_mock, producer_communicator_instance): mocker.patch("logging.warning") producer_communicator_instance._redis_store.r.keys.side_effect = lambda a: [ b"producer_uuid1", b"producer_uuid2", b"producer_uuid3" ] data = { b"producer_uuid1": b"127.0.0.1", b"producer_uuid2": b"127.0.0.2", b"producer_uuid3": b"127.0.0.3", } producer_communicator_instance._redis_store.r.get.side_effect = lambda a: data[a] first = requests_mock.post("http://127.0.0.1/ip") second = requests_mock.post("http://127.0.0.2/ip", exc=requests.exceptions.ConnectTimeout) third = requests_mock.post("http://127.0.0.3/ip") producer_communicator_instance.push_ip_update(CURRENT_IPADDR) assert first.call_count == 1 assert second.call_count == 1 assert third.call_count == 1 assert first.last_request.json() == third.last_request.json() assert first.last_request.json()['ip'] == CURRENT_IPADDR assert first.last_request.json()['uuid'] == LOCAL_UUID logging.warning.assert_called_once() # customer communicator def test_cc_targeted_sync(requests_mock, consumer_communicator_instance): a = requests_mock.post("http://127.0.0.2/sync", json={"uuid": "testasdasdasd"}) consumer_communicator_instance.targeted_snyc("127.0.0.2") assert a.called assert a.last_request.json() == {'uuid': LOCAL_UUID} def test_cc_targeted_sync_ip_override(requests_mock, consumer_communicator_instance): consumer_communicator_instance._force_ip_override = True consumer_communicator_instance._redis_store.r.get.side_effect = lambda a: CURRENT_IPADDR.encode("utf-8") a = requests_mock.post("http://127.0.0.2/sync", json={"uuid": "testasdasdasd"}) consumer_communicator_instance.targeted_snyc("127.0.0.2") assert a.called assert a.last_request.json() == {'uuid': LOCAL_UUID, "ip": CURRENT_IPADDR} def test_cc_targeted_sync_error_logged(mocker, requests_mock, consumer_communicator_instance): mocker.patch("logging.warning") requests_mock.post("http://127.0.0.2/sync", exc=requests.exceptions.ConnectTimeout) consumer_communicator_instance.targeted_snyc("127.0.0.2") logging.warning.assert_called_once() def test_cc_sync_all(requests_mock, consumer_communicator_instance): consumer_communicator_instance._redis_store.r.keys.side_effect = lambda a: [ b"consumer_uuid1", b"consumer_uuid2", b"consumer_uuid3" ] data = { b"consumer_uuid1": json.dumps({ "uuid": "consumer_uuid1", "ip": "127.0.0.1", "last_seen": 123 }).encode("utf-8"), b"consumer_uuid2": json.dumps({ "uuid": "consumer_uuid2", "ip": "127.0.0.2", "last_seen": 1234 }).encode("utf-8"), b"consumer_uuid3": json.dumps({ "uuid": "consumer_uuid3", "ip": "127.0.0.3", "last_seen": 1235 }).encode("utf-8") } first = requests_mock.post("http://127.0.0.1/sync", json={"uuid": "consumer_uuid1"}) second = requests_mock.post("http://127.0.0.2/sync", json={"uuid": "consumer_uuid2"}) third = requests_mock.post("http://127.0.0.3/sync", json={"uuid": "consumer_uuid3"}) consumer_communicator_instance._redis_store.r.get.side_effect = lambda a: data[a] consumer_communicator_instance.sync_all() assert first.called assert second.called assert third.called assert first.last_request.json() == second.last_request.json() == third.last_request.json() assert first.last_request.json()['uuid'] == LOCAL_UUID def test_cc_sync_all_ip_override(requests_mock, consumer_communicator_instance): consumer_communicator_instance._force_ip_override = True consumer_communicator_instance._redis_store.r.keys.side_effect = lambda a: [ b"consumer_uuid1", b"consumer_uuid2", b"consumer_uuid3" ] data = { b"consumer_uuid1": json.dumps({ "uuid": "consumer_uuid1", "ip": "127.0.0.1", "last_seen": 123 }).encode("utf-8"), b"consumer_uuid2": json.dumps({ "uuid": "consumer_uuid2", "ip": "127.0.0.2", "last_seen": 1234 }).encode("utf-8"), b"consumer_uuid3": json.dumps({ "uuid": "consumer_uuid3", "ip": "127.0.0.3", "last_seen": 1235 }).encode("utf-8"), "current_ip": CURRENT_IPADDR.encode("utf-8") } first = requests_mock.post("http://127.0.0.1/sync", json={"uuid": "consumer_uuid1"}) second = requests_mock.post("http://127.0.0.2/sync", json={"uuid": "consumer_uuid2"}) third = requests_mock.post("http://127.0.0.3/sync", json={"uuid": "consumer_uuid3"}) consumer_communicator_instance._redis_store.r.get.side_effect = lambda a: data[a] consumer_communicator_instance.sync_all() assert first.called assert second.called assert third.called assert first.last_request.json() == second.last_request.json() == third.last_request.json() assert first.last_request.json()['uuid'] == LOCAL_UUID assert first.last_request.json()['ip'] == CURRENT_IPADDR def test_cc_sync_all_error_logged(mocker, requests_mock, consumer_communicator_instance): mocker.patch("logging.warning") consumer_communicator_instance._redis_store.r.keys.side_effect = lambda a: [ b"consumer_uuid1", b"consumer_uuid2", b"consumer_uuid3" ] data = { b"consumer_uuid1": json.dumps({ "uuid": "consumer_uuid1", "ip": "127.0.0.1", "last_seen": 123 }).encode("utf-8"), b"consumer_uuid2": json.dumps({ "uuid": "consumer_uuid2", "ip": "127.0.0.2", "last_seen": 1234 }).encode("utf-8"), b"consumer_uuid3": json.dumps({ "uuid": "consumer_uuid3", "ip": "127.0.0.3", "last_seen": 1235 }).encode("utf-8") } first = requests_mock.post("http://127.0.0.1/sync", json={"uuid": "consumer_uuid1"}) second = requests_mock.post("http://127.0.0.2/sync", exc=requests.exceptions.ConnectTimeout) third = requests_mock.post("http://127.0.0.3/sync", json={"uuid": "consumer_uuid3"}) consumer_communicator_instance._redis_store.r.get.side_effect = lambda a: data[a] consumer_communicator_instance.sync_all() assert first.called assert second.called assert third.called assert first.last_request.json() == third.last_request.json() assert first.last_request.json()['uuid'] == LOCAL_UUID logging.warning.assert_called_once() # ======================================== # App # ======================================== def test_app_get_initial_ip_list(): lst = app.get_initial_ip_list() assert lst == ["127.0.0.1", "192.168.0.1", "172.20.0.2"]