Merge pull request 'big work' (#2) from dev into master
All checks were successful
continuous-integration/drone/push Build is passing
All checks were successful
continuous-integration/drone/push Build is passing
This commit is contained in:
commit
c1904dee21
2
.coveragerc
Normal file
2
.coveragerc
Normal file
@ -0,0 +1,2 @@
|
||||
[run]
|
||||
omit=venv/*
|
@ -39,7 +39,7 @@ class ConsumerCommunicator:
|
||||
response = self._session.post(f"http://{ip}/sync", json={'uuid': os.environ['LOCAL_UUID']}, timeout=5)
|
||||
|
||||
except (requests.exceptions.ConnectionError, requests.exceptions.Timeout) as e:
|
||||
logging.error(f"Error while syncing to {ip}: {str(e)}")
|
||||
logging.warning(f"Error while syncing to {ip}: {str(e)}")
|
||||
return
|
||||
|
||||
if response.status_code == 200:
|
||||
@ -50,10 +50,10 @@ class ConsumerCommunicator:
|
||||
ip = info['ip']
|
||||
try:
|
||||
# request synchronization
|
||||
response = requests.post(f"http://{ip}/sync", json={'uuid': os.environ['LOCAL_UUID']}, timeout=5)
|
||||
response = self._session.post(f"http://{ip}/sync", json={'uuid': os.environ['LOCAL_UUID']}, timeout=5)
|
||||
|
||||
except (requests.exceptions.ConnectionError, requests.exceptions.Timeout) as e:
|
||||
logging.error(f"Error while syncing to {ip}: {str(e)}")
|
||||
logging.warning(f"Error while syncing to {ip}: {str(e)}")
|
||||
continue
|
||||
|
||||
if response.status_code == 200:
|
||||
|
@ -2,4 +2,5 @@ pytest
|
||||
pytest-runner
|
||||
requests_mock
|
||||
pytest-mock
|
||||
mock
|
||||
mock
|
||||
coverage
|
@ -3,10 +3,453 @@ from communicators import ConsumerCommunicator, ProducerCommunicator
|
||||
from ip_watchdog import IPWatchdog
|
||||
import pytest
|
||||
import redis
|
||||
import json
|
||||
import socket
|
||||
import requests
|
||||
import requests.exceptions
|
||||
import logging
|
||||
import os
|
||||
import app
|
||||
|
||||
REDIS_URL = "redis://localhost/0"
|
||||
REDIS_TIMEOUT = 2
|
||||
|
||||
CURRENT_HOSTNAME = "testenv.local"
|
||||
CURRENT_IPADDR = "192.168.1.50"
|
||||
|
||||
LOCAL_UUID = "testuuid1"
|
||||
os.environ["LOCAL_UUID"] = LOCAL_UUID
|
||||
|
||||
os.environ["INITIAL_SERVERS"] = "127.0.0.1,192.168.0.1,172.20.0.2"
|
||||
|
||||
@pytest.fixture
|
||||
def redis_super_storage_instance(mocker):
|
||||
mocker.patch("redis.from_url")
|
||||
yield RedisSuperStorage(REDIS_URL, REDIS_TIMEOUT)
|
||||
|
||||
|
||||
def test_something(mocker):
|
||||
@pytest.fixture
|
||||
def ip_watchdog_instance(mocker, redis_super_storage_instance):
|
||||
mocker.patch("socket.gethostname", side_effect=lambda: CURRENT_HOSTNAME)
|
||||
mocker.patch("socket.gethostbyname", side_effect=lambda a: CURRENT_IPADDR)
|
||||
yield IPWatchdog(redis_super_storage_instance)
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def consumer_communicator_instance(redis_super_storage_instance):
|
||||
yield ConsumerCommunicator(redis_super_storage_instance)
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def producer_communicator_instance(redis_super_storage_instance):
|
||||
yield ProducerCommunicator(redis_super_storage_instance)
|
||||
|
||||
|
||||
# ========================================
|
||||
# RedisSuperStorage
|
||||
# ========================================
|
||||
|
||||
# __init__
|
||||
|
||||
def test_rst_instance_creation(mocker):
|
||||
mocker.patch("redis.from_url")
|
||||
rst = RedisSuperStorage("test", 2)
|
||||
|
||||
redis.from_url.assert_called_once_with("test")
|
||||
assert rst._timeout == 2
|
||||
|
||||
|
||||
# ip get/set
|
||||
|
||||
def test_rst_ip_getter_get_none(redis_super_storage_instance):
|
||||
redis_super_storage_instance.r.get.side_effect = lambda a: None
|
||||
|
||||
ip = redis_super_storage_instance.current_ip
|
||||
|
||||
assert ip is None
|
||||
redis_super_storage_instance.r.get.assert_called_once_with('current_ip')
|
||||
|
||||
|
||||
def test_rst_ip_getter_get_ip(redis_super_storage_instance):
|
||||
redis_super_storage_instance.r.get.side_effect = lambda a: b"127.0.0.1"
|
||||
|
||||
ip = redis_super_storage_instance.current_ip
|
||||
|
||||
assert ip == "127.0.0.1"
|
||||
redis_super_storage_instance.r.get.assert_called_once_with('current_ip')
|
||||
|
||||
|
||||
def test_rst_ip_getter_set_ip(redis_super_storage_instance):
|
||||
redis_super_storage_instance.current_ip = "127.0.0.1"
|
||||
redis_super_storage_instance.r.set.assert_called_once_with('current_ip', b"127.0.0.1")
|
||||
|
||||
|
||||
# update consumer
|
||||
|
||||
def test_rst_update_consumer(mocker, redis_super_storage_instance):
|
||||
mocker.patch("time.time", side_effect=lambda: 123)
|
||||
redis_super_storage_instance.update_consumer("testuuid", "127.0.0.1")
|
||||
|
||||
cust_key = "consumer_testuuid"
|
||||
|
||||
info = {
|
||||
"uuid": "testuuid",
|
||||
"ip": "127.0.0.1",
|
||||
"last_seen": 123
|
||||
}
|
||||
|
||||
redis_super_storage_instance.r.set.assert_called_once_with(cust_key, json.dumps(info).encode('utf-8'))
|
||||
redis_super_storage_instance.r.expire.assert_called_once_with(cust_key, REDIS_TIMEOUT)
|
||||
|
||||
|
||||
# producer list
|
||||
|
||||
def test_rst_get_producer_list(redis_super_storage_instance):
|
||||
redis_super_storage_instance.r.keys.side_effect = lambda a: [
|
||||
b"producer_uuid1",
|
||||
b"producer_uuid2",
|
||||
b"producer_uuid3"
|
||||
]
|
||||
|
||||
data = {
|
||||
b"producer_uuid1": b"127.0.0.1",
|
||||
b"producer_uuid2": b"127.0.0.2",
|
||||
b"producer_uuid3": b"127.0.0.3",
|
||||
}
|
||||
|
||||
redis_super_storage_instance.r.get.side_effect = lambda a: data[a]
|
||||
|
||||
lst = redis_super_storage_instance.get_producer_list()
|
||||
|
||||
assert lst == {
|
||||
"producer_uuid1": "127.0.0.1",
|
||||
"producer_uuid2": "127.0.0.2",
|
||||
"producer_uuid3": "127.0.0.3",
|
||||
}
|
||||
|
||||
redis_super_storage_instance.r.keys.assert_called_once_with("producer_*")
|
||||
|
||||
|
||||
def test_rst_get_producer_expire_while_get(redis_super_storage_instance):
|
||||
redis_super_storage_instance.r.keys.side_effect = lambda a: [b"producer_uuid1"]
|
||||
redis_super_storage_instance.r.get.side_effect = lambda a: None
|
||||
|
||||
lst = redis_super_storage_instance.get_producer_list()
|
||||
|
||||
assert isinstance(lst, dict)
|
||||
assert lst == {}
|
||||
|
||||
redis_super_storage_instance.r.keys.assert_called_once_with("producer_*")
|
||||
|
||||
|
||||
# get_consumer_list
|
||||
|
||||
def test_rst_get_consumer_list(redis_super_storage_instance):
|
||||
redis_super_storage_instance.r.keys.side_effect = lambda a: [
|
||||
b"consumer_uuid1",
|
||||
b"consumer_uuid2",
|
||||
b"consumer_uuid3"
|
||||
]
|
||||
|
||||
data = {
|
||||
b"consumer_uuid1": json.dumps({
|
||||
"uuid": "consumer_uuid1",
|
||||
"ip": "127.0.0.1",
|
||||
"last_seen": 123
|
||||
}).encode("utf-8"),
|
||||
b"consumer_uuid2": json.dumps({
|
||||
"uuid": "consumer_uuid2",
|
||||
"ip": "127.0.0.2",
|
||||
"last_seen": 1234
|
||||
}).encode("utf-8"),
|
||||
b"consumer_uuid3": json.dumps({
|
||||
"uuid": "consumer_uuid3",
|
||||
"ip": "127.0.0.3",
|
||||
"last_seen": 1235
|
||||
}).encode("utf-8")
|
||||
}
|
||||
|
||||
redis_super_storage_instance.r.get.side_effect = lambda a: data[a]
|
||||
|
||||
lst = redis_super_storage_instance.get_consumer_list()
|
||||
|
||||
assert lst == {
|
||||
"consumer_uuid1": {
|
||||
"uuid": "consumer_uuid1",
|
||||
"ip": "127.0.0.1",
|
||||
"last_seen": 123
|
||||
},
|
||||
"consumer_uuid2": {
|
||||
"uuid": "consumer_uuid2",
|
||||
"ip": "127.0.0.2",
|
||||
"last_seen": 1234
|
||||
},
|
||||
"consumer_uuid3": {
|
||||
"uuid": "consumer_uuid3",
|
||||
"ip": "127.0.0.3",
|
||||
"last_seen": 1235
|
||||
}
|
||||
}
|
||||
|
||||
redis_super_storage_instance.r.keys.assert_called_once_with("consumer_*")
|
||||
assert redis_super_storage_instance.r.get.call_count == 3
|
||||
|
||||
|
||||
def test_rst_get_consumer_list_expire_while_get(redis_super_storage_instance):
|
||||
redis_super_storage_instance.r.keys.side_effect = lambda a: [b"consumer_uuid1"]
|
||||
redis_super_storage_instance.r.get.side_effect = lambda a: None
|
||||
|
||||
lst = redis_super_storage_instance.get_consumer_list()
|
||||
|
||||
assert isinstance(lst, dict)
|
||||
assert lst == {}
|
||||
|
||||
redis_super_storage_instance.r.keys.assert_called_once_with("consumer_*")
|
||||
redis_super_storage_instance.r.get.assert_called_once_with(b"consumer_uuid1")
|
||||
|
||||
|
||||
# ========================================
|
||||
# IPWatchdog
|
||||
# ========================================
|
||||
|
||||
# __init__
|
||||
|
||||
def test_ipw_instantiate(mocker, redis_super_storage_instance):
|
||||
mocker.patch("socket.gethostname", side_effect=lambda: "test")
|
||||
ipw = IPWatchdog(redis_super_storage_instance)
|
||||
|
||||
assert ipw._host_name == "test"
|
||||
assert ipw._redis_store == redis_super_storage_instance
|
||||
socket.gethostname.assert_called_once()
|
||||
|
||||
|
||||
def test_ipw_is_changed_false(mocker, ip_watchdog_instance):
|
||||
ip_watchdog_instance._redis_store.r.get.side_effect = lambda a: CURRENT_IPADDR.encode("utf-8")
|
||||
|
||||
changed, ip = ip_watchdog_instance.ip_changed()
|
||||
|
||||
assert not changed
|
||||
assert ip == CURRENT_IPADDR
|
||||
|
||||
ip_watchdog_instance._redis_store.r.get.assert_called_once_with("current_ip")
|
||||
assert not ip_watchdog_instance._redis_store.r.set.called
|
||||
socket.gethostbyname.assert_called_once_with(CURRENT_HOSTNAME)
|
||||
|
||||
|
||||
def test_ipw_is_changed_true(mocker, ip_watchdog_instance):
|
||||
mocker.patch("socket.gethostbyname", side_effect=lambda a: "192.168.2.123")
|
||||
ip_watchdog_instance._redis_store.r.get.side_effect = lambda a: CURRENT_IPADDR.encode("utf-8")
|
||||
|
||||
changed, ip = ip_watchdog_instance.ip_changed()
|
||||
|
||||
assert changed
|
||||
assert ip == "192.168.2.123"
|
||||
|
||||
ip_watchdog_instance._redis_store.r.get.assert_called_once_with("current_ip")
|
||||
assert ip_watchdog_instance._redis_store.r.set.called_once_with("current_ip", b"192.168.2.123")
|
||||
socket.gethostbyname.assert_called_once_with(CURRENT_HOSTNAME)
|
||||
|
||||
|
||||
# ========================================
|
||||
# Communicators
|
||||
# ========================================
|
||||
|
||||
|
||||
def test_cc_instantiate(redis_super_storage_instance):
|
||||
cc = ConsumerCommunicator(redis_super_storage_instance)
|
||||
|
||||
assert cc._redis_store == redis_super_storage_instance
|
||||
assert isinstance(cc._session, requests.Session)
|
||||
|
||||
|
||||
def test_pc_instantiate(redis_super_storage_instance):
|
||||
pc = ProducerCommunicator(redis_super_storage_instance)
|
||||
|
||||
assert pc._redis_store == redis_super_storage_instance
|
||||
assert isinstance(pc._session, requests.Session)
|
||||
|
||||
# producer communicator
|
||||
|
||||
def test_pc_push_ip_update(requests_mock, producer_communicator_instance):
|
||||
producer_communicator_instance._redis_store.r.keys.side_effect = lambda a: [
|
||||
b"producer_uuid1",
|
||||
b"producer_uuid2",
|
||||
b"producer_uuid3"
|
||||
]
|
||||
|
||||
data = {
|
||||
b"producer_uuid1": b"127.0.0.1",
|
||||
b"producer_uuid2": b"127.0.0.2",
|
||||
b"producer_uuid3": b"127.0.0.3",
|
||||
}
|
||||
|
||||
producer_communicator_instance._redis_store.r.get.side_effect = lambda a: data[a]
|
||||
|
||||
first = requests_mock.post("http://127.0.0.1/ip")
|
||||
second = requests_mock.post("http://127.0.0.2/ip")
|
||||
third = requests_mock.post("http://127.0.0.3/ip")
|
||||
|
||||
producer_communicator_instance.push_ip_update(CURRENT_IPADDR)
|
||||
|
||||
assert first.call_count == 1
|
||||
assert second.call_count == 1
|
||||
assert third.call_count == 1
|
||||
|
||||
assert first.last_request.json() == second.last_request.json() == third.last_request.json()
|
||||
|
||||
assert first.last_request.json()['ip'] == CURRENT_IPADDR
|
||||
assert first.last_request.json()['uuid'] == LOCAL_UUID
|
||||
|
||||
|
||||
def test_pc_push_ip_update_error_logged(mocker, requests_mock, producer_communicator_instance):
|
||||
mocker.patch("logging.warning")
|
||||
|
||||
producer_communicator_instance._redis_store.r.keys.side_effect = lambda a: [
|
||||
b"producer_uuid1",
|
||||
b"producer_uuid2",
|
||||
b"producer_uuid3"
|
||||
]
|
||||
|
||||
data = {
|
||||
b"producer_uuid1": b"127.0.0.1",
|
||||
b"producer_uuid2": b"127.0.0.2",
|
||||
b"producer_uuid3": b"127.0.0.3",
|
||||
}
|
||||
|
||||
producer_communicator_instance._redis_store.r.get.side_effect = lambda a: data[a]
|
||||
|
||||
first = requests_mock.post("http://127.0.0.1/ip")
|
||||
second = requests_mock.post("http://127.0.0.2/ip", exc=requests.exceptions.ConnectTimeout)
|
||||
third = requests_mock.post("http://127.0.0.3/ip")
|
||||
|
||||
producer_communicator_instance.push_ip_update(CURRENT_IPADDR)
|
||||
|
||||
assert first.call_count == 1
|
||||
assert second.call_count == 1
|
||||
assert third.call_count == 1
|
||||
|
||||
assert first.last_request.json() == third.last_request.json()
|
||||
|
||||
assert first.last_request.json()['ip'] == CURRENT_IPADDR
|
||||
assert first.last_request.json()['uuid'] == LOCAL_UUID
|
||||
|
||||
logging.warning.assert_called_once()
|
||||
|
||||
# customer communicator
|
||||
|
||||
def test_cc_targeted_sync(requests_mock, consumer_communicator_instance):
|
||||
a = requests_mock.post("http://127.0.0.2/sync", json={"uuid": "testasdasdasd"})
|
||||
|
||||
consumer_communicator_instance.targeted_snyc("127.0.0.2")
|
||||
|
||||
assert a.called
|
||||
assert a.last_request.json() == {'uuid': LOCAL_UUID}
|
||||
|
||||
|
||||
def test_cc_targeted_sync_error_logged(mocker, requests_mock, consumer_communicator_instance):
|
||||
mocker.patch("logging.warning")
|
||||
|
||||
requests_mock.post("http://127.0.0.2/sync", exc=requests.exceptions.ConnectTimeout)
|
||||
|
||||
consumer_communicator_instance.targeted_snyc("127.0.0.2")
|
||||
|
||||
logging.warning.assert_called_once()
|
||||
|
||||
|
||||
def test_cc_sync_all(requests_mock, consumer_communicator_instance):
|
||||
consumer_communicator_instance._redis_store.r.keys.side_effect = lambda a: [
|
||||
b"consumer_uuid1",
|
||||
b"consumer_uuid2",
|
||||
b"consumer_uuid3"
|
||||
]
|
||||
|
||||
data = {
|
||||
b"consumer_uuid1": json.dumps({
|
||||
"uuid": "consumer_uuid1",
|
||||
"ip": "127.0.0.1",
|
||||
"last_seen": 123
|
||||
}).encode("utf-8"),
|
||||
b"consumer_uuid2": json.dumps({
|
||||
"uuid": "consumer_uuid2",
|
||||
"ip": "127.0.0.2",
|
||||
"last_seen": 1234
|
||||
}).encode("utf-8"),
|
||||
b"consumer_uuid3": json.dumps({
|
||||
"uuid": "consumer_uuid3",
|
||||
"ip": "127.0.0.3",
|
||||
"last_seen": 1235
|
||||
}).encode("utf-8")
|
||||
}
|
||||
|
||||
first = requests_mock.post("http://127.0.0.1/sync", json={"uuid": "consumer_uuid1"})
|
||||
second = requests_mock.post("http://127.0.0.2/sync", json={"uuid": "consumer_uuid2"})
|
||||
third = requests_mock.post("http://127.0.0.3/sync", json={"uuid": "consumer_uuid3"})
|
||||
|
||||
consumer_communicator_instance._redis_store.r.get.side_effect = lambda a: data[a]
|
||||
|
||||
consumer_communicator_instance.sync_all()
|
||||
|
||||
assert first.called
|
||||
assert second.called
|
||||
assert third.called
|
||||
|
||||
assert first.last_request.json() == second.last_request.json() == third.last_request.json()
|
||||
|
||||
assert first.last_request.json()['uuid'] == LOCAL_UUID
|
||||
|
||||
|
||||
def test_cc_sync_all_error_logged(mocker, requests_mock, consumer_communicator_instance):
|
||||
mocker.patch("logging.warning")
|
||||
|
||||
consumer_communicator_instance._redis_store.r.keys.side_effect = lambda a: [
|
||||
b"consumer_uuid1",
|
||||
b"consumer_uuid2",
|
||||
b"consumer_uuid3"
|
||||
]
|
||||
|
||||
data = {
|
||||
b"consumer_uuid1": json.dumps({
|
||||
"uuid": "consumer_uuid1",
|
||||
"ip": "127.0.0.1",
|
||||
"last_seen": 123
|
||||
}).encode("utf-8"),
|
||||
b"consumer_uuid2": json.dumps({
|
||||
"uuid": "consumer_uuid2",
|
||||
"ip": "127.0.0.2",
|
||||
"last_seen": 1234
|
||||
}).encode("utf-8"),
|
||||
b"consumer_uuid3": json.dumps({
|
||||
"uuid": "consumer_uuid3",
|
||||
"ip": "127.0.0.3",
|
||||
"last_seen": 1235
|
||||
}).encode("utf-8")
|
||||
}
|
||||
|
||||
first = requests_mock.post("http://127.0.0.1/sync", json={"uuid": "consumer_uuid1"})
|
||||
second = requests_mock.post("http://127.0.0.2/sync", exc=requests.exceptions.ConnectTimeout)
|
||||
third = requests_mock.post("http://127.0.0.3/sync", json={"uuid": "consumer_uuid3"})
|
||||
|
||||
consumer_communicator_instance._redis_store.r.get.side_effect = lambda a: data[a]
|
||||
|
||||
consumer_communicator_instance.sync_all()
|
||||
|
||||
assert first.called
|
||||
assert second.called
|
||||
assert third.called
|
||||
|
||||
assert first.last_request.json() == third.last_request.json()
|
||||
|
||||
assert first.last_request.json()['uuid'] == LOCAL_UUID
|
||||
|
||||
logging.warning.assert_called_once()
|
||||
|
||||
|
||||
# ========================================
|
||||
# App
|
||||
# ========================================
|
||||
|
||||
|
||||
def test_app_get_initial_ip_list():
|
||||
lst = app.get_initial_ip_list()
|
||||
|
||||
assert lst == ["127.0.0.1", "192.168.0.1", "172.20.0.2"]
|
||||
|
Reference in New Issue
Block a user