From f01001d3314513d05f0735d9c5fe798f655f70af Mon Sep 17 00:00:00 2001 From: marcsello Date: Thu, 14 May 2020 22:25:47 +0200 Subject: [PATCH] Added force ip override setting --- communicators.py | 31 +++++++++++++++++++----- test_everything.py | 59 ++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 84 insertions(+), 6 deletions(-) diff --git a/communicators.py b/communicators.py index 8d84fd4..f38b37e 100644 --- a/communicators.py +++ b/communicators.py @@ -55,7 +55,10 @@ class ProducerCommunicator: try: response = self._session.post( f"http://{ip}/ip", - json={'uuid': os.environ['LOCAL_UUID'], 'ip': newip}, + json={ + 'uuid': os.environ['LOCAL_UUID'], + 'ip': newip + }, timeout=5 ) logging.debug(f"Pushed update to {key} at {ip}. Response: {response.status_code}") @@ -69,14 +72,16 @@ class ConsumerCommunicator: The addresses of consumers are fetched from the `RedisSuperStorage`. """ - def __init__(self, redis_store: RedisSuperStorage): + def __init__(self, redis_store: RedisSuperStorage, force_ip_override: bool = False): """ Upon creating this object. A requests session is created on order to take advantage of keep-alive connections. :param redis_store: A `RedisSuperStorage` instance. + :param force_ip_override: Include the ip address stored in redis to the sync message (Disable the reciever ip discovery in the other consumer) """ self._redis_store = redis_store self._session = requests.Session() + self._force_ip_override = force_ip_override def targeted_snyc(self, ip: str): """ @@ -97,14 +102,21 @@ class ConsumerCommunicator: Body:: { - "uuid" : "str: LOCAL_UUID" + "uuid" : "str: LOCAL_UUID", + "ip" : "str: optional: IP override" } :param ip: The ip address of the consumer to be synced to. """ + + message = {'uuid': os.environ['LOCAL_UUID']} + + if self._force_ip_override: + message['ip'] = self._redis_store.current_ip + try: # request synchronization - response = self._session.post(f"http://{ip}/sync", json={'uuid': os.environ['LOCAL_UUID']}, timeout=5) + response = self._session.post(f"http://{ip}/sync", json=message, timeout=5) except (requests.exceptions.ConnectionError, requests.exceptions.Timeout) as e: logging.warning(f"Error while syncing to {ip}: {str(e)}") @@ -132,15 +144,22 @@ class ConsumerCommunicator: Body:: { - "uuid" : "str: LOCAL_UUID" + "uuid" : "str: LOCAL_UUID", + "ip" : "str: optional: IP override" } """ + + message = {'uuid': os.environ['LOCAL_UUID']} + + if self._force_ip_override: + message['ip'] = self._redis_store.current_ip + for uuid, info in self._redis_store.get_consumer_list().items(): ip = info['ip'] try: # request synchronization - response = self._session.post(f"http://{ip}/sync", json={'uuid': os.environ['LOCAL_UUID']}, timeout=5) + response = self._session.post(f"http://{ip}/sync", json=message, timeout=5) except (requests.exceptions.ConnectionError, requests.exceptions.Timeout) as e: logging.warning(f"Error while syncing to {ip}: {str(e)}") diff --git a/test_everything.py b/test_everything.py index 8c843af..6980692 100644 --- a/test_everything.py +++ b/test_everything.py @@ -22,6 +22,7 @@ os.environ["LOCAL_UUID"] = LOCAL_UUID os.environ["INITIAL_SERVERS"] = "127.0.0.1,192.168.0.1,172.20.0.2" + @pytest.fixture def redis_super_storage_instance(mocker): mocker.patch("redis.from_url") @@ -268,6 +269,7 @@ def test_pc_instantiate(redis_super_storage_instance): assert pc._redis_store == redis_super_storage_instance assert isinstance(pc._session, requests.Session) + # producer communicator def test_pc_push_ip_update(requests_mock, producer_communicator_instance): @@ -335,6 +337,7 @@ def test_pc_push_ip_update_error_logged(mocker, requests_mock, producer_communic logging.warning.assert_called_once() + # customer communicator def test_cc_targeted_sync(requests_mock, consumer_communicator_instance): @@ -346,6 +349,17 @@ def test_cc_targeted_sync(requests_mock, consumer_communicator_instance): assert a.last_request.json() == {'uuid': LOCAL_UUID} +def test_cc_targeted_sync_ip_override(requests_mock, consumer_communicator_instance): + consumer_communicator_instance._force_ip_override = True + consumer_communicator_instance._redis_store.r.get.side_effect = lambda a: CURRENT_IPADDR.encode("utf-8") + a = requests_mock.post("http://127.0.0.2/sync", json={"uuid": "testasdasdasd"}) + + consumer_communicator_instance.targeted_snyc("127.0.0.2") + + assert a.called + assert a.last_request.json() == {'uuid': LOCAL_UUID, "ip": CURRENT_IPADDR} + + def test_cc_targeted_sync_error_logged(mocker, requests_mock, consumer_communicator_instance): mocker.patch("logging.warning") @@ -398,6 +412,51 @@ def test_cc_sync_all(requests_mock, consumer_communicator_instance): assert first.last_request.json()['uuid'] == LOCAL_UUID +def test_cc_sync_all_ip_override(requests_mock, consumer_communicator_instance): + consumer_communicator_instance._force_ip_override = True + consumer_communicator_instance._redis_store.r.keys.side_effect = lambda a: [ + b"consumer_uuid1", + b"consumer_uuid2", + b"consumer_uuid3" + ] + + data = { + b"consumer_uuid1": json.dumps({ + "uuid": "consumer_uuid1", + "ip": "127.0.0.1", + "last_seen": 123 + }).encode("utf-8"), + b"consumer_uuid2": json.dumps({ + "uuid": "consumer_uuid2", + "ip": "127.0.0.2", + "last_seen": 1234 + }).encode("utf-8"), + b"consumer_uuid3": json.dumps({ + "uuid": "consumer_uuid3", + "ip": "127.0.0.3", + "last_seen": 1235 + }).encode("utf-8"), + "current_ip": CURRENT_IPADDR.encode("utf-8") + } + + first = requests_mock.post("http://127.0.0.1/sync", json={"uuid": "consumer_uuid1"}) + second = requests_mock.post("http://127.0.0.2/sync", json={"uuid": "consumer_uuid2"}) + third = requests_mock.post("http://127.0.0.3/sync", json={"uuid": "consumer_uuid3"}) + + consumer_communicator_instance._redis_store.r.get.side_effect = lambda a: data[a] + + consumer_communicator_instance.sync_all() + + assert first.called + assert second.called + assert third.called + + assert first.last_request.json() == second.last_request.json() == third.last_request.json() + + assert first.last_request.json()['uuid'] == LOCAL_UUID + assert first.last_request.json()['ip'] == CURRENT_IPADDR + + def test_cc_sync_all_error_logged(mocker, requests_mock, consumer_communicator_instance): mocker.patch("logging.warning")