from redis_super_storage import RedisSuperStorage
from communicators import ConsumerCommunicator, ProducerCommunicator
from ip_watchdog import IPWatchdog
import pytest
import redis
import json
import socket

REDIS_URL = "redis://localhost/0"
REDIS_TIMEOUT = 2

CURRENT_HOSTNAME = "testenv.local"
CURRENT_IPADDR = "192.168.1.50"


@pytest.fixture
def redis_super_storage_instance(mocker):
    """Yield a RedisSuperStorage created while ``redis.from_url`` is patched.

    The tests below configure and inspect the storage's ``r`` attribute as a
    mock, so no real Redis server is contacted.
    """
    mocker.patch("redis.from_url")
    storage = RedisSuperStorage(REDIS_URL, REDIS_TIMEOUT)
    yield storage


@pytest.fixture
def ip_watchdog_instance(mocker, redis_super_storage_instance):
    """Yield an IPWatchdog with hostname and address lookups patched.

    ``socket.gethostname`` and ``socket.gethostbyname`` are replaced so they
    return CURRENT_HOSTNAME and CURRENT_IPADDR respectively.
    """
    mocker.patch("socket.gethostname", side_effect=lambda: CURRENT_HOSTNAME)
    mocker.patch("socket.gethostbyname", side_effect=lambda _hostname: CURRENT_IPADDR)
    watchdog = IPWatchdog(redis_super_storage_instance)
    yield watchdog


# ========================================
# RedisSuperStorage
# ========================================

# __init__

def test_rst_instance_creation(mocker):
    """The constructor calls ``redis.from_url`` with the URL and keeps the timeout."""
    url, timeout = "test", 2
    mocker.patch("redis.from_url")

    storage = RedisSuperStorage(url, timeout)

    redis.from_url.assert_called_once_with(url)
    assert storage._timeout == timeout


# ip get/set

def test_rst_ip_getter_get_none(redis_super_storage_instance):
    """``current_ip`` is None when the Redis ``get`` returns None."""
    storage = redis_super_storage_instance
    storage.r.get.side_effect = lambda _key: None

    fetched_ip = storage.current_ip

    assert fetched_ip is None
    storage.r.get.assert_called_once_with("current_ip")


def test_rst_ip_getter_get_ip(redis_super_storage_instance):
    """``current_ip`` turns the bytes returned by Redis into a ``str``."""
    storage = redis_super_storage_instance
    storage.r.get.side_effect = lambda _key: b"127.0.0.1"

    fetched_ip = storage.current_ip

    assert fetched_ip == "127.0.0.1"
    storage.r.get.assert_called_once_with("current_ip")


def test_rst_ip_getter_set_ip(redis_super_storage_instance):
    """Assigning ``current_ip`` stores the address as bytes under 'current_ip'."""
    storage = redis_super_storage_instance

    storage.current_ip = "127.0.0.1"

    storage.r.set.assert_called_once_with("current_ip", b"127.0.0.1")


# update consumer

def test_rst_update_consumer(mocker, redis_super_storage_instance):
    """``update_consumer`` writes a JSON record and sets an expiry on its key."""
    storage = redis_super_storage_instance
    # Freeze the clock so the serialized "last_seen" value is predictable.
    mocker.patch("time.time", side_effect=lambda: 123)
    expected_key = "consumer_testuuid"
    expected_payload = json.dumps(
        {"uuid": "testuuid", "ip": "127.0.0.1", "last_seen": 123}
    ).encode("utf-8")

    storage.update_consumer("testuuid", "127.0.0.1")

    storage.r.set.assert_called_once_with(expected_key, expected_payload)
    storage.r.expire.assert_called_once_with(expected_key, REDIS_TIMEOUT)


# producer list

def test_rst_get_producer_list(redis_super_storage_instance):
    """``get_producer_list`` maps every producer key to its decoded value."""
    storage = redis_super_storage_instance
    expected = {
        "producer_uuid1": "127.0.0.1",
        "producer_uuid2": "127.0.0.2",
        "producer_uuid3": "127.0.0.3",
    }
    # The mocked Redis side holds the same entries, but as bytes.
    stored = {
        key.encode("utf-8"): address.encode("utf-8")
        for key, address in expected.items()
    }
    storage.r.keys.side_effect = lambda _pattern: list(stored)
    storage.r.get.side_effect = lambda key: stored[key]

    producers = storage.get_producer_list()

    assert producers == expected
    storage.r.keys.assert_called_once_with("producer_*")


def test_rst_get_producer_expire_while_get(redis_super_storage_instance):
    """A producer key listed by ``keys`` but gone at ``get`` time is left out."""
    storage = redis_super_storage_instance
    storage.r.keys.side_effect = lambda _pattern: [b"producer_uuid1"]
    storage.r.get.side_effect = lambda _key: None

    producers = storage.get_producer_list()

    assert isinstance(producers, dict)
    assert producers == {}
    storage.r.keys.assert_called_once_with("producer_*")


# get_consumer_list

def test_rst_get_consumer_list(redis_super_storage_instance):
    """``get_consumer_list`` returns the parsed JSON record of every consumer."""
    storage = redis_super_storage_instance
    expected = {
        "consumer_uuid1": {"uuid": "consumer_uuid1", "ip": "127.0.0.1", "last_seen": 123},
        "consumer_uuid2": {"uuid": "consumer_uuid2", "ip": "127.0.0.2", "last_seen": 1234},
        "consumer_uuid3": {"uuid": "consumer_uuid3", "ip": "127.0.0.3", "last_seen": 1235},
    }
    # The mocked Redis side holds each record as UTF-8 encoded JSON under a bytes key.
    stored = {
        key.encode("utf-8"): json.dumps(record).encode("utf-8")
        for key, record in expected.items()
    }
    storage.r.keys.side_effect = lambda _pattern: list(stored)
    storage.r.get.side_effect = lambda key: stored[key]

    consumers = storage.get_consumer_list()

    assert consumers == expected
    storage.r.keys.assert_called_once_with("consumer_*")
    assert storage.r.get.call_count == 3


def test_rst_get_consumer_list_expire_while_get(redis_super_storage_instance):
    """A consumer key listed by ``keys`` but gone at ``get`` time is left out."""
    storage = redis_super_storage_instance
    storage.r.keys.side_effect = lambda _pattern: [b"consumer_uuid1"]
    storage.r.get.side_effect = lambda _key: None

    consumers = storage.get_consumer_list()

    assert isinstance(consumers, dict)
    assert consumers == {}
    storage.r.keys.assert_called_once_with("consumer_*")
    storage.r.get.assert_called_once_with(b"consumer_uuid1")