Added force ip override setting
This commit is contained in:
parent
3cefcabad2
commit
f01001d331
@ -55,7 +55,10 @@ class ProducerCommunicator:
|
|||||||
try:
|
try:
|
||||||
response = self._session.post(
|
response = self._session.post(
|
||||||
f"http://{ip}/ip",
|
f"http://{ip}/ip",
|
||||||
json={'uuid': os.environ['LOCAL_UUID'], 'ip': newip},
|
json={
|
||||||
|
'uuid': os.environ['LOCAL_UUID'],
|
||||||
|
'ip': newip
|
||||||
|
},
|
||||||
timeout=5
|
timeout=5
|
||||||
)
|
)
|
||||||
logging.debug(f"Pushed update to {key} at {ip}. Response: {response.status_code}")
|
logging.debug(f"Pushed update to {key} at {ip}. Response: {response.status_code}")
|
||||||
@ -69,14 +72,16 @@ class ConsumerCommunicator:
|
|||||||
The addresses of consumers are fetched from the `RedisSuperStorage`.
|
The addresses of consumers are fetched from the `RedisSuperStorage`.
|
||||||
"""
|
"""
|
||||||
|
|
||||||
def __init__(self, redis_store: RedisSuperStorage):
|
def __init__(self, redis_store: RedisSuperStorage, force_ip_override: bool = False):
|
||||||
"""
|
"""
|
||||||
Upon creating this object. A requests session is created on order to take advantage of keep-alive connections.
|
Upon creating this object. A requests session is created on order to take advantage of keep-alive connections.
|
||||||
|
|
||||||
:param redis_store: A `RedisSuperStorage` instance.
|
:param redis_store: A `RedisSuperStorage` instance.
|
||||||
|
:param force_ip_override: Include the ip address stored in redis to the sync message (Disable the reciever ip discovery in the other consumer)
|
||||||
"""
|
"""
|
||||||
self._redis_store = redis_store
|
self._redis_store = redis_store
|
||||||
self._session = requests.Session()
|
self._session = requests.Session()
|
||||||
|
self._force_ip_override = force_ip_override
|
||||||
|
|
||||||
def targeted_snyc(self, ip: str):
|
def targeted_snyc(self, ip: str):
|
||||||
"""
|
"""
|
||||||
@ -97,14 +102,21 @@ class ConsumerCommunicator:
|
|||||||
Body::
|
Body::
|
||||||
|
|
||||||
{
|
{
|
||||||
"uuid" : "str: LOCAL_UUID"
|
"uuid" : "str: LOCAL_UUID",
|
||||||
|
"ip" : "str: optional: IP override"
|
||||||
}
|
}
|
||||||
|
|
||||||
:param ip: The ip address of the consumer to be synced to.
|
:param ip: The ip address of the consumer to be synced to.
|
||||||
"""
|
"""
|
||||||
|
|
||||||
|
message = {'uuid': os.environ['LOCAL_UUID']}
|
||||||
|
|
||||||
|
if self._force_ip_override:
|
||||||
|
message['ip'] = self._redis_store.current_ip
|
||||||
|
|
||||||
try:
|
try:
|
||||||
# request synchronization
|
# request synchronization
|
||||||
response = self._session.post(f"http://{ip}/sync", json={'uuid': os.environ['LOCAL_UUID']}, timeout=5)
|
response = self._session.post(f"http://{ip}/sync", json=message, timeout=5)
|
||||||
|
|
||||||
except (requests.exceptions.ConnectionError, requests.exceptions.Timeout) as e:
|
except (requests.exceptions.ConnectionError, requests.exceptions.Timeout) as e:
|
||||||
logging.warning(f"Error while syncing to {ip}: {str(e)}")
|
logging.warning(f"Error while syncing to {ip}: {str(e)}")
|
||||||
@ -132,15 +144,22 @@ class ConsumerCommunicator:
|
|||||||
Body::
|
Body::
|
||||||
|
|
||||||
{
|
{
|
||||||
"uuid" : "str: LOCAL_UUID"
|
"uuid" : "str: LOCAL_UUID",
|
||||||
|
"ip" : "str: optional: IP override"
|
||||||
}
|
}
|
||||||
|
|
||||||
"""
|
"""
|
||||||
|
|
||||||
|
message = {'uuid': os.environ['LOCAL_UUID']}
|
||||||
|
|
||||||
|
if self._force_ip_override:
|
||||||
|
message['ip'] = self._redis_store.current_ip
|
||||||
|
|
||||||
for uuid, info in self._redis_store.get_consumer_list().items():
|
for uuid, info in self._redis_store.get_consumer_list().items():
|
||||||
ip = info['ip']
|
ip = info['ip']
|
||||||
try:
|
try:
|
||||||
# request synchronization
|
# request synchronization
|
||||||
response = self._session.post(f"http://{ip}/sync", json={'uuid': os.environ['LOCAL_UUID']}, timeout=5)
|
response = self._session.post(f"http://{ip}/sync", json=message, timeout=5)
|
||||||
|
|
||||||
except (requests.exceptions.ConnectionError, requests.exceptions.Timeout) as e:
|
except (requests.exceptions.ConnectionError, requests.exceptions.Timeout) as e:
|
||||||
logging.warning(f"Error while syncing to {ip}: {str(e)}")
|
logging.warning(f"Error while syncing to {ip}: {str(e)}")
|
||||||
|
@ -22,6 +22,7 @@ os.environ["LOCAL_UUID"] = LOCAL_UUID
|
|||||||
|
|
||||||
os.environ["INITIAL_SERVERS"] = "127.0.0.1,192.168.0.1,172.20.0.2"
|
os.environ["INITIAL_SERVERS"] = "127.0.0.1,192.168.0.1,172.20.0.2"
|
||||||
|
|
||||||
|
|
||||||
@pytest.fixture
|
@pytest.fixture
|
||||||
def redis_super_storage_instance(mocker):
|
def redis_super_storage_instance(mocker):
|
||||||
mocker.patch("redis.from_url")
|
mocker.patch("redis.from_url")
|
||||||
@ -268,6 +269,7 @@ def test_pc_instantiate(redis_super_storage_instance):
|
|||||||
assert pc._redis_store == redis_super_storage_instance
|
assert pc._redis_store == redis_super_storage_instance
|
||||||
assert isinstance(pc._session, requests.Session)
|
assert isinstance(pc._session, requests.Session)
|
||||||
|
|
||||||
|
|
||||||
# producer communicator
|
# producer communicator
|
||||||
|
|
||||||
def test_pc_push_ip_update(requests_mock, producer_communicator_instance):
|
def test_pc_push_ip_update(requests_mock, producer_communicator_instance):
|
||||||
@ -335,6 +337,7 @@ def test_pc_push_ip_update_error_logged(mocker, requests_mock, producer_communic
|
|||||||
|
|
||||||
logging.warning.assert_called_once()
|
logging.warning.assert_called_once()
|
||||||
|
|
||||||
|
|
||||||
# customer communicator
|
# customer communicator
|
||||||
|
|
||||||
def test_cc_targeted_sync(requests_mock, consumer_communicator_instance):
|
def test_cc_targeted_sync(requests_mock, consumer_communicator_instance):
|
||||||
@ -346,6 +349,17 @@ def test_cc_targeted_sync(requests_mock, consumer_communicator_instance):
|
|||||||
assert a.last_request.json() == {'uuid': LOCAL_UUID}
|
assert a.last_request.json() == {'uuid': LOCAL_UUID}
|
||||||
|
|
||||||
|
|
||||||
|
def test_cc_targeted_sync_ip_override(requests_mock, consumer_communicator_instance):
|
||||||
|
consumer_communicator_instance._force_ip_override = True
|
||||||
|
consumer_communicator_instance._redis_store.r.get.side_effect = lambda a: CURRENT_IPADDR.encode("utf-8")
|
||||||
|
a = requests_mock.post("http://127.0.0.2/sync", json={"uuid": "testasdasdasd"})
|
||||||
|
|
||||||
|
consumer_communicator_instance.targeted_snyc("127.0.0.2")
|
||||||
|
|
||||||
|
assert a.called
|
||||||
|
assert a.last_request.json() == {'uuid': LOCAL_UUID, "ip": CURRENT_IPADDR}
|
||||||
|
|
||||||
|
|
||||||
def test_cc_targeted_sync_error_logged(mocker, requests_mock, consumer_communicator_instance):
|
def test_cc_targeted_sync_error_logged(mocker, requests_mock, consumer_communicator_instance):
|
||||||
mocker.patch("logging.warning")
|
mocker.patch("logging.warning")
|
||||||
|
|
||||||
@ -398,6 +412,51 @@ def test_cc_sync_all(requests_mock, consumer_communicator_instance):
|
|||||||
assert first.last_request.json()['uuid'] == LOCAL_UUID
|
assert first.last_request.json()['uuid'] == LOCAL_UUID
|
||||||
|
|
||||||
|
|
||||||
|
def test_cc_sync_all_ip_override(requests_mock, consumer_communicator_instance):
|
||||||
|
consumer_communicator_instance._force_ip_override = True
|
||||||
|
consumer_communicator_instance._redis_store.r.keys.side_effect = lambda a: [
|
||||||
|
b"consumer_uuid1",
|
||||||
|
b"consumer_uuid2",
|
||||||
|
b"consumer_uuid3"
|
||||||
|
]
|
||||||
|
|
||||||
|
data = {
|
||||||
|
b"consumer_uuid1": json.dumps({
|
||||||
|
"uuid": "consumer_uuid1",
|
||||||
|
"ip": "127.0.0.1",
|
||||||
|
"last_seen": 123
|
||||||
|
}).encode("utf-8"),
|
||||||
|
b"consumer_uuid2": json.dumps({
|
||||||
|
"uuid": "consumer_uuid2",
|
||||||
|
"ip": "127.0.0.2",
|
||||||
|
"last_seen": 1234
|
||||||
|
}).encode("utf-8"),
|
||||||
|
b"consumer_uuid3": json.dumps({
|
||||||
|
"uuid": "consumer_uuid3",
|
||||||
|
"ip": "127.0.0.3",
|
||||||
|
"last_seen": 1235
|
||||||
|
}).encode("utf-8"),
|
||||||
|
"current_ip": CURRENT_IPADDR.encode("utf-8")
|
||||||
|
}
|
||||||
|
|
||||||
|
first = requests_mock.post("http://127.0.0.1/sync", json={"uuid": "consumer_uuid1"})
|
||||||
|
second = requests_mock.post("http://127.0.0.2/sync", json={"uuid": "consumer_uuid2"})
|
||||||
|
third = requests_mock.post("http://127.0.0.3/sync", json={"uuid": "consumer_uuid3"})
|
||||||
|
|
||||||
|
consumer_communicator_instance._redis_store.r.get.side_effect = lambda a: data[a]
|
||||||
|
|
||||||
|
consumer_communicator_instance.sync_all()
|
||||||
|
|
||||||
|
assert first.called
|
||||||
|
assert second.called
|
||||||
|
assert third.called
|
||||||
|
|
||||||
|
assert first.last_request.json() == second.last_request.json() == third.last_request.json()
|
||||||
|
|
||||||
|
assert first.last_request.json()['uuid'] == LOCAL_UUID
|
||||||
|
assert first.last_request.json()['ip'] == CURRENT_IPADDR
|
||||||
|
|
||||||
|
|
||||||
def test_cc_sync_all_error_logged(mocker, requests_mock, consumer_communicator_instance):
|
def test_cc_sync_all_error_logged(mocker, requests_mock, consumer_communicator_instance):
|
||||||
mocker.patch("logging.warning")
|
mocker.patch("logging.warning")
|
||||||
|
|
||||||
|
Reference in New Issue
Block a user