This repository has been archived on 2020-09-24. You can view files and clone it, but cannot push or open issues or pull requests.
consumer-scheduler/test_everything.py

515 lines
16 KiB
Python

from redis_super_storage import RedisSuperStorage
from communicators import ConsumerCommunicator, ProducerCommunicator
from ip_watchdog import IPWatchdog
import pytest
import redis
import json
import socket
import requests
import requests.exceptions
import logging
import os
import app
# Connection settings passed to RedisSuperStorage by the fixtures in this module.
REDIS_URL, REDIS_TIMEOUT = "redis://localhost/0", 2

# Values returned by the stubbed socket lookups and expected in request payloads.
CURRENT_HOSTNAME = "testenv.local"
CURRENT_IPADDR = "192.168.1.50"
LOCAL_UUID = "testuuid1"

# The environment is populated as soon as this test module is imported.
os.environ.update(
    {
        "LOCAL_UUID": LOCAL_UUID,
        "INITIAL_SERVERS": "127.0.0.1,192.168.0.1,172.20.0.2",
    }
)
@pytest.fixture
def redis_super_storage_instance(mocker):
    """Yield a RedisSuperStorage created while ``redis.from_url`` is patched."""
    mocker.patch("redis.from_url")
    storage = RedisSuperStorage(REDIS_URL, REDIS_TIMEOUT)
    yield storage
@pytest.fixture
def ip_watchdog_instance(mocker, redis_super_storage_instance):
    """Yield an IPWatchdog built with stubbed hostname and address lookups."""

    def fake_gethostname():
        return CURRENT_HOSTNAME

    def fake_gethostbyname(hostname):
        return CURRENT_IPADDR

    mocker.patch("socket.gethostname", side_effect=fake_gethostname)
    mocker.patch("socket.gethostbyname", side_effect=fake_gethostbyname)
    watchdog = IPWatchdog(redis_super_storage_instance)
    yield watchdog
@pytest.fixture
def consumer_communicator_instance(redis_super_storage_instance):
    """Yield a ConsumerCommunicator backed by the mocked storage fixture."""
    communicator = ConsumerCommunicator(redis_super_storage_instance)
    yield communicator
@pytest.fixture
def producer_communicator_instance(redis_super_storage_instance):
    """Yield a ProducerCommunicator backed by the mocked storage fixture."""
    communicator = ProducerCommunicator(redis_super_storage_instance)
    yield communicator
# ========================================
# RedisSuperStorage
# ========================================
# __init__
def test_rst_instance_creation(mocker):
    """The constructor calls ``redis.from_url`` with the URL and stores the timeout."""
    from_url_mock = mocker.patch("redis.from_url")
    storage = RedisSuperStorage("test", 2)
    # The object returned by mocker.patch is the same mock installed as redis.from_url.
    from_url_mock.assert_called_once_with("test")
    assert storage._timeout == 2
# ip get/set
def test_rst_ip_getter_get_none(redis_super_storage_instance):
    """``current_ip`` is None when the mocked Redis ``get`` returns None."""

    def missing_value(key):
        return None

    redis_client = redis_super_storage_instance.r
    redis_client.get.side_effect = missing_value
    assert redis_super_storage_instance.current_ip is None
    redis_client.get.assert_called_once_with('current_ip')
def test_rst_ip_getter_get_ip(redis_super_storage_instance):
    """``current_ip`` returns the stored bytes value as a ``str``."""

    def stored_address(key):
        return b"127.0.0.1"

    redis_client = redis_super_storage_instance.r
    redis_client.get.side_effect = stored_address
    assert redis_super_storage_instance.current_ip == "127.0.0.1"
    redis_client.get.assert_called_once_with('current_ip')
def test_rst_ip_getter_set_ip(redis_super_storage_instance):
    """Assigning ``current_ip`` writes the address as bytes under 'current_ip'."""
    redis_super_storage_instance.current_ip = "127.0.0.1"
    set_mock = redis_super_storage_instance.r.set
    set_mock.assert_called_once_with('current_ip', b"127.0.0.1")
# update consumer
def test_rst_update_consumer(mocker, redis_super_storage_instance):
    """``update_consumer`` stores a JSON record for the consumer and sets its expiry."""

    def frozen_clock():
        return 123

    # Pin time.time so the expected "last_seen" value is deterministic.
    mocker.patch("time.time", side_effect=frozen_clock)
    redis_super_storage_instance.update_consumer("testuuid", "127.0.0.1")

    expected_key = "consumer_testuuid"
    expected_payload = json.dumps(
        {"uuid": "testuuid", "ip": "127.0.0.1", "last_seen": 123}
    ).encode('utf-8')

    redis_client = redis_super_storage_instance.r
    redis_client.set.assert_called_once_with(expected_key, expected_payload)
    redis_client.expire.assert_called_once_with(expected_key, REDIS_TIMEOUT)
# producer list
def test_rst_get_producer_list(redis_super_storage_instance):
    """``get_producer_list`` maps every producer key to its decoded address."""
    expected = {
        "producer_uuid1": "127.0.0.1",
        "producer_uuid2": "127.0.0.2",
        "producer_uuid3": "127.0.0.3",
    }
    # The mocked Redis client hands back bytes for both keys and values.
    stored = {
        name.encode("utf-8"): address.encode("utf-8")
        for name, address in expected.items()
    }

    def fake_keys(pattern):
        return list(stored)

    def fake_get(key):
        return stored[key]

    redis_client = redis_super_storage_instance.r
    redis_client.keys.side_effect = fake_keys
    redis_client.get.side_effect = fake_get

    assert redis_super_storage_instance.get_producer_list() == expected
    redis_client.keys.assert_called_once_with("producer_*")
def test_rst_get_producer_expire_while_get(redis_super_storage_instance):
    """A key listed by ``keys`` whose ``get`` yields None is left out of the result."""

    def fake_keys(pattern):
        return [b"producer_uuid1"]

    def vanished(key):
        return None

    redis_client = redis_super_storage_instance.r
    redis_client.keys.side_effect = fake_keys
    redis_client.get.side_effect = vanished

    producers = redis_super_storage_instance.get_producer_list()

    assert isinstance(producers, dict)
    assert producers == {}
    redis_client.keys.assert_called_once_with("producer_*")
# get_consumer_list
def test_rst_get_consumer_list(redis_super_storage_instance):
    """``get_consumer_list`` returns the decoded JSON record of every consumer key."""
    expected = {
        "consumer_uuid1": {
            "uuid": "consumer_uuid1",
            "ip": "127.0.0.1",
            "last_seen": 123,
        },
        "consumer_uuid2": {
            "uuid": "consumer_uuid2",
            "ip": "127.0.0.2",
            "last_seen": 1234,
        },
        "consumer_uuid3": {
            "uuid": "consumer_uuid3",
            "ip": "127.0.0.3",
            "last_seen": 1235,
        },
    }
    # The mocked Redis client holds bytes keys mapped to JSON-encoded bytes.
    stored = {
        name.encode("utf-8"): json.dumps(record).encode("utf-8")
        for name, record in expected.items()
    }

    def fake_keys(pattern):
        return list(stored)

    def fake_get(key):
        return stored[key]

    redis_client = redis_super_storage_instance.r
    redis_client.keys.side_effect = fake_keys
    redis_client.get.side_effect = fake_get

    assert redis_super_storage_instance.get_consumer_list() == expected
    redis_client.keys.assert_called_once_with("consumer_*")
    assert redis_client.get.call_count == 3
def test_rst_get_consumer_list_expire_while_get(redis_super_storage_instance):
    """A consumer key whose ``get`` yields None is left out of the result."""

    def fake_keys(pattern):
        return [b"consumer_uuid1"]

    def vanished(key):
        return None

    redis_client = redis_super_storage_instance.r
    redis_client.keys.side_effect = fake_keys
    redis_client.get.side_effect = vanished

    consumers = redis_super_storage_instance.get_consumer_list()

    assert isinstance(consumers, dict)
    assert consumers == {}
    redis_client.keys.assert_called_once_with("consumer_*")
    redis_client.get.assert_called_once_with(b"consumer_uuid1")
# ========================================
# IPWatchdog
# ========================================
# __init__
def test_ipw_instantiate(mocker, redis_super_storage_instance):
    """The constructor looks up the hostname once and keeps the storage reference."""

    def fake_gethostname():
        return "test"

    mocker.patch("socket.gethostname", side_effect=fake_gethostname)
    watchdog = IPWatchdog(redis_super_storage_instance)

    socket.gethostname.assert_called_once()
    assert watchdog._redis_store == redis_super_storage_instance
    assert watchdog._host_name == "test"
def test_ipw_is_changed_false(mocker, ip_watchdog_instance):
    """``ip_changed`` reports no change when the stored and resolved addresses match."""

    def stored_address(key):
        return CURRENT_IPADDR.encode("utf-8")

    redis_client = ip_watchdog_instance._redis_store.r
    redis_client.get.side_effect = stored_address

    result = ip_watchdog_instance.ip_changed()
    changed, ip = result

    assert not changed
    assert ip == CURRENT_IPADDR
    redis_client.get.assert_called_once_with("current_ip")
    # Nothing may be written back when the address is unchanged.
    assert not redis_client.set.called
    socket.gethostbyname.assert_called_once_with(CURRENT_HOSTNAME)
def test_ipw_is_changed_true(mocker, ip_watchdog_instance):
    """``ip_changed`` reports a change and writes the newly resolved address.

    The ``socket.gethostbyname`` stub installed by the fixture is re-patched
    so the resolved address ("192.168.2.123") differs from the value the
    mocked Redis ``get`` returns (CURRENT_IPADDR).
    """
    mocker.patch("socket.gethostbyname", side_effect=lambda a: "192.168.2.123")
    ip_watchdog_instance._redis_store.r.get.side_effect = lambda a: CURRENT_IPADDR.encode("utf-8")
    changed, ip = ip_watchdog_instance.ip_changed()
    assert changed
    assert ip == "192.168.2.123"
    ip_watchdog_instance._redis_store.r.get.assert_called_once_with("current_ip")
    # Use the real Mock assertion method. ``called_once_with`` (without the
    # ``assert_`` prefix) is not an assertion: on older Pythons it auto-creates
    # a truthy child mock, so ``assert mock.called_once_with(...)`` can never
    # fail; on Python 3.12+ it raises AttributeError.
    ip_watchdog_instance._redis_store.r.set.assert_called_once_with("current_ip", b"192.168.2.123")
    socket.gethostbyname.assert_called_once_with(CURRENT_HOSTNAME)
# ========================================
# Communicators
# ========================================
def test_cc_instantiate(redis_super_storage_instance):
    """A new ConsumerCommunicator keeps the storage and owns a ``requests.Session``."""
    communicator = ConsumerCommunicator(redis_super_storage_instance)
    session = communicator._session
    assert communicator._redis_store == redis_super_storage_instance
    assert isinstance(session, requests.Session)
def test_pc_instantiate(redis_super_storage_instance):
    """A new ProducerCommunicator keeps the storage and owns a ``requests.Session``."""
    communicator = ProducerCommunicator(redis_super_storage_instance)
    session = communicator._session
    assert communicator._redis_store == redis_super_storage_instance
    assert isinstance(session, requests.Session)
# producer communicator
def test_pc_push_ip_update(requests_mock, producer_communicator_instance):
    """``push_ip_update`` POSTs the same ip/uuid body once to every producer's /ip."""
    producers = {
        "producer_uuid1": "127.0.0.1",
        "producer_uuid2": "127.0.0.2",
        "producer_uuid3": "127.0.0.3",
    }
    # The mocked Redis client hands back bytes for both keys and values.
    stored = {
        name.encode("utf-8"): address.encode("utf-8")
        for name, address in producers.items()
    }

    def fake_keys(pattern):
        return list(stored)

    def fake_get(key):
        return stored[key]

    redis_client = producer_communicator_instance._redis_store.r
    redis_client.keys.side_effect = fake_keys
    redis_client.get.side_effect = fake_get

    endpoints = [
        requests_mock.post("http://" + address + "/ip")
        for address in producers.values()
    ]

    producer_communicator_instance.push_ip_update(CURRENT_IPADDR)

    for endpoint in endpoints:
        assert endpoint.call_count == 1

    bodies = [endpoint.last_request.json() for endpoint in endpoints]
    assert bodies[0] == bodies[1] == bodies[2]
    assert bodies[0]['ip'] == CURRENT_IPADDR
    assert bodies[0]['uuid'] == LOCAL_UUID
def test_pc_push_ip_update_error_logged(mocker, requests_mock, producer_communicator_instance):
    """A producer that times out is logged once and does not stop the other pushes."""
    warning_mock = mocker.patch("logging.warning")

    producers = {
        "producer_uuid1": "127.0.0.1",
        "producer_uuid2": "127.0.0.2",
        "producer_uuid3": "127.0.0.3",
    }
    stored = {
        name.encode("utf-8"): address.encode("utf-8")
        for name, address in producers.items()
    }

    def fake_keys(pattern):
        return list(stored)

    def fake_get(key):
        return stored[key]

    redis_client = producer_communicator_instance._redis_store.r
    redis_client.keys.side_effect = fake_keys
    redis_client.get.side_effect = fake_get

    healthy_first = requests_mock.post("http://127.0.0.1/ip")
    # Only the second producer raises a connect timeout.
    failing = requests_mock.post("http://127.0.0.2/ip", exc=requests.exceptions.ConnectTimeout)
    healthy_last = requests_mock.post("http://127.0.0.3/ip")

    producer_communicator_instance.push_ip_update(CURRENT_IPADDR)

    for endpoint in (healthy_first, failing, healthy_last):
        assert endpoint.call_count == 1

    first_body = healthy_first.last_request.json()
    assert first_body == healthy_last.last_request.json()
    assert first_body['ip'] == CURRENT_IPADDR
    assert first_body['uuid'] == LOCAL_UUID
    # The object returned by mocker.patch is the same mock installed as logging.warning.
    warning_mock.assert_called_once()
# consumer communicator
def test_cc_targeted_sync(requests_mock, consumer_communicator_instance):
    """``targeted_snyc`` (sic) POSTs the local UUID to the target's /sync endpoint."""
    sync_endpoint = requests_mock.post("http://127.0.0.2/sync", json={"uuid": "testasdasdasd"})

    consumer_communicator_instance.targeted_snyc("127.0.0.2")

    assert sync_endpoint.called
    sent_body = sync_endpoint.last_request.json()
    assert sent_body == {'uuid': LOCAL_UUID}
def test_cc_targeted_sync_ip_override(requests_mock, consumer_communicator_instance):
    """With ``_force_ip_override`` set, the sync body also carries the stored IP."""

    def stored_address(key):
        return CURRENT_IPADDR.encode("utf-8")

    communicator = consumer_communicator_instance
    communicator._force_ip_override = True
    communicator._redis_store.r.get.side_effect = stored_address

    sync_endpoint = requests_mock.post("http://127.0.0.2/sync", json={"uuid": "testasdasdasd"})
    communicator.targeted_snyc("127.0.0.2")

    assert sync_endpoint.called
    sent_body = sync_endpoint.last_request.json()
    assert sent_body == {'uuid': LOCAL_UUID, "ip": CURRENT_IPADDR}
def test_cc_targeted_sync_error_logged(mocker, requests_mock, consumer_communicator_instance):
    """A connect timeout during ``targeted_snyc`` produces exactly one warning."""
    warning_mock = mocker.patch("logging.warning")
    requests_mock.post("http://127.0.0.2/sync", exc=requests.exceptions.ConnectTimeout)

    consumer_communicator_instance.targeted_snyc("127.0.0.2")

    # The object returned by mocker.patch is the same mock installed as logging.warning.
    warning_mock.assert_called_once()
def test_cc_sync_all(requests_mock, consumer_communicator_instance):
    """``sync_all`` POSTs the same local-UUID body to every stored consumer's /sync."""
    consumers = {
        "consumer_uuid1": {"uuid": "consumer_uuid1", "ip": "127.0.0.1", "last_seen": 123},
        "consumer_uuid2": {"uuid": "consumer_uuid2", "ip": "127.0.0.2", "last_seen": 1234},
        "consumer_uuid3": {"uuid": "consumer_uuid3", "ip": "127.0.0.3", "last_seen": 1235},
    }
    # The mocked Redis client holds bytes keys mapped to JSON-encoded bytes.
    stored = {
        name.encode("utf-8"): json.dumps(record).encode("utf-8")
        for name, record in consumers.items()
    }

    def fake_keys(pattern):
        return list(stored)

    def fake_get(key):
        return stored[key]

    redis_client = consumer_communicator_instance._redis_store.r
    redis_client.keys.side_effect = fake_keys
    redis_client.get.side_effect = fake_get

    # Each mocked endpoint replies with that consumer's own uuid.
    endpoints = [
        requests_mock.post("http://" + record["ip"] + "/sync", json={"uuid": name})
        for name, record in consumers.items()
    ]

    consumer_communicator_instance.sync_all()

    for endpoint in endpoints:
        assert endpoint.called

    bodies = [endpoint.last_request.json() for endpoint in endpoints]
    assert bodies[0] == bodies[1] == bodies[2]
    assert bodies[0]['uuid'] == LOCAL_UUID
def test_cc_sync_all_ip_override(requests_mock, consumer_communicator_instance):
    """With ``_force_ip_override`` set, every sync body also carries the stored IP."""
    communicator = consumer_communicator_instance
    communicator._force_ip_override = True

    consumers = {
        "consumer_uuid1": {"uuid": "consumer_uuid1", "ip": "127.0.0.1", "last_seen": 123},
        "consumer_uuid2": {"uuid": "consumer_uuid2", "ip": "127.0.0.2", "last_seen": 1234},
        "consumer_uuid3": {"uuid": "consumer_uuid3", "ip": "127.0.0.3", "last_seen": 1235},
    }
    consumer_keys = [name.encode("utf-8") for name in consumers]
    stored = {
        name.encode("utf-8"): json.dumps(record).encode("utf-8")
        for name, record in consumers.items()
    }
    # "current_ip" can be fetched with ``get`` but is not among the listed consumer keys.
    stored["current_ip"] = CURRENT_IPADDR.encode("utf-8")

    def fake_keys(pattern):
        return list(consumer_keys)

    def fake_get(key):
        return stored[key]

    redis_client = communicator._redis_store.r
    redis_client.keys.side_effect = fake_keys
    redis_client.get.side_effect = fake_get

    endpoints = [
        requests_mock.post("http://" + record["ip"] + "/sync", json={"uuid": name})
        for name, record in consumers.items()
    ]

    communicator.sync_all()

    for endpoint in endpoints:
        assert endpoint.called

    bodies = [endpoint.last_request.json() for endpoint in endpoints]
    assert bodies[0] == bodies[1] == bodies[2]
    assert bodies[0]['uuid'] == LOCAL_UUID
    assert bodies[0]['ip'] == CURRENT_IPADDR
def test_cc_sync_all_error_logged(mocker, requests_mock, consumer_communicator_instance):
    """A consumer that times out is logged once and does not stop the other syncs."""
    warning_mock = mocker.patch("logging.warning")

    consumers = {
        "consumer_uuid1": {"uuid": "consumer_uuid1", "ip": "127.0.0.1", "last_seen": 123},
        "consumer_uuid2": {"uuid": "consumer_uuid2", "ip": "127.0.0.2", "last_seen": 1234},
        "consumer_uuid3": {"uuid": "consumer_uuid3", "ip": "127.0.0.3", "last_seen": 1235},
    }
    stored = {
        name.encode("utf-8"): json.dumps(record).encode("utf-8")
        for name, record in consumers.items()
    }

    def fake_keys(pattern):
        return list(stored)

    def fake_get(key):
        return stored[key]

    redis_client = consumer_communicator_instance._redis_store.r
    redis_client.keys.side_effect = fake_keys
    redis_client.get.side_effect = fake_get

    healthy_first = requests_mock.post("http://127.0.0.1/sync", json={"uuid": "consumer_uuid1"})
    # Only the second consumer raises a connect timeout.
    failing = requests_mock.post("http://127.0.0.2/sync", exc=requests.exceptions.ConnectTimeout)
    healthy_last = requests_mock.post("http://127.0.0.3/sync", json={"uuid": "consumer_uuid3"})

    consumer_communicator_instance.sync_all()

    for endpoint in (healthy_first, failing, healthy_last):
        assert endpoint.called

    first_body = healthy_first.last_request.json()
    assert first_body == healthy_last.last_request.json()
    assert first_body['uuid'] == LOCAL_UUID
    # The object returned by mocker.patch is the same mock installed as logging.warning.
    warning_mock.assert_called_once()
# ========================================
# App
# ========================================
def test_app_get_initial_ip_list():
    """``app.get_initial_ip_list`` returns the three expected addresses.

    NOTE(review): the values match the INITIAL_SERVERS environment variable
    set at the top of this module; presumably that is the source - confirm in app.
    """
    expected = ["127.0.0.1", "192.168.0.1", "172.20.0.2"]
    assert app.get_initial_ip_list() == expected