Compare commits
48 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| ae822d87d8 | |||
| 16bdd3c22c | |||
| f01001d331 | |||
| 3b5c36048d | |||
| 3f580c7dfc | |||
| 3cefcabad2 | |||
| 13842daf1c | |||
| 5d2bed0f14 | |||
| 1a486833bb | |||
| ac90ca829f | |||
| 108db57271 | |||
| 04e189f541 | |||
| f267c49715 | |||
| c1904dee21 | |||
| 879b241060 | |||
| 06279d1346 | |||
| 2a52501ff3 | |||
| 547499d840 | |||
| c46dbd8211 | |||
| c1cbe13b6f | |||
| d6e2beb294 | |||
| f90bc35309 | |||
| 0bea194f4a | |||
| 1158e6cc60 | |||
| b7fe49037f | |||
| 850598d519 | |||
| 0713cabc6b | |||
| 7ce88f1a74 | |||
| 77e915c63e | |||
| 8668550a3f | |||
| d8bdd717c7 | |||
| dbeb35785e | |||
| d84564ff95 | |||
| 59ae404a19 | |||
| 989b646d45 | |||
| fe7fcb3714 | |||
| 2d326d79f4 | |||
| 6825efca87 | |||
| 7702cb2c7e | |||
| d32db92bab | |||
| 5f2894e727 | |||
| fdac9446af | |||
| d0d6267c74 | |||
| aed27cba95 | |||
| 9d94dbd6d6 | |||
| 2a77c552bf | |||
| a1ca3285ab | |||
| a60ee01ae1 |
@@ -0,0 +1,2 @@
|
||||
[run]
|
||||
omit=venv/*
|
||||
+67
-20
@@ -3,32 +3,61 @@ type: docker
|
||||
name: default
|
||||
|
||||
steps:
|
||||
- name: restore-cache-with-filesystem
|
||||
image: meltwater/drone-cache
|
||||
settings:
|
||||
backend: "filesystem"
|
||||
restore: true
|
||||
cache_key: "{{ .Repo.Name }}"
|
||||
archive_format: "gzip"
|
||||
filesystem_cache_root: "/tmp/cache"
|
||||
mount:
|
||||
- '.pipcache'
|
||||
volumes:
|
||||
- name: cache
|
||||
path: /tmp/cache
|
||||
|
||||
- name: static_analysis
|
||||
image: python:3
|
||||
commands:
|
||||
- pip3 install pylint bandit mccabe
|
||||
- pip3 install -r requirements.txt
|
||||
- pip3 install --cache-dir='./.pipcache' -r requirements.txt
|
||||
- find . -name "*.py" -exec python3 -m py_compile '{}' \;
|
||||
- find . -name "*.py" -exec pylint '{}' + || if [ $? -eq 1 ]; then echo "you fail"; fi
|
||||
- find . -name "*.py" -exec python3 -m mccabe --min 3 '{}' + || if [ $? -eq 1 ]; then echo "you fail"; fi
|
||||
- bandit -r . + || if [ $? -eq 1 ]; then echo "you fail"; fi
|
||||
|
||||
- name: build
|
||||
image: docker:stable-dind
|
||||
volumes:
|
||||
- name: dockersock
|
||||
path: /var/run
|
||||
- name: unit_test
|
||||
image: python:3.8
|
||||
environment:
|
||||
DOCKER_USERNAME:
|
||||
from_secret: DOCKER_USERNAME
|
||||
DOCKER_PASSWORD:
|
||||
from_secret: DOCKER_PASSWORD
|
||||
PRODUCER_REDIS: cache
|
||||
commands:
|
||||
- echo "$DOCKER_PASSWORD" | docker login -u "$DOCKER_USERNAME" --password-stdin
|
||||
- docker build -t="$DOCKER_USERNAME/consumer-scheduler" .
|
||||
- docker build -t="$DOCKER_USERNAME/consumer-scheduler:$DRONE_BUILD_NUMBER" .
|
||||
- docker push "$DOCKER_USERNAME/consumer-scheduler"
|
||||
- docker push "$DOCKER_USERNAME/consumer-scheduler:$DRONE_BUILD_NUMBER"
|
||||
- pip3 install --cache-dir='./.pipcache' -r requirements.txt
|
||||
- pip3 install --cache-dir='./.pipcache' -r requirements_dev.txt
|
||||
- pytest
|
||||
|
||||
- name: coverage
|
||||
image: python:3.8
|
||||
environment:
|
||||
PRODUCER_REDIS: cache
|
||||
commands:
|
||||
- pip3 install --cache-dir='./.pipcache' -r requirements.txt
|
||||
- pip3 install --cache-dir='./.pipcache' -r requirements_dev.txt
|
||||
- coverage run -m pytest
|
||||
- coverage report -m
|
||||
|
||||
- name: build-app
|
||||
image: banzaicloud/drone-kaniko
|
||||
settings:
|
||||
registry: registry.kmlabz.com
|
||||
repo: goldenpogacsa/${DRONE_REPO_NAME}
|
||||
username:
|
||||
from_secret: DOCKER_USERNAME
|
||||
password:
|
||||
from_secret: DOCKER_PASSWORD
|
||||
tags:
|
||||
- latest
|
||||
- ${DRONE_BUILD_NUMBER}
|
||||
|
||||
- name: make_docs
|
||||
image: python:3.8
|
||||
@@ -38,6 +67,21 @@ steps:
|
||||
- cd docs
|
||||
- make html
|
||||
|
||||
- name: rebuild-cache-with-filesystem
|
||||
image: meltwater/drone-cache
|
||||
pull: true
|
||||
settings:
|
||||
backend: "filesystem"
|
||||
rebuild: true
|
||||
cache_key: "{{ .Repo.Name }}"
|
||||
archive_format: "gzip"
|
||||
filesystem_cache_root: "/tmp/cache"
|
||||
mount:
|
||||
- '.pipcache'
|
||||
volumes:
|
||||
- name: cache
|
||||
path: /tmp/cache
|
||||
|
||||
- name: build_docs
|
||||
image: docker:stable-dind
|
||||
volumes:
|
||||
@@ -50,11 +94,11 @@ steps:
|
||||
from_secret: DOCKER_PASSWORD
|
||||
commands:
|
||||
- cd docs
|
||||
- echo "$DOCKER_PASSWORD" | docker login -u "$DOCKER_USERNAME" --password-stdin
|
||||
- docker build -t="$DOCKER_USERNAME/consumer-scheduler-docs" .
|
||||
- docker build -t="$DOCKER_USERNAME/consumer-scheduler-docs:$DRONE_BUILD_NUMBER" .
|
||||
- docker push "$DOCKER_USERNAME/consumer-scheduler-docs"
|
||||
- docker push "$DOCKER_USERNAME/consumer-scheduler-docs:$DRONE_BUILD_NUMBER"
|
||||
- echo "$DOCKER_PASSWORD" | docker login -u "$DOCKER_USERNAME" --password-stdin registry.kmlabz.com
|
||||
- docker build -t="registry.kmlabz.com/goldenpogacsa/consumer-scheduler-docs" .
|
||||
- docker build -t="registry.kmlabz.com/goldenpogacsa/consumer-scheduler-docs:$DRONE_BUILD_NUMBER" .
|
||||
- docker push "registry.kmlabz.com/goldenpogacsa/consumer-scheduler-docs"
|
||||
- docker push "registry.kmlabz.com/goldenpogacsa/consumer-scheduler-docs:$DRONE_BUILD_NUMBER"
|
||||
|
||||
- name: slack
|
||||
image: plugins/slack
|
||||
@@ -77,3 +121,6 @@ services:
|
||||
volumes:
|
||||
- name: dockersock
|
||||
temp: {}
|
||||
- name: cache
|
||||
host:
|
||||
path: "/tmp/cache"
|
||||
|
||||
+76
-2
@@ -2,6 +2,80 @@
|
||||
P2P Consumer Scheduler
|
||||
======================
|
||||
|
||||
Repository for the consumer's scheduler
|
||||
The scheduler part of a consumer system for the P2P storage system.
|
||||
|
||||
Produced by GoldenPogácsa Inc.
|
||||
|
||||
Basics
|
||||
------
|
||||
This is a component of a fully working consumer system.
|
||||
In order to setup a consumer you will need the consumer API as well and a Redis database.
|
||||
|
||||
This component faciliates the automatic synchronization between consumers.
|
||||
As well as detecting IP changes of the host which is running on.
|
||||
|
||||
Part of a system
|
||||
----------------
|
||||
|
||||
This component works as a complimentary part for the consumer API.
|
||||
|
||||
The information exchange between the API is solved by using a Common Redis database.
|
||||
This database must be shared between the API and the Scheduler instance.
|
||||
|
||||
Also the UUID is shared between the two components. This is obvious since they are essentially both parts of the same system.
|
||||
|
||||
|
||||
Communications
|
||||
--------------
|
||||
|
||||
The communication between other consumers and producers are solved by their REST API endpoints.
|
||||
|
||||
The follwing events will cause communication by this object:
|
||||
* A sychronization task
|
||||
* An IP address change detected
|
||||
|
||||
Since the communication between consumers happens regularly the IP address change event does not cause an immediate synchronization
|
||||
However communication originated by scheduler toward producers only happens when the IP address change is detected.
|
||||
|
||||
Synchronization
|
||||
~~~~~~~~~~~~~~~
|
||||
|
||||
This call originated by the scheduler and happens regularly (See. configuration).
|
||||
|
||||
A synchronization task causes the followings:
|
||||
* Each consumer known by this consumer is checked for availability
|
||||
* Each consumer this consumer communicated with updated the availability of this consumer.
|
||||
* Each consumer which had no information of this consumer, now have.
|
||||
|
||||
Consumers learn about each other, and their IP addresses (and changes) during a synchronization.
|
||||
|
||||
Called URL::
|
||||
|
||||
http://<consumer ip>/sync
|
||||
|
||||
Body::
|
||||
|
||||
{
|
||||
"uuid" : "str: LOCAL_UUID",
|
||||
"ip" : "str: optional: IP override"
|
||||
}
|
||||
|
||||
|
||||
IP update
|
||||
~~~~~~~~~
|
||||
|
||||
This communication is originated by the scheduler when an IP address change is detected.
|
||||
|
||||
This call is used to let producers immanently know about an IP change of their consumer, so that they can converge faster.
|
||||
|
||||
Called URL::
|
||||
|
||||
http://<producer ip>/ip
|
||||
|
||||
Body::
|
||||
|
||||
{
|
||||
"uuid" : "str: LOCAL_UUID",
|
||||
"ip": "str: provided by param"
|
||||
}
|
||||
|
||||
Functionality: updates the database of the available consumers periodically
|
||||
|
||||
@@ -1,16 +1,15 @@
|
||||
#!/usr/bin/env python
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
This is the top level module, containing the main application. Launching this file will launch the scheduler part of the consumer application.
|
||||
"""
|
||||
import sentry_sdk
|
||||
import time
|
||||
import requests
|
||||
import requests.exceptions
|
||||
import os
|
||||
import redis
|
||||
import json
|
||||
import logging
|
||||
|
||||
"""
|
||||
Scheduler
|
||||
"""
|
||||
from redis_super_storage import RedisSuperStorage
|
||||
from communicators import ConsumerCommunicator, ProducerCommunicator
|
||||
from ip_watchdog import IPWatchdog
|
||||
|
||||
__author__ = "@kocsisr"
|
||||
__copyright__ = "Copyright 2020, GoldenPogácsa Team"
|
||||
@@ -20,64 +19,62 @@ __version__text__ = "1"
|
||||
sentry_sdk.init("https://0a106e104e114bc9a3fa47f9cb0db2f4@sentry.kmlabz.com/10")
|
||||
|
||||
|
||||
def main():
|
||||
def get_initial_ip_list() -> list:
|
||||
"""
|
||||
This method is used to parse the content of INITIAL_SERVERS environment variable.
|
||||
The contents of this variable is a list of ip addresses separated by commas.
|
||||
This mehod returns a Python native `list` object containing the addreseses provided.
|
||||
|
||||
# set logging preferences
|
||||
logging.basicConfig(filename = '', level = logging.DEBUG)
|
||||
|
||||
# connect to redis
|
||||
r = redis.Redis(host = 'localhost', port = 6379, db = 0)
|
||||
|
||||
# set initial consumer addresses
|
||||
:return: A list of ip addresses provided by the environmental variable
|
||||
"""
|
||||
ip_list = os.environ['INITIAL_SERVERS'].split(',')
|
||||
logging.debug('Get consumer list from environ at first: Done')
|
||||
# get the dictionary of the currently available consumers
|
||||
consumer_list_redis = json.loads((r.get('consumer_list') or b'{}').decode('utf-8'))
|
||||
logging.debug('Get consumer list from redis at first: Done')
|
||||
temp_dict = { }
|
||||
logging.debug('Initial ip list ' + ", ".join(ip_list))
|
||||
return ip_list
|
||||
|
||||
for ip in ip_list:
|
||||
try:
|
||||
# request synchronization
|
||||
response = requests.post(f"http://{ip}/sync", json = { 'uuid': os.environ['LOCAL_UUID'] })
|
||||
except requests.exceptions.ConnectionError:
|
||||
continue
|
||||
|
||||
if response.status_code == 200:
|
||||
temp_dict[response.json()['uuid']] = { 'ip': ip }
|
||||
def main():
|
||||
"""
|
||||
This is the main method of the scheduler application.
|
||||
|
||||
This method does basically the followings:
|
||||
* Sets up logging
|
||||
* Creates the RedisSuperStorage object (connecting to Redis database)
|
||||
* Sets up communicators and IP watchdog
|
||||
* Performs an initial synchronization to all other consumers.
|
||||
* Starts the main loop which does roughly the following:
|
||||
* Syncrhronizes with all other consumers
|
||||
* Check if ip address changed. If yes, then push updates to all producers.
|
||||
* Wait `RUN_INTERVAL` seconds (provided by envvar)
|
||||
"""
|
||||
# set logging preferences
|
||||
logging.basicConfig(filename='', level=logging.DEBUG)
|
||||
|
||||
redis_storage = RedisSuperStorage(os.environ.get('REDIS_URL', "redis://localhost:6379/0"),
|
||||
int(os.environ.get("CUSTOMER_TIMEOUT", 30)))
|
||||
|
||||
consumer_communicator = ConsumerCommunicator(redis_storage, bool(os.environ.get('FORCE_IP_OVERRIDE', False)))
|
||||
producer_communicator = ProducerCommunicator(redis_storage)
|
||||
ip_watchdog = IPWatchdog(redis_storage)
|
||||
|
||||
logging.info("Syncing to initial consumer list")
|
||||
for ip in get_initial_ip_list():
|
||||
logging.debug(f"Syncing to {ip}")
|
||||
consumer_communicator.targeted_snyc(ip)
|
||||
|
||||
consumer_list_redis.update(temp_dict)
|
||||
r.set('consumer_list', json.dumps(consumer_list_redis).encode('utf-8'))
|
||||
logging.debug('Update redis consumers ip list from first answers: Done')
|
||||
while True:
|
||||
logging.debug('Infinite Cycle start : Done')
|
||||
# get the dictionary of the currently available consumers
|
||||
consumer_list_redis = json.loads((r.get('consumer_list') or b'{}').decode('utf-8'))
|
||||
logging.debug('Get consumer list from redis: Done')
|
||||
for uuid, info in consumer_list_redis.items():
|
||||
ip = info['ip']
|
||||
try:
|
||||
# request synchronization
|
||||
response = requests.post(f"http://{ip}/sync", json = { 'uuid': os.environ['LOCAL_UUID'] })
|
||||
except requests.exceptions.ConnectionError:
|
||||
continue
|
||||
logging.debug("Doing a sync")
|
||||
consumer_communicator.sync_all()
|
||||
|
||||
if response.status_code == 200:
|
||||
temp_dict[response.json()['uuid']] = { 'ip': ip }
|
||||
ip_changed, ipaddr = ip_watchdog.ip_changed()
|
||||
if ip_changed:
|
||||
producer_communicator.push_ip_update(ipaddr)
|
||||
|
||||
|
||||
# update the dictionary of the currently available consumers
|
||||
consumer_list_redis.update(temp_dict)
|
||||
r.set('consumer_list', json.dumps(consumer_list_redis).encode('utf-8'))
|
||||
logging.debug('Update redis consumer ip list from answers: Done')
|
||||
|
||||
logging.debug('Waiting for next turn')
|
||||
# wait for the next update time
|
||||
time.sleep(30)
|
||||
time.sleep(int(os.environ.get("RUN_INTERVAL", 10)))
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
try:
|
||||
|
||||
main()
|
||||
except KeyboardInterrupt:
|
||||
pass
|
||||
|
||||
@@ -0,0 +1,169 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
This module contains the classes used for communicating with other consumers as well as the producers themselves.
|
||||
"""
|
||||
|
||||
import os
|
||||
import logging
|
||||
import requests
|
||||
import requests.exceptions
|
||||
from redis_super_storage import RedisSuperStorage
|
||||
|
||||
|
||||
class ProducerCommunicator:
|
||||
"""
|
||||
This class is used to communicate with producers.
|
||||
The addresses of producers are fetched from `RedisSuperStorage`.
|
||||
"""
|
||||
|
||||
def __init__(self, redis_store: RedisSuperStorage):
|
||||
"""
|
||||
Upon creating this object. A requests session is created on order to take advantage of keep-alive connections.
|
||||
|
||||
:param redis_store: A `RedisSuperStorage` instance.
|
||||
"""
|
||||
self._redis_store = redis_store
|
||||
self._session = requests.Session()
|
||||
|
||||
def push_ip_update(self, newip: str):
|
||||
"""
|
||||
This method is used to push an ip update to all known consumers.
|
||||
The list of consumers are read from the `RedisSuperStorage` instance.
|
||||
(The list of producers are maintained by the api endpoint.)
|
||||
|
||||
The uuid of this consumer is acquired directly from the `LOCAL_UUID` envvar.
|
||||
|
||||
A timeout of 5 seconds is hardcoded for each producer individually. Timeout is logged as warning.
|
||||
|
||||
Called URL::
|
||||
|
||||
http://<producer ip>/ip
|
||||
|
||||
Body::
|
||||
|
||||
{
|
||||
"uuid" : "str: LOCAL_UUID",
|
||||
"ip": "str: provided by param"
|
||||
}
|
||||
|
||||
|
||||
:param newip: New ipaddress to be annouced.
|
||||
"""
|
||||
|
||||
for key, ip in self._redis_store.get_producer_list().items():
|
||||
|
||||
try:
|
||||
response = self._session.post(
|
||||
f"http://{ip}/ip",
|
||||
json={
|
||||
'uuid': os.environ['LOCAL_UUID'],
|
||||
'ip': newip
|
||||
},
|
||||
timeout=5
|
||||
)
|
||||
logging.debug(f"Pushed update to {key} at {ip}. Response: {response.status_code}")
|
||||
except (requests.exceptions.ConnectionError, requests.exceptions.Timeout) as e:
|
||||
logging.warning(f"Could not push update to {key}: {str(e)}")
|
||||
|
||||
|
||||
class ConsumerCommunicator:
|
||||
"""
|
||||
This class is used to communicate with consumers.
|
||||
The addresses of consumers are fetched from the `RedisSuperStorage`.
|
||||
"""
|
||||
|
||||
def __init__(self, redis_store: RedisSuperStorage, force_ip_override: bool = False):
|
||||
"""
|
||||
Upon creating this object. A requests session is created on order to take advantage of keep-alive connections.
|
||||
|
||||
:param redis_store: A `RedisSuperStorage` instance.
|
||||
:param force_ip_override: Include the ip address stored in redis to the sync message (Disable the reciever ip discovery in the other consumer)
|
||||
"""
|
||||
self._redis_store = redis_store
|
||||
self._session = requests.Session()
|
||||
self._force_ip_override = force_ip_override
|
||||
|
||||
def targeted_snyc(self, ip: str):
|
||||
"""
|
||||
This method works similarly to `sync_all` however the target is not fetched from the `RedisSuperStorage` instance.
|
||||
The results are processed the same way (saved to redis).
|
||||
|
||||
A timeout of 5 seconds is hardcoded for this function. Timeout is logged as warning.
|
||||
|
||||
This method is preferred when the remote is unknown (uuid is unknown). Mostly when the application just started up,
|
||||
and an initial syncrhronization to all consumers is required.
|
||||
|
||||
See `sync_all` for more information.
|
||||
|
||||
Called URL::
|
||||
|
||||
http://<consumer ip>/sync
|
||||
|
||||
Body::
|
||||
|
||||
{
|
||||
"uuid" : "str: LOCAL_UUID",
|
||||
"ip" : "str: optional: IP override"
|
||||
}
|
||||
|
||||
:param ip: The ip address of the consumer to be synced to.
|
||||
"""
|
||||
|
||||
message = {'uuid': os.environ['LOCAL_UUID']}
|
||||
|
||||
if self._force_ip_override:
|
||||
message['ip'] = self._redis_store.current_ip
|
||||
|
||||
try:
|
||||
# request synchronization
|
||||
response = self._session.post(f"http://{ip}/sync", json=message, timeout=5)
|
||||
|
||||
except (requests.exceptions.ConnectionError, requests.exceptions.Timeout) as e:
|
||||
logging.warning(f"Error while syncing to {ip}: {str(e)}")
|
||||
return
|
||||
|
||||
if response.status_code == 200:
|
||||
self._redis_store.update_consumer(response.json()['uuid'], ip)
|
||||
|
||||
def sync_all(self):
|
||||
"""
|
||||
This method is used to syncronize with each known consumer.
|
||||
The list of consumers are acquired from the `RedisSuperStorage` instance.
|
||||
|
||||
This syncrhonization run causes the followings:
|
||||
* Each consumer known by this consumer is checked for availability
|
||||
* Each consumer this consumer communicated with updated the availability of this consumer.
|
||||
* Each consumer which had no information of this consumer, now have.
|
||||
|
||||
A timeout of 5 seconds is hardcoded for each consumer individually. Timeout is logged as warning.
|
||||
|
||||
Called URL::
|
||||
|
||||
http://<consumer ip>/sync
|
||||
|
||||
Body::
|
||||
|
||||
{
|
||||
"uuid" : "str: LOCAL_UUID",
|
||||
"ip" : "str: optional: IP override"
|
||||
}
|
||||
|
||||
"""
|
||||
|
||||
message = {'uuid': os.environ['LOCAL_UUID']}
|
||||
|
||||
if self._force_ip_override:
|
||||
message['ip'] = self._redis_store.current_ip
|
||||
|
||||
for uuid, info in self._redis_store.get_consumer_list().items():
|
||||
ip = info['ip']
|
||||
try:
|
||||
# request synchronization
|
||||
response = self._session.post(f"http://{ip}/sync", json=message, timeout=5)
|
||||
|
||||
except (requests.exceptions.ConnectionError, requests.exceptions.Timeout) as e:
|
||||
logging.warning(f"Error while syncing to {ip}: {str(e)}")
|
||||
continue
|
||||
|
||||
if response.status_code == 200:
|
||||
self._redis_store.update_consumer(response.json()['uuid'], ip)
|
||||
+11
-6
@@ -10,9 +10,11 @@
|
||||
# add these directories to sys.path here. If the directory is relative to the
|
||||
# documentation root, use os.path.abspath to make it absolute, like shown here.
|
||||
#
|
||||
# import os
|
||||
# import sys
|
||||
# sys.path.insert(0, os.path.abspath('.'))
|
||||
import os
|
||||
import sys
|
||||
sys.path.insert(0, os.path.abspath('..'))
|
||||
|
||||
#import app
|
||||
|
||||
|
||||
# -- Project information -----------------------------------------------------
|
||||
@@ -30,8 +32,7 @@ release = '0.1.0'
|
||||
# Add any Sphinx extension module names here, as strings. They can be
|
||||
# extensions coming with Sphinx (named 'sphinx.ext.*') or your custom
|
||||
# ones.
|
||||
extensions = [
|
||||
]
|
||||
extensions = ['sphinx.ext.autodoc']
|
||||
|
||||
# Add any paths that contain templates here, relative to this directory.
|
||||
templates_path = []
|
||||
@@ -52,4 +53,8 @@ html_theme = 'sphinx_rtd_theme'
|
||||
# Add any paths that contain custom static files (such as style sheets) here,
|
||||
# relative to this directory. They are copied after the builtin static files,
|
||||
# so a file named "default.css" will overwrite the builtin "default.css".
|
||||
html_static_path = []
|
||||
html_static_path = []
|
||||
|
||||
# master document
|
||||
master_doc = 'index'
|
||||
autoclass_content = 'both'
|
||||
@@ -0,0 +1,22 @@
|
||||
=============
|
||||
Configuration
|
||||
=============
|
||||
|
||||
This software component can be configured via environmental variables, which are very useful in a containerized environment.
|
||||
|
||||
|
||||
+---------------------+--------------------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|
||||
| Variable | Default | Description |
|
||||
+=====================+==========================+====================================================================================================================================================================================================+
|
||||
| `LOCAL_UUID` | **N/A** | The UUID of the consumer system. Must be the same as the API's |
|
||||
+---------------------+--------------------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|
||||
| `RUN_INTERVAL` | 10 | Interval between synchronizations, and local ip checks. |
|
||||
+---------------------+--------------------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|
||||
| `CUSTOMER_TIMEOUT` | 30 | Default timeout to be set for a consumer. If the timeout expires, the consumer will be considered invalid, and no further attempts will be made to contact with it. |
|
||||
+---------------------+--------------------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|
||||
| `REDIS_URL` | redis://localhost:6379/0 | URL of the Redis database shared with the consumer api |
|
||||
+---------------------+--------------------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|
||||
| `INITIAL_SERVERS` | **N/A** | A comma separated list of the initially known consumers (Can be empty, but must be specified) |
|
||||
+---------------------+--------------------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|
||||
| `FORCE_IP_OVERRIDE` | False | Include the ip address of the consumer into the sync message, causing the other consumer to save this ip address instead of the one it recieved the request from (Useful for weird netowrk setups) |
|
||||
+---------------------+--------------------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|
||||
@@ -11,9 +11,11 @@ Welcome to Consumer Scheduler's documentation!
|
||||
:caption: Contents:
|
||||
|
||||
readme
|
||||
config
|
||||
source/modules
|
||||
|
||||
|
||||
|
||||
Indices and tables
|
||||
==================
|
||||
|
||||
|
||||
@@ -0,0 +1,7 @@
|
||||
communicators module
|
||||
====================
|
||||
|
||||
.. automodule:: communicators
|
||||
:members:
|
||||
:undoc-members:
|
||||
:show-inheritance:
|
||||
@@ -0,0 +1,7 @@
|
||||
ip\_watchdog module
|
||||
===================
|
||||
|
||||
.. automodule:: ip_watchdog
|
||||
:members:
|
||||
:undoc-members:
|
||||
:show-inheritance:
|
||||
@@ -5,3 +5,6 @@ consumer-scheduler
|
||||
:maxdepth: 4
|
||||
|
||||
app
|
||||
communicators
|
||||
ip_watchdog
|
||||
redis_super_storage
|
||||
@@ -0,0 +1,7 @@
|
||||
redis\_super\_storage module
|
||||
============================
|
||||
|
||||
.. automodule:: redis_super_storage
|
||||
:members:
|
||||
:undoc-members:
|
||||
:show-inheritance:
|
||||
@@ -0,0 +1,50 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
This module contains the IPWatchdog class which is responsible for detecting ip address changes on the host machine.
|
||||
"""
|
||||
from typing import Tuple
|
||||
import logging
|
||||
import socket
|
||||
|
||||
from redis_super_storage import RedisSuperStorage
|
||||
|
||||
|
||||
class IPWatchdog:
|
||||
"""
|
||||
This is very simple class, that is used to determine if the ip address of the host have changed.
|
||||
|
||||
Internally this class relies on `RedisSuperStorage` to fetch the last used ip address.
|
||||
|
||||
The ip address of the current host is acquired using python's builtin `socket` interface, by requesting an address resolve agains the localhost's host name.
|
||||
In some scenarios this may result in wrongly identifying the loopback address instead of the assigned address.
|
||||
In most cases, where the application is run inside a docker container this method will work just fine.
|
||||
"""
|
||||
|
||||
|
||||
def __init__(self, redis_store: RedisSuperStorage):
|
||||
"""
|
||||
During the construction of the object, the host name is of the current machine is cached, for quicker lookups.
|
||||
|
||||
:param redis_store: a RedisSuperStorage instance.
|
||||
"""
|
||||
self._redis_store = redis_store
|
||||
self._host_name = socket.gethostname()
|
||||
|
||||
def ip_changed(self) -> Tuple[bool, str]:
|
||||
"""
|
||||
This method fetches the last ip address from the RedisSuperStorage instance, and compares it to the current local address.
|
||||
If the ip address changes the new value is automatically stored in the RedisSuperStorage instance.
|
||||
|
||||
Detection is performed upon calling this method, as well as storing the updated address.
|
||||
|
||||
:return: (changed, ip_address) A tuple with two members, where the first member indicates if the ip address is changed, the second member is the current ip address.
|
||||
"""
|
||||
old_ip = self._redis_store.current_ip
|
||||
current_ip = socket.gethostbyname(self._host_name)
|
||||
|
||||
if current_ip != old_ip:
|
||||
logging.info(f'IP changed: {old_ip} -> {current_ip}')
|
||||
self._redis_store.current_ip = current_ip
|
||||
return True, current_ip
|
||||
|
||||
return False, old_ip
|
||||
@@ -0,0 +1,132 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
This module contains the RedisSuperStorage module which is responsible to store and load the data structure used by the constumer.
|
||||
"""
|
||||
import redis
|
||||
import os
|
||||
import json
|
||||
import time
|
||||
|
||||
|
||||
class RedisSuperStorage:
|
||||
"""
|
||||
This class is the brain of the scheduler app, and probably half the brain of the whole consumer system.
|
||||
|
||||
This class provides access to the data structures used in the consumer system trough various getter and setter methods.
|
||||
Kind of like an ORM would do.
|
||||
|
||||
Sometimes it is necessary to access more than one time to the redis database while qcquiring or setting some data.
|
||||
In this cases some basic error handling is implemented to avoid most of the problems that could be caused by non-atomic operations.
|
||||
"""
|
||||
|
||||
def __init__(self, redis_url: str, timeout: int):
|
||||
"""
|
||||
During the object creation the Redis connection is attempted to be established.
|
||||
|
||||
:param redis_url: URL of the redis database in the following form: `redis://localhost:6379/0`
|
||||
:param timeout: Timeout to be set on keys, where it matters. (See the documentation for each method)
|
||||
"""
|
||||
self.r = redis.from_url(redis_url)
|
||||
self._timeout = timeout
|
||||
|
||||
def get_consumer_list(self) -> dict:
|
||||
"""
|
||||
This function gets the list of consumers from the Redis database.
|
||||
|
||||
Consumers are saved to the database either by the API endpoint, or calling `update_consumer` of this class.
|
||||
The consumers are volatile, they have a timeout set when they updated.
|
||||
If they are not updated within that timeout they considered invalid, and does not returned by this method.
|
||||
|
||||
|
||||
Each consumer is represented by the following structure::
|
||||
|
||||
{
|
||||
"uuid" : "str",
|
||||
"ip" : "str",
|
||||
"last_seen": int
|
||||
}
|
||||
|
||||
Meaning of the fields:
|
||||
* `uuid` - The UUID of the remote consumer
|
||||
* `ip` - The IP address of the remote consumer
|
||||
* `last_seen` - The UNIX timestamp when the consumer was last seen.
|
||||
|
||||
:return: Despite it's name, this function returns a dictionary in which the keys are the uuids of each consumer.
|
||||
"""
|
||||
keys = self.r.keys('consumer_*')
|
||||
|
||||
list_of_customers = {}
|
||||
|
||||
for key in keys:
|
||||
info = json.loads((self.r.get(key) or b"{}").decode('utf-8'))
|
||||
|
||||
if info:
|
||||
list_of_customers[info['uuid']] = info
|
||||
|
||||
return list_of_customers
|
||||
|
||||
def get_producer_list(self) -> dict:
|
||||
"""
|
||||
This method returns a list of producer ip addresses, nothing more.
|
||||
The producer ip addresses are volatile, they have a timeout set when they updated.
|
||||
If they are not updated within that timeout they considered invalid, and does not returned by this method.
|
||||
|
||||
Producers are added to the redis database by the API endpoint with a timeout set on them.
|
||||
|
||||
:return: Despite it's name this function returns a dict... Similar to `get_consumer_list`. The keys are the keys stored in redis (lol)
|
||||
"""
|
||||
keys = self.r.keys('producer_*')
|
||||
|
||||
list_of_producer_ip = {}
|
||||
|
||||
for key in keys:
|
||||
ip = (self.r.get(key) or b"").decode('utf-8')
|
||||
|
||||
if ip:
|
||||
list_of_producer_ip[key.decode('utf-8')] = ip
|
||||
|
||||
return list_of_producer_ip
|
||||
|
||||
def update_consumer(self, uuid: str, ip: str):
|
||||
"""
|
||||
Updates (or creates) informations of a specific consumer in the redis database.
|
||||
The default timeout is set on the keys, when stored in the database.
|
||||
|
||||
:param uuid: The uuid of the consumer to be updated.
|
||||
:param ip: The ip address of that consumer.
|
||||
"""
|
||||
|
||||
cust_key = f"consumer_{uuid}"
|
||||
|
||||
info = {
|
||||
"uuid": uuid,
|
||||
"ip": ip,
|
||||
"last_seen": time.time()
|
||||
}
|
||||
|
||||
self.r.set(cust_key, json.dumps(info).encode('utf-8'))
|
||||
self.r.expire(cust_key, self._timeout)
|
||||
|
||||
def get_current_ip(self) -> str:
|
||||
"""
|
||||
This is a basic getter, which reads a single value from the Redis database.
|
||||
|
||||
:return: The ip address of the consumer stored in the redis database.
|
||||
"""
|
||||
ip = self.r.get('current_ip')
|
||||
|
||||
if ip:
|
||||
ip = ip.decode('utf-8')
|
||||
|
||||
return ip
|
||||
|
||||
def set_current_ip(self, ip: str):
    """
    Store the consumer's own ip address in the redis database.

    Unlike consumer/producer records, the ``current_ip`` key never
    expires.

    :param ip: IP address to be set.
    """
    encoded = ip.encode('utf-8')
    self.r.set('current_ip', encoded)
|
||||
|
||||
# Expose the getter/setter pair as the `current_ip` attribute (classic
# property form, kept so the explicit methods also stay callable by name).
current_ip = property(get_current_ip, set_current_ip)
|
||||
@@ -0,0 +1,6 @@
|
||||
pytest
|
||||
pytest-runner
|
||||
requests_mock
|
||||
pytest-mock
|
||||
mock
|
||||
coverage
|
||||
@@ -0,0 +1,514 @@
|
||||
from redis_super_storage import RedisSuperStorage
|
||||
from communicators import ConsumerCommunicator, ProducerCommunicator
|
||||
from ip_watchdog import IPWatchdog
|
||||
import pytest
|
||||
import redis
|
||||
import json
|
||||
import socket
|
||||
import requests
|
||||
import requests.exceptions
|
||||
import logging
|
||||
import os
|
||||
import app
|
||||
|
||||
# Connection parameters handed to RedisSuperStorage in the fixtures below.
REDIS_URL = "redis://localhost/0"
REDIS_TIMEOUT = 2

# Host name / ip pair returned by the patched socket calls in the fixtures.
CURRENT_HOSTNAME = "testenv.local"
CURRENT_IPADDR = "192.168.1.50"

# The application identifies itself via LOCAL_UUID and bootstraps from
# INITIAL_SERVERS, both read from the environment.
# NOTE(review): `app` is imported above this point — presumably it reads
# these variables lazily (at call time, not import time); confirm.
LOCAL_UUID = "testuuid1"
os.environ["LOCAL_UUID"] = LOCAL_UUID

os.environ["INITIAL_SERVERS"] = "127.0.0.1,192.168.0.1,172.20.0.2"
|
||||
|
||||
|
||||
@pytest.fixture
def redis_super_storage_instance(mocker):
    """RedisSuperStorage whose redis connection is a mock (redis.from_url patched)."""
    mocker.patch("redis.from_url")
    yield RedisSuperStorage(REDIS_URL, REDIS_TIMEOUT)
|
||||
|
||||
|
||||
@pytest.fixture
def ip_watchdog_instance(mocker, redis_super_storage_instance):
    """IPWatchdog on the mocked storage; socket name/ip lookups patched to the test host/ip."""
    mocker.patch("socket.gethostname", side_effect=lambda: CURRENT_HOSTNAME)
    mocker.patch("socket.gethostbyname", side_effect=lambda a: CURRENT_IPADDR)
    yield IPWatchdog(redis_super_storage_instance)
|
||||
|
||||
|
||||
@pytest.fixture
def consumer_communicator_instance(redis_super_storage_instance):
    """ConsumerCommunicator backed by the mocked redis storage."""
    yield ConsumerCommunicator(redis_super_storage_instance)
|
||||
|
||||
|
||||
@pytest.fixture
def producer_communicator_instance(redis_super_storage_instance):
    """ProducerCommunicator backed by the mocked redis storage."""
    yield ProducerCommunicator(redis_super_storage_instance)
|
||||
|
||||
|
||||
# ========================================
|
||||
# RedisSuperStorage
|
||||
# ========================================
|
||||
|
||||
# __init__
|
||||
|
||||
def test_rst_instance_creation(mocker):
    """The constructor connects via redis.from_url and stores the timeout."""
    mocker.patch("redis.from_url")
    rst = RedisSuperStorage("test", 2)

    redis.from_url.assert_called_once_with("test")
    assert rst._timeout == 2
|
||||
|
||||
|
||||
# ip get/set
|
||||
|
||||
def test_rst_ip_getter_get_none(redis_super_storage_instance):
    """current_ip is None when redis has no 'current_ip' key."""
    redis_super_storage_instance.r.get.side_effect = lambda a: None

    ip = redis_super_storage_instance.current_ip

    assert ip is None
    redis_super_storage_instance.r.get.assert_called_once_with('current_ip')
|
||||
|
||||
|
||||
def test_rst_ip_getter_get_ip(redis_super_storage_instance):
    """current_ip decodes the stored bytes value to a str."""
    redis_super_storage_instance.r.get.side_effect = lambda a: b"127.0.0.1"

    ip = redis_super_storage_instance.current_ip

    assert ip == "127.0.0.1"
    redis_super_storage_instance.r.get.assert_called_once_with('current_ip')
|
||||
|
||||
|
||||
def test_rst_ip_getter_set_ip(redis_super_storage_instance):
    """Assigning current_ip stores the utf-8 encoded ip under 'current_ip'."""
    redis_super_storage_instance.current_ip = "127.0.0.1"
    redis_super_storage_instance.r.set.assert_called_once_with('current_ip', b"127.0.0.1")
|
||||
|
||||
|
||||
# update consumer
|
||||
|
||||
def test_rst_update_consumer(mocker, redis_super_storage_instance):
    """update_consumer writes the json record and applies the expiry timeout."""
    # time.time is frozen so the serialized payload is predictable.
    mocker.patch("time.time", side_effect=lambda: 123)
    redis_super_storage_instance.update_consumer("testuuid", "127.0.0.1")

    cust_key = "consumer_testuuid"

    info = {
        "uuid": "testuuid",
        "ip": "127.0.0.1",
        "last_seen": 123
    }

    redis_super_storage_instance.r.set.assert_called_once_with(cust_key, json.dumps(info).encode('utf-8'))
    redis_super_storage_instance.r.expire.assert_called_once_with(cust_key, REDIS_TIMEOUT)
|
||||
|
||||
|
||||
# producer list
|
||||
|
||||
def test_rst_get_producer_list(redis_super_storage_instance):
    """get_producer_list maps decoded redis keys to decoded producer ips."""
    redis_super_storage_instance.r.keys.side_effect = lambda a: [
        b"producer_uuid1",
        b"producer_uuid2",
        b"producer_uuid3"
    ]

    data = {
        b"producer_uuid1": b"127.0.0.1",
        b"producer_uuid2": b"127.0.0.2",
        b"producer_uuid3": b"127.0.0.3",
    }

    redis_super_storage_instance.r.get.side_effect = lambda a: data[a]

    lst = redis_super_storage_instance.get_producer_list()

    assert lst == {
        "producer_uuid1": "127.0.0.1",
        "producer_uuid2": "127.0.0.2",
        "producer_uuid3": "127.0.0.3",
    }

    redis_super_storage_instance.r.keys.assert_called_once_with("producer_*")
|
||||
|
||||
|
||||
def test_rst_get_producer_expire_while_get(redis_super_storage_instance):
    """A key that expires between KEYS and GET (GET returns None) is skipped."""
    redis_super_storage_instance.r.keys.side_effect = lambda a: [b"producer_uuid1"]
    redis_super_storage_instance.r.get.side_effect = lambda a: None

    lst = redis_super_storage_instance.get_producer_list()

    assert isinstance(lst, dict)
    assert lst == {}

    redis_super_storage_instance.r.keys.assert_called_once_with("producer_*")
|
||||
|
||||
|
||||
# get_consumer_list
|
||||
|
||||
def test_rst_get_consumer_list(redis_super_storage_instance):
    """get_consumer_list deserializes each stored record, keyed by its uuid."""
    redis_super_storage_instance.r.keys.side_effect = lambda a: [
        b"consumer_uuid1",
        b"consumer_uuid2",
        b"consumer_uuid3"
    ]

    data = {
        b"consumer_uuid1": json.dumps({
            "uuid": "consumer_uuid1",
            "ip": "127.0.0.1",
            "last_seen": 123
        }).encode("utf-8"),
        b"consumer_uuid2": json.dumps({
            "uuid": "consumer_uuid2",
            "ip": "127.0.0.2",
            "last_seen": 1234
        }).encode("utf-8"),
        b"consumer_uuid3": json.dumps({
            "uuid": "consumer_uuid3",
            "ip": "127.0.0.3",
            "last_seen": 1235
        }).encode("utf-8")
    }

    redis_super_storage_instance.r.get.side_effect = lambda a: data[a]

    lst = redis_super_storage_instance.get_consumer_list()

    assert lst == {
        "consumer_uuid1": {
            "uuid": "consumer_uuid1",
            "ip": "127.0.0.1",
            "last_seen": 123
        },
        "consumer_uuid2": {
            "uuid": "consumer_uuid2",
            "ip": "127.0.0.2",
            "last_seen": 1234
        },
        "consumer_uuid3": {
            "uuid": "consumer_uuid3",
            "ip": "127.0.0.3",
            "last_seen": 1235
        }
    }

    redis_super_storage_instance.r.keys.assert_called_once_with("consumer_*")
    assert redis_super_storage_instance.r.get.call_count == 3
|
||||
|
||||
|
||||
def test_rst_get_consumer_list_expire_while_get(redis_super_storage_instance):
    """A consumer record that expires between KEYS and GET is skipped."""
    redis_super_storage_instance.r.keys.side_effect = lambda a: [b"consumer_uuid1"]
    redis_super_storage_instance.r.get.side_effect = lambda a: None

    lst = redis_super_storage_instance.get_consumer_list()

    assert isinstance(lst, dict)
    assert lst == {}

    redis_super_storage_instance.r.keys.assert_called_once_with("consumer_*")
    redis_super_storage_instance.r.get.assert_called_once_with(b"consumer_uuid1")
|
||||
|
||||
|
||||
# ========================================
|
||||
# IPWatchdog
|
||||
# ========================================
|
||||
|
||||
# __init__
|
||||
|
||||
def test_ipw_instantiate(mocker, redis_super_storage_instance):
    """IPWatchdog resolves its own host name once and keeps the storage ref."""
    mocker.patch("socket.gethostname", side_effect=lambda: "test")
    ipw = IPWatchdog(redis_super_storage_instance)

    assert ipw._host_name == "test"
    assert ipw._redis_store == redis_super_storage_instance
    socket.gethostname.assert_called_once()
|
||||
|
||||
|
||||
def test_ipw_is_changed_false(mocker, ip_watchdog_instance):
    """When the resolved ip equals the stored one, nothing is written back."""
    ip_watchdog_instance._redis_store.r.get.side_effect = lambda a: CURRENT_IPADDR.encode("utf-8")

    changed, ip = ip_watchdog_instance.ip_changed()

    assert not changed
    assert ip == CURRENT_IPADDR

    ip_watchdog_instance._redis_store.r.get.assert_called_once_with("current_ip")
    assert not ip_watchdog_instance._redis_store.r.set.called
    socket.gethostbyname.assert_called_once_with(CURRENT_HOSTNAME)
|
||||
|
||||
|
||||
def test_ipw_is_changed_true(mocker, ip_watchdog_instance):
    """When the resolved ip differs from the stored one, ip_changed reports
    the change, returns the new ip and persists it in redis.
    """
    mocker.patch("socket.gethostbyname", side_effect=lambda a: "192.168.2.123")
    ip_watchdog_instance._redis_store.r.get.side_effect = lambda a: CURRENT_IPADDR.encode("utf-8")

    changed, ip = ip_watchdog_instance.ip_changed()

    assert changed
    assert ip == "192.168.2.123"

    ip_watchdog_instance._redis_store.r.get.assert_called_once_with("current_ip")
    # BUG FIX: the original used `assert mock.called_once_with(...)`.
    # Mock auto-creates `called_once_with` as a child mock; calling it returns
    # a truthy Mock, so that assert ALWAYS passed and verified nothing.
    # `assert_called_once_with` actually checks the write.
    ip_watchdog_instance._redis_store.r.set.assert_called_once_with("current_ip", b"192.168.2.123")
    socket.gethostbyname.assert_called_once_with(CURRENT_HOSTNAME)
|
||||
|
||||
|
||||
# ========================================
|
||||
# Communicators
|
||||
# ========================================
|
||||
|
||||
|
||||
def test_cc_instantiate(redis_super_storage_instance):
    """A fresh ConsumerCommunicator keeps the store and opens a requests session."""
    communicator = ConsumerCommunicator(redis_super_storage_instance)

    assert isinstance(communicator._session, requests.Session)
    assert communicator._redis_store == redis_super_storage_instance
|
||||
|
||||
|
||||
def test_pc_instantiate(redis_super_storage_instance):
    """A fresh ProducerCommunicator keeps the store and opens a requests session."""
    communicator = ProducerCommunicator(redis_super_storage_instance)

    assert isinstance(communicator._session, requests.Session)
    assert communicator._redis_store == redis_super_storage_instance
|
||||
|
||||
|
||||
# producer communicator
|
||||
|
||||
def test_pc_push_ip_update(requests_mock, producer_communicator_instance):
    """push_ip_update POSTs the same {ip, uuid} payload to every known producer."""
    producer_communicator_instance._redis_store.r.keys.side_effect = lambda a: [
        b"producer_uuid1",
        b"producer_uuid2",
        b"producer_uuid3"
    ]

    data = {
        b"producer_uuid1": b"127.0.0.1",
        b"producer_uuid2": b"127.0.0.2",
        b"producer_uuid3": b"127.0.0.3",
    }

    producer_communicator_instance._redis_store.r.get.side_effect = lambda a: data[a]

    first = requests_mock.post("http://127.0.0.1/ip")
    second = requests_mock.post("http://127.0.0.2/ip")
    third = requests_mock.post("http://127.0.0.3/ip")

    producer_communicator_instance.push_ip_update(CURRENT_IPADDR)

    assert first.call_count == 1
    assert second.call_count == 1
    assert third.call_count == 1

    assert first.last_request.json() == second.last_request.json() == third.last_request.json()

    assert first.last_request.json()['ip'] == CURRENT_IPADDR
    assert first.last_request.json()['uuid'] == LOCAL_UUID
|
||||
|
||||
|
||||
def test_pc_push_ip_update_error_logged(mocker, requests_mock, producer_communicator_instance):
    """A failing producer is logged as a warning and does not stop the other pushes."""
    mocker.patch("logging.warning")

    producer_communicator_instance._redis_store.r.keys.side_effect = lambda a: [
        b"producer_uuid1",
        b"producer_uuid2",
        b"producer_uuid3"
    ]

    data = {
        b"producer_uuid1": b"127.0.0.1",
        b"producer_uuid2": b"127.0.0.2",
        b"producer_uuid3": b"127.0.0.3",
    }

    producer_communicator_instance._redis_store.r.get.side_effect = lambda a: data[a]

    first = requests_mock.post("http://127.0.0.1/ip")
    # The middle producer times out; the communicator must carry on.
    second = requests_mock.post("http://127.0.0.2/ip", exc=requests.exceptions.ConnectTimeout)
    third = requests_mock.post("http://127.0.0.3/ip")

    producer_communicator_instance.push_ip_update(CURRENT_IPADDR)

    assert first.call_count == 1
    assert second.call_count == 1
    assert third.call_count == 1

    assert first.last_request.json() == third.last_request.json()

    assert first.last_request.json()['ip'] == CURRENT_IPADDR
    assert first.last_request.json()['uuid'] == LOCAL_UUID

    logging.warning.assert_called_once()
|
||||
|
||||
|
||||
# customer communicator
|
||||
|
||||
def test_cc_targeted_sync(requests_mock, consumer_communicator_instance):
    """targeted_snyc POSTs only the local uuid to the given consumer.

    NOTE(review): `targeted_snyc` is the actual (typo'd) method name in the
    production API; renaming it would break callers, so the tests use it as-is.
    """
    a = requests_mock.post("http://127.0.0.2/sync", json={"uuid": "testasdasdasd"})

    consumer_communicator_instance.targeted_snyc("127.0.0.2")

    assert a.called
    assert a.last_request.json() == {'uuid': LOCAL_UUID}
|
||||
|
||||
|
||||
def test_cc_targeted_sync_ip_override(requests_mock, consumer_communicator_instance):
    """With _force_ip_override set, the payload also carries the current ip."""
    consumer_communicator_instance._force_ip_override = True
    consumer_communicator_instance._redis_store.r.get.side_effect = lambda a: CURRENT_IPADDR.encode("utf-8")
    a = requests_mock.post("http://127.0.0.2/sync", json={"uuid": "testasdasdasd"})

    consumer_communicator_instance.targeted_snyc("127.0.0.2")

    assert a.called
    assert a.last_request.json() == {'uuid': LOCAL_UUID, "ip": CURRENT_IPADDR}
|
||||
|
||||
|
||||
def test_cc_targeted_sync_error_logged(mocker, requests_mock, consumer_communicator_instance):
    """A connection timeout during targeted sync is logged, not raised."""
    mocker.patch("logging.warning")

    requests_mock.post("http://127.0.0.2/sync", exc=requests.exceptions.ConnectTimeout)

    consumer_communicator_instance.targeted_snyc("127.0.0.2")

    logging.warning.assert_called_once()
|
||||
|
||||
|
||||
def test_cc_sync_all(requests_mock, consumer_communicator_instance):
    """sync_all POSTs the local uuid to every consumer found in the store."""
    consumer_communicator_instance._redis_store.r.keys.side_effect = lambda a: [
        b"consumer_uuid1",
        b"consumer_uuid2",
        b"consumer_uuid3"
    ]

    data = {
        b"consumer_uuid1": json.dumps({
            "uuid": "consumer_uuid1",
            "ip": "127.0.0.1",
            "last_seen": 123
        }).encode("utf-8"),
        b"consumer_uuid2": json.dumps({
            "uuid": "consumer_uuid2",
            "ip": "127.0.0.2",
            "last_seen": 1234
        }).encode("utf-8"),
        b"consumer_uuid3": json.dumps({
            "uuid": "consumer_uuid3",
            "ip": "127.0.0.3",
            "last_seen": 1235
        }).encode("utf-8")
    }

    first = requests_mock.post("http://127.0.0.1/sync", json={"uuid": "consumer_uuid1"})
    second = requests_mock.post("http://127.0.0.2/sync", json={"uuid": "consumer_uuid2"})
    third = requests_mock.post("http://127.0.0.3/sync", json={"uuid": "consumer_uuid3"})

    consumer_communicator_instance._redis_store.r.get.side_effect = lambda a: data[a]

    consumer_communicator_instance.sync_all()

    assert first.called
    assert second.called
    assert third.called

    assert first.last_request.json() == second.last_request.json() == third.last_request.json()

    assert first.last_request.json()['uuid'] == LOCAL_UUID
|
||||
|
||||
|
||||
def test_cc_sync_all_ip_override(requests_mock, consumer_communicator_instance):
    """With _force_ip_override set, sync_all includes the stored current ip."""
    consumer_communicator_instance._force_ip_override = True
    consumer_communicator_instance._redis_store.r.keys.side_effect = lambda a: [
        b"consumer_uuid1",
        b"consumer_uuid2",
        b"consumer_uuid3"
    ]

    data = {
        b"consumer_uuid1": json.dumps({
            "uuid": "consumer_uuid1",
            "ip": "127.0.0.1",
            "last_seen": 123
        }).encode("utf-8"),
        b"consumer_uuid2": json.dumps({
            "uuid": "consumer_uuid2",
            "ip": "127.0.0.2",
            "last_seen": 1234
        }).encode("utf-8"),
        b"consumer_uuid3": json.dumps({
            "uuid": "consumer_uuid3",
            "ip": "127.0.0.3",
            "last_seen": 1235
        }).encode("utf-8"),
        # The override path also reads the 'current_ip' key from redis.
        "current_ip": CURRENT_IPADDR.encode("utf-8")
    }

    first = requests_mock.post("http://127.0.0.1/sync", json={"uuid": "consumer_uuid1"})
    second = requests_mock.post("http://127.0.0.2/sync", json={"uuid": "consumer_uuid2"})
    third = requests_mock.post("http://127.0.0.3/sync", json={"uuid": "consumer_uuid3"})

    consumer_communicator_instance._redis_store.r.get.side_effect = lambda a: data[a]

    consumer_communicator_instance.sync_all()

    assert first.called
    assert second.called
    assert third.called

    assert first.last_request.json() == second.last_request.json() == third.last_request.json()

    assert first.last_request.json()['uuid'] == LOCAL_UUID
    assert first.last_request.json()['ip'] == CURRENT_IPADDR
|
||||
|
||||
|
||||
def test_cc_sync_all_error_logged(mocker, requests_mock, consumer_communicator_instance):
    """One failing consumer is logged as a warning; the rest still get synced."""
    mocker.patch("logging.warning")

    consumer_communicator_instance._redis_store.r.keys.side_effect = lambda a: [
        b"consumer_uuid1",
        b"consumer_uuid2",
        b"consumer_uuid3"
    ]

    data = {
        b"consumer_uuid1": json.dumps({
            "uuid": "consumer_uuid1",
            "ip": "127.0.0.1",
            "last_seen": 123
        }).encode("utf-8"),
        b"consumer_uuid2": json.dumps({
            "uuid": "consumer_uuid2",
            "ip": "127.0.0.2",
            "last_seen": 1234
        }).encode("utf-8"),
        b"consumer_uuid3": json.dumps({
            "uuid": "consumer_uuid3",
            "ip": "127.0.0.3",
            "last_seen": 1235
        }).encode("utf-8")
    }

    first = requests_mock.post("http://127.0.0.1/sync", json={"uuid": "consumer_uuid1"})
    # The middle consumer times out; sync_all must continue past it.
    second = requests_mock.post("http://127.0.0.2/sync", exc=requests.exceptions.ConnectTimeout)
    third = requests_mock.post("http://127.0.0.3/sync", json={"uuid": "consumer_uuid3"})

    consumer_communicator_instance._redis_store.r.get.side_effect = lambda a: data[a]

    consumer_communicator_instance.sync_all()

    assert first.called
    assert second.called
    assert third.called

    assert first.last_request.json() == third.last_request.json()

    assert first.last_request.json()['uuid'] == LOCAL_UUID

    logging.warning.assert_called_once()
|
||||
|
||||
|
||||
# ========================================
|
||||
# App
|
||||
# ========================================
|
||||
|
||||
|
||||
def test_app_get_initial_ip_list():
    """get_initial_ip_list yields the servers from the INITIAL_SERVERS env var set above."""
    expected = ["127.0.0.1", "192.168.0.1", "172.20.0.2"]

    assert app.get_initial_ip_list() == expected
|
||||
Reference in New Issue
Block a user