big work #2
@@ -18,6 +18,7 @@ def redis_super_storage_instance(mocker):
 | 
			
		||||
    mocker.patch("redis.from_url")
 | 
			
		||||
    yield RedisSuperStorage(REDIS_URL, REDIS_TIMEOUT)
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
@pytest.fixture
 | 
			
		||||
def ip_watchdog_instance(mocker, redis_super_storage_instance):
 | 
			
		||||
    mocker.patch("socket.gethostname", side_effect=lambda: CURRENT_HOSTNAME)
 | 
			
		||||
@@ -188,3 +189,43 @@ def test_rst_get_consumer_list_expire_while_get(redis_super_storage_instance):
 | 
			
		||||
    redis_super_storage_instance.r.get.assert_called_once_with(b"consumer_uuid1")
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
# ========================================
 | 
			
		||||
# IPWatchdog
 | 
			
		||||
# ========================================
 | 
			
		||||
 | 
			
		||||
# __init__
 | 
			
		||||
 | 
			
		||||
def test_ipw_instantiate(mocker, redis_super_storage_instance):
 | 
			
		||||
    mocker.patch("socket.gethostname", side_effect=lambda: "test")
 | 
			
		||||
    ipw = IPWatchdog(redis_super_storage_instance)
 | 
			
		||||
 | 
			
		||||
    assert ipw._host_name == "test"
 | 
			
		||||
    assert ipw._redis_store == redis_super_storage_instance
 | 
			
		||||
    socket.gethostname.assert_called_once()
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def test_ipw_is_changed_false(mocker, ip_watchdog_instance):
 | 
			
		||||
    ip_watchdog_instance._redis_store.r.get.side_effect = lambda a: CURRENT_IPADDR.encode("utf-8")
 | 
			
		||||
 | 
			
		||||
    changed, ip = ip_watchdog_instance.ip_changed()
 | 
			
		||||
 | 
			
		||||
    assert not changed
 | 
			
		||||
    assert ip == CURRENT_IPADDR
 | 
			
		||||
 | 
			
		||||
    ip_watchdog_instance._redis_store.r.get.assert_called_once_with("current_ip")
 | 
			
		||||
    assert not ip_watchdog_instance._redis_store.r.set.called
 | 
			
		||||
    socket.gethostbyname.assert_called_once_with(CURRENT_HOSTNAME)
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def test_ipw_is_changed_true(mocker, ip_watchdog_instance):
 | 
			
		||||
    mocker.patch("socket.gethostbyname", side_effect=lambda a: "192.168.2.123")
 | 
			
		||||
    ip_watchdog_instance._redis_store.r.get.side_effect = lambda a: CURRENT_IPADDR.encode("utf-8")
 | 
			
		||||
 | 
			
		||||
    changed, ip = ip_watchdog_instance.ip_changed()
 | 
			
		||||
 | 
			
		||||
    assert changed
 | 
			
		||||
    assert ip == "192.168.2.123"
 | 
			
		||||
 | 
			
		||||
    ip_watchdog_instance._redis_store.r.get.assert_called_once_with("current_ip")
 | 
			
		||||
    assert ip_watchdog_instance._redis_store.r.set.called_once_with("current_ip", b"192.168.2.123")
 | 
			
		||||
    socket.gethostbyname.assert_called_once_with(CURRENT_HOSTNAME)
 | 
			
		||||
 
 | 
			
		||||
		Reference in New Issue
	
	Block a user