48 Commits

Author SHA1 Message Date
marcsello ae822d87d8 Merge branch 'master' of kmlabz:GoldenPogacsa/consumer-scheduler
continuous-integration/drone/push Build is passing
2020-05-14 22:26:26 +02:00
marcsello 16bdd3c22c Updated documentation 2020-05-14 22:26:14 +02:00
marcsello f01001d331 Added force ip override setting 2020-05-14 22:25:47 +02:00
ricsik52 3b5c36048d instal dev requirements
continuous-integration/drone/push Build is passing
2020-05-14 21:08:09 +02:00
ricsik52 3f580c7dfc test and coverage
continuous-integration/drone/push Build is failing
2020-05-14 21:01:28 +02:00
marcsello 3cefcabad2 Merge branch 'dev'
continuous-integration/drone/push Build is passing
2020-05-14 20:48:57 +02:00
marcsello 13842daf1c Updated docs
continuous-integration/drone/push Build is passing
2020-05-14 20:47:52 +02:00
marcsello 5d2bed0f14 Added docstrings for communicators module. 2020-05-14 20:01:04 +02:00
marcsello 1a486833bb Documented rst
continuous-integration/drone/push Build is passing
2020-05-14 19:24:20 +02:00
marcsello ac90ca829f Added some docstrings 2020-05-14 18:43:05 +02:00
marcsello 108db57271 Added docstrings to ipw 2020-05-14 18:26:37 +02:00
marcsello 04e189f541 Fixed sphinx config
continuous-integration/drone/push Build is passing
2020-05-14 18:11:56 +02:00
dscharnitzky f267c49715 Update 'README.rst'
continuous-integration/drone/push Build is passing
2020-05-14 13:46:03 +02:00
ffabi1997 c1904dee21 Merge pull request 'big work' (#2) from dev into master
continuous-integration/drone/push Build is passing
2020-05-13 17:23:37 +02:00
marcsello 879b241060 changed severity of cc failures
continuous-integration/drone/push Build is passing
continuous-integration/drone/pr Build is passing
2020-05-13 17:20:58 +02:00
marcsello 06279d1346 Fixed cc not using session
continuous-integration/drone/push Build is passing
2020-05-13 17:19:08 +02:00
ricsik52 2a52501ff3 added coveragerc
continuous-integration/drone/push Build is passing
2020-05-13 17:17:32 +02:00
ricsik52 547499d840 Added test of initial_servers
continuous-integration/drone/push Build is passing
2020-05-13 17:15:55 +02:00
ricsik52 c46dbd8211 Added communicator tests
continuous-integration/drone/push Build is passing
2020-05-13 17:12:36 +02:00
ricsik52 c1cbe13b6f added tests for iwd
continuous-integration/drone/push Build is passing
2020-05-13 16:14:42 +02:00
ricsik52 d6e2beb294 Added tests for rst
continuous-integration/drone/push Build is passing
2020-05-13 16:06:39 +02:00
kovacs.bence.janos f90bc35309 use drone cache
continuous-integration/drone/push Build is passing
2020-05-08 23:21:57 +02:00
ffabi1997 0bea194f4a Fixed timeout not being used
continuous-integration/drone/push Build was killed
2020-05-08 23:00:37 +02:00
ffabi1997 1158e6cc60 added test skeleton 2020-05-08 23:00:23 +02:00
ffabi1997 b7fe49037f Added developement requirements
continuous-integration/drone/push Build is passing
2020-05-08 22:38:09 +02:00
ffabi1997 850598d519 small fixes
continuous-integration/drone/push Build is passing
2020-05-08 22:32:18 +02:00
ffabi1997 0713cabc6b Implemented ip change checking
continuous-integration/drone/push Build is passing
2020-05-08 22:21:15 +02:00
ffabi1997 7ce88f1a74 updated app.py 2020-05-08 22:05:49 +02:00
ffabi1997 77e915c63e Added communicators 2020-05-08 21:48:32 +02:00
ffabi1997 8668550a3f moved stuff around 2020-05-08 21:29:08 +02:00
ffabi1997 d8bdd717c7 Fixed up Redis super storage 2020-05-08 21:24:37 +02:00
ffabi1997 dbeb35785e moved files around 2020-05-08 20:47:02 +02:00
ricsik52 d84564ff95 Add 'javitas.py'
continuous-integration/drone/push Build is passing
2020-05-08 20:25:22 +02:00
tormakris 59ae404a19 use custom dind to build doc container image
continuous-integration/drone/push Build is passing
2020-04-28 20:54:00 +02:00
tormakris 989b646d45 dockerfile keyword is still needed
continuous-integration/drone/push Build is failing
2020-04-28 20:31:21 +02:00
tormakris fe7fcb3714 change context of document building
continuous-integration/drone/push Build was killed
2020-04-28 20:27:31 +02:00
tormakris 2d326d79f4 change directory before kaniko step
continuous-integration/drone/push Build is failing
2020-04-28 20:14:59 +02:00
tormakris 6825efca87 use kaniko to build container images
continuous-integration/drone/push Build is failing
2020-04-28 19:36:12 +02:00
marcsello 7702cb2c7e fixed confusing logging
continuous-integration/drone/push Build is passing
2020-04-22 05:01:17 +02:00
marcsello d32db92bab fixed ip change detection
continuous-integration/drone/push Build is passing
2020-04-22 04:49:45 +02:00
marcsello 5f2894e727 IP is now saved to redis
continuous-integration/drone/push Build is passing
2020-04-22 01:35:29 +02:00
marcsello fdac9446af Fixed redis not using config variable 2020-04-22 01:20:24 +02:00
marcsello d0d6267c74 Fixed stuff
continuous-integration/drone/push Build is passing
2020-04-17 16:51:27 +02:00
ricsik52 aed27cba95 Update 'app.py'
continuous-integration/drone/push Build is passing
2020-04-17 16:36:53 +02:00
ricsik52 9d94dbd6d6 Update 'app.py'
continuous-integration/drone/push Build is passing
2020-04-17 16:27:13 +02:00
ricsik52 2a77c552bf Update 'app.py'
continuous-integration/drone/push Build is passing
2020-04-17 16:25:51 +02:00
ricsik52 a1ca3285ab Update 'app.py'
continuous-integration/drone/push Build is passing
2020-04-17 16:25:00 +02:00
marcsello a60ee01ae1 Merge pull request 'documentation' (#1) from documentation into master
continuous-integration/drone/push Build is passing
2020-04-17 15:35:03 +02:00
16 changed files with 1126 additions and 82 deletions
+2
View File
@@ -0,0 +1,2 @@
[run]
omit=venv/*
+67 -20
View File
@@ -3,32 +3,61 @@ type: docker
name: default name: default
steps: steps:
- name: restore-cache-with-filesystem
image: meltwater/drone-cache
settings:
backend: "filesystem"
restore: true
cache_key: "{{ .Repo.Name }}"
archive_format: "gzip"
filesystem_cache_root: "/tmp/cache"
mount:
- '.pipcache'
volumes:
- name: cache
path: /tmp/cache
- name: static_analysis - name: static_analysis
image: python:3 image: python:3
commands: commands:
- pip3 install pylint bandit mccabe - pip3 install pylint bandit mccabe
- pip3 install -r requirements.txt - pip3 install --cache-dir='./.pipcache' -r requirements.txt
- find . -name "*.py" -exec python3 -m py_compile '{}' \; - find . -name "*.py" -exec python3 -m py_compile '{}' \;
- find . -name "*.py" -exec pylint '{}' + || if [ $? -eq 1 ]; then echo "you fail"; fi - find . -name "*.py" -exec pylint '{}' + || if [ $? -eq 1 ]; then echo "you fail"; fi
- find . -name "*.py" -exec python3 -m mccabe --min 3 '{}' + || if [ $? -eq 1 ]; then echo "you fail"; fi - find . -name "*.py" -exec python3 -m mccabe --min 3 '{}' + || if [ $? -eq 1 ]; then echo "you fail"; fi
- bandit -r . + || if [ $? -eq 1 ]; then echo "you fail"; fi - bandit -r . + || if [ $? -eq 1 ]; then echo "you fail"; fi
- name: build - name: unit_test
image: docker:stable-dind image: python:3.8
volumes:
- name: dockersock
path: /var/run
environment: environment:
DOCKER_USERNAME: PRODUCER_REDIS: cache
from_secret: DOCKER_USERNAME
DOCKER_PASSWORD:
from_secret: DOCKER_PASSWORD
commands: commands:
- echo "$DOCKER_PASSWORD" | docker login -u "$DOCKER_USERNAME" --password-stdin - pip3 install --cache-dir='./.pipcache' -r requirements.txt
- docker build -t="$DOCKER_USERNAME/consumer-scheduler" . - pip3 install --cache-dir='./.pipcache' -r requirements_dev.txt
- docker build -t="$DOCKER_USERNAME/consumer-scheduler:$DRONE_BUILD_NUMBER" . - pytest
- docker push "$DOCKER_USERNAME/consumer-scheduler"
- docker push "$DOCKER_USERNAME/consumer-scheduler:$DRONE_BUILD_NUMBER" - name: coverage
image: python:3.8
environment:
PRODUCER_REDIS: cache
commands:
- pip3 install --cache-dir='./.pipcache' -r requirements.txt
- pip3 install --cache-dir='./.pipcache' -r requirements_dev.txt
- coverage run -m pytest
- coverage report -m
- name: build-app
image: banzaicloud/drone-kaniko
settings:
registry: registry.kmlabz.com
repo: goldenpogacsa/${DRONE_REPO_NAME}
username:
from_secret: DOCKER_USERNAME
password:
from_secret: DOCKER_PASSWORD
tags:
- latest
- ${DRONE_BUILD_NUMBER}
- name: make_docs - name: make_docs
image: python:3.8 image: python:3.8
@@ -38,6 +67,21 @@ steps:
- cd docs - cd docs
- make html - make html
- name: rebuild-cache-with-filesystem
image: meltwater/drone-cache
pull: true
settings:
backend: "filesystem"
rebuild: true
cache_key: "{{ .Repo.Name }}"
archive_format: "gzip"
filesystem_cache_root: "/tmp/cache"
mount:
- '.pipcache'
volumes:
- name: cache
path: /tmp/cache
- name: build_docs - name: build_docs
image: docker:stable-dind image: docker:stable-dind
volumes: volumes:
@@ -50,11 +94,11 @@ steps:
from_secret: DOCKER_PASSWORD from_secret: DOCKER_PASSWORD
commands: commands:
- cd docs - cd docs
- echo "$DOCKER_PASSWORD" | docker login -u "$DOCKER_USERNAME" --password-stdin - echo "$DOCKER_PASSWORD" | docker login -u "$DOCKER_USERNAME" --password-stdin registry.kmlabz.com
- docker build -t="$DOCKER_USERNAME/consumer-scheduler-docs" . - docker build -t="registry.kmlabz.com/goldenpogacsa/consumer-scheduler-docs" .
- docker build -t="$DOCKER_USERNAME/consumer-scheduler-docs:$DRONE_BUILD_NUMBER" . - docker build -t="registry.kmlabz.com/goldenpogacsa/consumer-scheduler-docs:$DRONE_BUILD_NUMBER" .
- docker push "$DOCKER_USERNAME/consumer-scheduler-docs" - docker push "registry.kmlabz.com/goldenpogacsa/consumer-scheduler-docs"
- docker push "$DOCKER_USERNAME/consumer-scheduler-docs:$DRONE_BUILD_NUMBER" - docker push "registry.kmlabz.com/goldenpogacsa/consumer-scheduler-docs:$DRONE_BUILD_NUMBER"
- name: slack - name: slack
image: plugins/slack image: plugins/slack
@@ -77,3 +121,6 @@ services:
volumes: volumes:
- name: dockersock - name: dockersock
temp: {} temp: {}
- name: cache
host:
path: "/tmp/cache"
+76 -2
View File
@@ -2,6 +2,80 @@
P2P Consumer Scheduler P2P Consumer Scheduler
====================== ======================
Repository for the consumer's scheduler The scheduler part of a consumer system for the P2P storage system.
Produced by GoldenPogácsa Inc.
Basics
------
This is a component of a fully working consumer system.
In order to setup a consumer you will need the consumer API as well and a Redis database.
This component faciliates the automatic synchronization between consumers.
As well as detecting IP changes of the host which is running on.
Part of a system
----------------
This component works as a complimentary part for the consumer API.
The information exchange between the API is solved by using a Common Redis database.
This database must be shared between the API and the Scheduler instance.
Also the UUID is shared between the two components. This is obvious since they are essentially both parts of the same system.
Communications
--------------
The communication between other consumers and producers are solved by their REST API endpoints.
The follwing events will cause communication by this object:
* A sychronization task
* An IP address change detected
Since the communication between consumers happens regularly the IP address change event does not cause an immediate synchronization
However communication originated by scheduler toward producers only happens when the IP address change is detected.
Synchronization
~~~~~~~~~~~~~~~
This call originated by the scheduler and happens regularly (See. configuration).
A synchronization task causes the followings:
* Each consumer known by this consumer is checked for availability
* Each consumer this consumer communicated with updated the availability of this consumer.
* Each consumer which had no information of this consumer, now have.
Consumers learn about each other, and their IP addresses (and changes) during a synchronization.
Called URL::
http://<consumer ip>/sync
Body::
{
"uuid" : "str: LOCAL_UUID",
"ip" : "str: optional: IP override"
}
IP update
~~~~~~~~~
This communication is originated by the scheduler when an IP address change is detected.
This call is used to let producers immanently know about an IP change of their consumer, so that they can converge faster.
Called URL::
http://<producer ip>/ip
Body::
{
"uuid" : "str: LOCAL_UUID",
"ip": "str: provided by param"
}
Functionality: updates the database of the available consumers periodically
+49 -52
View File
@@ -1,16 +1,15 @@
#!/usr/bin/env python #!/usr/bin/env python3
"""
This is the top level module, containing the main application. Launching this file will launch the scheduler part of the consumer application.
"""
import sentry_sdk import sentry_sdk
import time import time
import requests
import requests.exceptions
import os import os
import redis
import json
import logging import logging
""" from redis_super_storage import RedisSuperStorage
Scheduler from communicators import ConsumerCommunicator, ProducerCommunicator
""" from ip_watchdog import IPWatchdog
__author__ = "@kocsisr" __author__ = "@kocsisr"
__copyright__ = "Copyright 2020, GoldenPogácsa Team" __copyright__ = "Copyright 2020, GoldenPogácsa Team"
@@ -20,64 +19,62 @@ __version__text__ = "1"
sentry_sdk.init("https://0a106e104e114bc9a3fa47f9cb0db2f4@sentry.kmlabz.com/10") sentry_sdk.init("https://0a106e104e114bc9a3fa47f9cb0db2f4@sentry.kmlabz.com/10")
def main(): def get_initial_ip_list() -> list:
"""
This method is used to parse the content of INITIAL_SERVERS environment variable.
The contents of this variable is a list of ip addresses separated by commas.
This mehod returns a Python native `list` object containing the addreseses provided.
:return: A list of ip addresses provided by the environmental variable
"""
ip_list = os.environ['INITIAL_SERVERS'].split(',')
logging.debug('Initial ip list ' + ", ".join(ip_list))
return ip_list
def main():
"""
This is the main method of the scheduler application.
This method does basically the followings:
* Sets up logging
* Creates the RedisSuperStorage object (connecting to Redis database)
* Sets up communicators and IP watchdog
* Performs an initial synchronization to all other consumers.
* Starts the main loop which does roughly the following:
* Syncrhronizes with all other consumers
* Check if ip address changed. If yes, then push updates to all producers.
* Wait `RUN_INTERVAL` seconds (provided by envvar)
"""
# set logging preferences # set logging preferences
logging.basicConfig(filename='', level=logging.DEBUG) logging.basicConfig(filename='', level=logging.DEBUG)
# connect to redis redis_storage = RedisSuperStorage(os.environ.get('REDIS_URL', "redis://localhost:6379/0"),
r = redis.Redis(host = 'localhost', port = 6379, db = 0) int(os.environ.get("CUSTOMER_TIMEOUT", 30)))
# set initial consumer addresses consumer_communicator = ConsumerCommunicator(redis_storage, bool(os.environ.get('FORCE_IP_OVERRIDE', False)))
ip_list = os.environ['INITIAL_SERVERS'].split(',') producer_communicator = ProducerCommunicator(redis_storage)
logging.debug('Get consumer list from environ at first: Done') ip_watchdog = IPWatchdog(redis_storage)
# get the dictionary of the currently available consumers
consumer_list_redis = json.loads((r.get('consumer_list') or b'{}').decode('utf-8'))
logging.debug('Get consumer list from redis at first: Done')
temp_dict = { }
for ip in ip_list: logging.info("Syncing to initial consumer list")
try: for ip in get_initial_ip_list():
# request synchronization logging.debug(f"Syncing to {ip}")
response = requests.post(f"http://{ip}/sync", json = { 'uuid': os.environ['LOCAL_UUID'] }) consumer_communicator.targeted_snyc(ip)
except requests.exceptions.ConnectionError:
continue
if response.status_code == 200:
temp_dict[response.json()['uuid']] = { 'ip': ip }
consumer_list_redis.update(temp_dict)
r.set('consumer_list', json.dumps(consumer_list_redis).encode('utf-8'))
logging.debug('Update redis consumers ip list from first answers: Done')
while True: while True:
logging.debug('Infinite Cycle start : Done') logging.debug("Doing a sync")
# get the dictionary of the currently available consumers consumer_communicator.sync_all()
consumer_list_redis = json.loads((r.get('consumer_list') or b'{}').decode('utf-8'))
logging.debug('Get consumer list from redis: Done')
for uuid, info in consumer_list_redis.items():
ip = info['ip']
try:
# request synchronization
response = requests.post(f"http://{ip}/sync", json = { 'uuid': os.environ['LOCAL_UUID'] })
except requests.exceptions.ConnectionError:
continue
if response.status_code == 200: ip_changed, ipaddr = ip_watchdog.ip_changed()
temp_dict[response.json()['uuid']] = { 'ip': ip } if ip_changed:
producer_communicator.push_ip_update(ipaddr)
time.sleep(int(os.environ.get("RUN_INTERVAL", 10)))
# update the dictionary of the currently available consumers
consumer_list_redis.update(temp_dict)
r.set('consumer_list', json.dumps(consumer_list_redis).encode('utf-8'))
logging.debug('Update redis consumer ip list from answers: Done')
logging.debug('Waiting for next turn')
# wait for the next update time
time.sleep(30)
if __name__ == "__main__": if __name__ == "__main__":
try: try:
main() main()
except KeyboardInterrupt: except KeyboardInterrupt:
pass pass
+169
View File
@@ -0,0 +1,169 @@
#!/usr/bin/env python3
"""
This module contains the classes used for communicating with other consumers as well as the producers themselves.
"""
import os
import logging
import requests
import requests.exceptions
from redis_super_storage import RedisSuperStorage
class ProducerCommunicator:
"""
This class is used to communicate with producers.
The addresses of producers are fetched from `RedisSuperStorage`.
"""
def __init__(self, redis_store: RedisSuperStorage):
"""
Upon creating this object. A requests session is created on order to take advantage of keep-alive connections.
:param redis_store: A `RedisSuperStorage` instance.
"""
self._redis_store = redis_store
self._session = requests.Session()
def push_ip_update(self, newip: str):
"""
This method is used to push an ip update to all known consumers.
The list of consumers are read from the `RedisSuperStorage` instance.
(The list of producers are maintained by the api endpoint.)
The uuid of this consumer is acquired directly from the `LOCAL_UUID` envvar.
A timeout of 5 seconds is hardcoded for each producer individually. Timeout is logged as warning.
Called URL::
http://<producer ip>/ip
Body::
{
"uuid" : "str: LOCAL_UUID",
"ip": "str: provided by param"
}
:param newip: New ipaddress to be annouced.
"""
for key, ip in self._redis_store.get_producer_list().items():
try:
response = self._session.post(
f"http://{ip}/ip",
json={
'uuid': os.environ['LOCAL_UUID'],
'ip': newip
},
timeout=5
)
logging.debug(f"Pushed update to {key} at {ip}. Response: {response.status_code}")
except (requests.exceptions.ConnectionError, requests.exceptions.Timeout) as e:
logging.warning(f"Could not push update to {key}: {str(e)}")
class ConsumerCommunicator:
"""
This class is used to communicate with consumers.
The addresses of consumers are fetched from the `RedisSuperStorage`.
"""
def __init__(self, redis_store: RedisSuperStorage, force_ip_override: bool = False):
"""
Upon creating this object. A requests session is created on order to take advantage of keep-alive connections.
:param redis_store: A `RedisSuperStorage` instance.
:param force_ip_override: Include the ip address stored in redis to the sync message (Disable the reciever ip discovery in the other consumer)
"""
self._redis_store = redis_store
self._session = requests.Session()
self._force_ip_override = force_ip_override
def targeted_snyc(self, ip: str):
"""
This method works similarly to `sync_all` however the target is not fetched from the `RedisSuperStorage` instance.
The results are processed the same way (saved to redis).
A timeout of 5 seconds is hardcoded for this function. Timeout is logged as warning.
This method is preferred when the remote is unknown (uuid is unknown). Mostly when the application just started up,
and an initial syncrhronization to all consumers is required.
See `sync_all` for more information.
Called URL::
http://<consumer ip>/sync
Body::
{
"uuid" : "str: LOCAL_UUID",
"ip" : "str: optional: IP override"
}
:param ip: The ip address of the consumer to be synced to.
"""
message = {'uuid': os.environ['LOCAL_UUID']}
if self._force_ip_override:
message['ip'] = self._redis_store.current_ip
try:
# request synchronization
response = self._session.post(f"http://{ip}/sync", json=message, timeout=5)
except (requests.exceptions.ConnectionError, requests.exceptions.Timeout) as e:
logging.warning(f"Error while syncing to {ip}: {str(e)}")
return
if response.status_code == 200:
self._redis_store.update_consumer(response.json()['uuid'], ip)
def sync_all(self):
"""
This method is used to syncronize with each known consumer.
The list of consumers are acquired from the `RedisSuperStorage` instance.
This syncrhonization run causes the followings:
* Each consumer known by this consumer is checked for availability
* Each consumer this consumer communicated with updated the availability of this consumer.
* Each consumer which had no information of this consumer, now have.
A timeout of 5 seconds is hardcoded for each consumer individually. Timeout is logged as warning.
Called URL::
http://<consumer ip>/sync
Body::
{
"uuid" : "str: LOCAL_UUID",
"ip" : "str: optional: IP override"
}
"""
message = {'uuid': os.environ['LOCAL_UUID']}
if self._force_ip_override:
message['ip'] = self._redis_store.current_ip
for uuid, info in self._redis_store.get_consumer_list().items():
ip = info['ip']
try:
# request synchronization
response = self._session.post(f"http://{ip}/sync", json=message, timeout=5)
except (requests.exceptions.ConnectionError, requests.exceptions.Timeout) as e:
logging.warning(f"Error while syncing to {ip}: {str(e)}")
continue
if response.status_code == 200:
self._redis_store.update_consumer(response.json()['uuid'], ip)
+10 -5
View File
@@ -10,9 +10,11 @@
# add these directories to sys.path here. If the directory is relative to the # add these directories to sys.path here. If the directory is relative to the
# documentation root, use os.path.abspath to make it absolute, like shown here. # documentation root, use os.path.abspath to make it absolute, like shown here.
# #
# import os import os
# import sys import sys
# sys.path.insert(0, os.path.abspath('.')) sys.path.insert(0, os.path.abspath('..'))
#import app
# -- Project information ----------------------------------------------------- # -- Project information -----------------------------------------------------
@@ -30,8 +32,7 @@ release = '0.1.0'
# Add any Sphinx extension module names here, as strings. They can be # Add any Sphinx extension module names here, as strings. They can be
# extensions coming with Sphinx (named 'sphinx.ext.*') or your custom # extensions coming with Sphinx (named 'sphinx.ext.*') or your custom
# ones. # ones.
extensions = [ extensions = ['sphinx.ext.autodoc']
]
# Add any paths that contain templates here, relative to this directory. # Add any paths that contain templates here, relative to this directory.
templates_path = [] templates_path = []
@@ -53,3 +54,7 @@ html_theme = 'sphinx_rtd_theme'
# relative to this directory. They are copied after the builtin static files, # relative to this directory. They are copied after the builtin static files,
# so a file named "default.css" will overwrite the builtin "default.css". # so a file named "default.css" will overwrite the builtin "default.css".
html_static_path = [] html_static_path = []
# master document
master_doc = 'index'
autoclass_content = 'both'
+22
View File
@@ -0,0 +1,22 @@
=============
Configuration
=============
This software component can be configured via environmental variables, which are very useful in a containerized environment.
+---------------------+--------------------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| Variable | Default | Description |
+=====================+==========================+====================================================================================================================================================================================================+
| `LOCAL_UUID` | **N/A** | The UUID of the consumer system. Must be the same as the API's |
+---------------------+--------------------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| `RUN_INTERVAL` | 10 | Interval between synchronizations, and local ip checks. |
+---------------------+--------------------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| `CUSTOMER_TIMEOUT` | 30 | Default timeout to be set for a consumer. If the timeout expires, the consumer will be considered invalid, and no further attempts will be made to contact with it. |
+---------------------+--------------------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| `REDIS_URL` | redis://localhost:6379/0 | URL of the Redis database shared with the consumer api |
+---------------------+--------------------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| `INITIAL_SERVERS` | **N/A** | A comma separated list of the initially known consumers (Can be empty, but must be specified) |
+---------------------+--------------------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| `FORCE_IP_OVERRIDE` | False | Include the ip address of the consumer into the sync message, causing the other consumer to save this ip address instead of the one it recieved the request from (Useful for weird netowrk setups) |
+---------------------+--------------------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
+2
View File
@@ -11,9 +11,11 @@ Welcome to Consumer Scheduler's documentation!
:caption: Contents: :caption: Contents:
readme readme
config
source/modules source/modules
Indices and tables Indices and tables
================== ==================
+7
View File
@@ -0,0 +1,7 @@
communicators module
====================
.. automodule:: communicators
:members:
:undoc-members:
:show-inheritance:
+7
View File
@@ -0,0 +1,7 @@
ip\_watchdog module
===================
.. automodule:: ip_watchdog
:members:
:undoc-members:
:show-inheritance:
+3
View File
@@ -5,3 +5,6 @@ consumer-scheduler
:maxdepth: 4 :maxdepth: 4
app app
communicators
ip_watchdog
redis_super_storage
+7
View File
@@ -0,0 +1,7 @@
redis\_super\_storage module
============================
.. automodule:: redis_super_storage
:members:
:undoc-members:
:show-inheritance:
+50
View File
@@ -0,0 +1,50 @@
#!/usr/bin/env python3
"""
This module contains the IPWatchdog class which is responsible for detecting ip address changes on the host machine.
"""
from typing import Tuple
import logging
import socket
from redis_super_storage import RedisSuperStorage
class IPWatchdog:
"""
This is very simple class, that is used to determine if the ip address of the host have changed.
Internally this class relies on `RedisSuperStorage` to fetch the last used ip address.
The ip address of the current host is acquired using python's builtin `socket` interface, by requesting an address resolve agains the localhost's host name.
In some scenarios this may result in wrongly identifying the loopback address instead of the assigned address.
In most cases, where the application is run inside a docker container this method will work just fine.
"""
def __init__(self, redis_store: RedisSuperStorage):
"""
During the construction of the object, the host name is of the current machine is cached, for quicker lookups.
:param redis_store: a RedisSuperStorage instance.
"""
self._redis_store = redis_store
self._host_name = socket.gethostname()
def ip_changed(self) -> Tuple[bool, str]:
"""
This method fetches the last ip address from the RedisSuperStorage instance, and compares it to the current local address.
If the ip address changes the new value is automatically stored in the RedisSuperStorage instance.
Detection is performed upon calling this method, as well as storing the updated address.
:return: (changed, ip_address) A tuple with two members, where the first member indicates if the ip address is changed, the second member is the current ip address.
"""
old_ip = self._redis_store.current_ip
current_ip = socket.gethostbyname(self._host_name)
if current_ip != old_ip:
logging.info(f'IP changed: {old_ip} -> {current_ip}')
self._redis_store.current_ip = current_ip
return True, current_ip
return False, old_ip
+132
View File
@@ -0,0 +1,132 @@
#!/usr/bin/env python3
"""
This module contains the RedisSuperStorage module which is responsible to store and load the data structure used by the constumer.
"""
import redis
import os
import json
import time
class RedisSuperStorage:
"""
This class is the brain of the scheduler app, and probably half the brain of the whole consumer system.
This class provides access to the data structures used in the consumer system trough various getter and setter methods.
Kind of like an ORM would do.
Sometimes it is necessary to access more than one time to the redis database while qcquiring or setting some data.
In this cases some basic error handling is implemented to avoid most of the problems that could be caused by non-atomic operations.
"""
def __init__(self, redis_url: str, timeout: int):
"""
During the object creation the Redis connection is attempted to be established.
:param redis_url: URL of the redis database in the following form: `redis://localhost:6379/0`
:param timeout: Timeout to be set on keys, where it matters. (See the documentation for each method)
"""
self.r = redis.from_url(redis_url)
self._timeout = timeout
def get_consumer_list(self) -> dict:
"""
This function gets the list of consumers from the Redis database.
Consumers are saved to the database either by the API endpoint, or calling `update_consumer` of this class.
The consumers are volatile, they have a timeout set when they updated.
If they are not updated within that timeout they considered invalid, and does not returned by this method.
Each consumer is represented by the following structure::
{
"uuid" : "str",
"ip" : "str",
"last_seen": int
}
Meaning of the fields:
* `uuid` - The UUID of the remote consumer
* `ip` - The IP address of the remote consumer
* `last_seen` - The UNIX timestamp when the consumer was last seen.
:return: Despite it's name, this function returns a dictionary in which the keys are the uuids of each consumer.
"""
keys = self.r.keys('consumer_*')
list_of_customers = {}
for key in keys:
info = json.loads((self.r.get(key) or b"{}").decode('utf-8'))
if info:
list_of_customers[info['uuid']] = info
return list_of_customers
def get_producer_list(self) -> dict:
"""
This method returns a list of producer ip addresses, nothing more.
The producer ip addresses are volatile, they have a timeout set when they updated.
If they are not updated within that timeout they considered invalid, and does not returned by this method.
Producers are added to the redis database by the API endpoint with a timeout set on them.
:return: Despite it's name this function returns a dict... Similar to `get_consumer_list`. The keys are the keys stored in redis (lol)
"""
keys = self.r.keys('producer_*')
list_of_producer_ip = {}
for key in keys:
ip = (self.r.get(key) or b"").decode('utf-8')
if ip:
list_of_producer_ip[key.decode('utf-8')] = ip
return list_of_producer_ip
def update_consumer(self, uuid: str, ip: str):
    """
    Update (or create) the stored information of a specific consumer.

    The record is serialized as JSON, and the default timeout is applied to
    the key, so a consumer that is not refreshed in time disappears.

    :param uuid: The uuid of the consumer to be updated.
    :param ip: The ip address of that consumer.
    """
    key = f"consumer_{uuid}"
    record = {
        "uuid": uuid,
        "ip": ip,
        "last_seen": time.time(),
    }
    self.r.set(key, json.dumps(record).encode('utf-8'))
    self.r.expire(key, self._timeout)
def get_current_ip(self) -> str:
    """
    Basic getter that reads a single value from the Redis database.

    :return: The ip address of the consumer stored in redis, decoded to a
             string; when nothing is stored, the raw falsy value (normally
             None) is returned unchanged.
    """
    raw = self.r.get('current_ip')
    return raw.decode('utf-8') if raw else raw
def set_current_ip(self, ip: str):
    """
    The most basic setter of the whole object: it stores a single value,
    the ip address of this consumer. No timeout is applied to this key.

    :param ip: IP address to be set.
    """
    encoded = ip.encode('utf-8')
    self.r.set('current_ip', encoded)
current_ip = property(get_current_ip, set_current_ip)
+6
View File
@@ -0,0 +1,6 @@
pytest
pytest-runner
requests_mock
pytest-mock
mock
coverage
+514
View File
@@ -0,0 +1,514 @@
from redis_super_storage import RedisSuperStorage
from communicators import ConsumerCommunicator, ProducerCommunicator
from ip_watchdog import IPWatchdog
import pytest
import redis
import json
import socket
import requests
import requests.exceptions
import logging
import os
import app
# Connection parameters handed to the RedisSuperStorage fixtures below.
REDIS_URL = "redis://localhost/0"
REDIS_TIMEOUT = 2
# Fake identity the code under test is made to resolve for itself.
CURRENT_HOSTNAME = "testenv.local"
CURRENT_IPADDR = "192.168.1.50"
LOCAL_UUID = "testuuid1"
# NOTE(review): these env vars are set after the imports above (including
# `import app`) — presumably the modules read them lazily at call time;
# verify that none of them read the environment at import time.
os.environ["LOCAL_UUID"] = LOCAL_UUID
os.environ["INITIAL_SERVERS"] = "127.0.0.1,192.168.0.1,172.20.0.2"
@pytest.fixture
def redis_super_storage_instance(mocker):
    """Provide a RedisSuperStorage whose redis connection is a MagicMock."""
    mocker.patch("redis.from_url")
    storage = RedisSuperStorage(REDIS_URL, REDIS_TIMEOUT)
    yield storage
@pytest.fixture
def ip_watchdog_instance(mocker, redis_super_storage_instance):
    """Provide an IPWatchdog with hostname/ip resolution fixed to test values."""
    mocker.patch("socket.gethostname", side_effect=lambda: CURRENT_HOSTNAME)
    mocker.patch("socket.gethostbyname", side_effect=lambda a: CURRENT_IPADDR)
    watchdog = IPWatchdog(redis_super_storage_instance)
    yield watchdog
@pytest.fixture
def consumer_communicator_instance(redis_super_storage_instance):
    """Provide a ConsumerCommunicator bound to the mocked storage."""
    communicator = ConsumerCommunicator(redis_super_storage_instance)
    yield communicator
@pytest.fixture
def producer_communicator_instance(redis_super_storage_instance):
    """Provide a ProducerCommunicator bound to the mocked storage."""
    communicator = ProducerCommunicator(redis_super_storage_instance)
    yield communicator
# ========================================
# RedisSuperStorage
# ========================================

# __init__
def test_rst_instance_creation(mocker):
    """The constructor must connect via redis.from_url and keep the timeout."""
    mocker.patch("redis.from_url")
    storage = RedisSuperStorage("test", 2)
    assert storage._timeout == 2
    redis.from_url.assert_called_once_with("test")
# ip get/set
def test_rst_ip_getter_get_none(redis_super_storage_instance):
    """When nothing is stored, current_ip must resolve to None."""
    storage = redis_super_storage_instance
    storage.r.get.side_effect = lambda a: None
    assert storage.current_ip is None
    storage.r.get.assert_called_once_with('current_ip')
def test_rst_ip_getter_get_ip(redis_super_storage_instance):
    """A stored ip must be returned decoded to a string."""
    storage = redis_super_storage_instance
    storage.r.get.side_effect = lambda a: b"127.0.0.1"
    assert storage.current_ip == "127.0.0.1"
    storage.r.get.assert_called_once_with('current_ip')
def test_rst_ip_getter_set_ip(redis_super_storage_instance):
    """Assigning current_ip must store the utf-8 encoded address."""
    storage = redis_super_storage_instance
    storage.current_ip = "127.0.0.1"
    storage.r.set.assert_called_once_with('current_ip', b"127.0.0.1")
# update consumer
def test_rst_update_consumer(mocker, redis_super_storage_instance):
    """update_consumer must store a serialized record and set a timeout."""
    storage = redis_super_storage_instance
    mocker.patch("time.time", side_effect=lambda: 123)
    storage.update_consumer("testuuid", "127.0.0.1")
    expected_key = "consumer_testuuid"
    expected_payload = json.dumps({
        "uuid": "testuuid",
        "ip": "127.0.0.1",
        "last_seen": 123
    }).encode('utf-8')
    storage.r.set.assert_called_once_with(expected_key, expected_payload)
    storage.r.expire.assert_called_once_with(expected_key, REDIS_TIMEOUT)
# producer list
def test_rst_get_producer_list(redis_super_storage_instance):
    """All producer keys must be returned with decoded names and addresses."""
    storage = redis_super_storage_instance
    stored = {
        b"producer_uuid1": b"127.0.0.1",
        b"producer_uuid2": b"127.0.0.2",
        b"producer_uuid3": b"127.0.0.3",
    }
    storage.r.keys.side_effect = lambda a: list(stored.keys())
    storage.r.get.side_effect = lambda a: stored[a]
    result = storage.get_producer_list()
    assert result == {
        "producer_uuid1": "127.0.0.1",
        "producer_uuid2": "127.0.0.2",
        "producer_uuid3": "127.0.0.3",
    }
    storage.r.keys.assert_called_once_with("producer_*")
def test_rst_get_producer_expire_while_get(redis_super_storage_instance):
    """A key that expires between keys() and get() must be skipped."""
    storage = redis_super_storage_instance
    storage.r.keys.side_effect = lambda a: [b"producer_uuid1"]
    storage.r.get.side_effect = lambda a: None
    result = storage.get_producer_list()
    assert isinstance(result, dict)
    assert result == {}
    storage.r.keys.assert_called_once_with("producer_*")
# get_consumer_list
def test_rst_get_consumer_list(redis_super_storage_instance):
    """All consumer records must be returned keyed by their uuid."""
    storage = redis_super_storage_instance
    expected = {
        f"consumer_uuid{i}": {
            "uuid": f"consumer_uuid{i}",
            "ip": f"127.0.0.{i}",
            "last_seen": ts
        }
        for i, ts in ((1, 123), (2, 1234), (3, 1235))
    }
    stored = {
        key.encode("utf-8"): json.dumps(record).encode("utf-8")
        for key, record in expected.items()
    }
    storage.r.keys.side_effect = lambda a: list(stored.keys())
    storage.r.get.side_effect = lambda a: stored[a]
    assert storage.get_consumer_list() == expected
    storage.r.keys.assert_called_once_with("consumer_*")
    assert storage.r.get.call_count == 3
def test_rst_get_consumer_list_expire_while_get(redis_super_storage_instance):
    """A consumer key that expires mid-listing must be silently dropped."""
    storage = redis_super_storage_instance
    storage.r.keys.side_effect = lambda a: [b"consumer_uuid1"]
    storage.r.get.side_effect = lambda a: None
    result = storage.get_consumer_list()
    assert isinstance(result, dict)
    assert result == {}
    storage.r.keys.assert_called_once_with("consumer_*")
    storage.r.get.assert_called_once_with(b"consumer_uuid1")
# ========================================
# IPWatchdog
# ========================================

# __init__
def test_ipw_instantiate(mocker, redis_super_storage_instance):
    """The watchdog must resolve its hostname once and keep the store."""
    mocker.patch("socket.gethostname", side_effect=lambda: "test")
    watchdog = IPWatchdog(redis_super_storage_instance)
    assert watchdog._host_name == "test"
    assert watchdog._redis_store == redis_super_storage_instance
    socket.gethostname.assert_called_once()
def test_ipw_is_changed_false(mocker, ip_watchdog_instance):
    """No change is reported when the resolved ip equals the stored one."""
    watchdog = ip_watchdog_instance
    watchdog._redis_store.r.get.side_effect = lambda a: CURRENT_IPADDR.encode("utf-8")
    changed, ip = watchdog.ip_changed()
    assert not changed
    assert ip == CURRENT_IPADDR
    watchdog._redis_store.r.get.assert_called_once_with("current_ip")
    assert not watchdog._redis_store.r.set.called
    socket.gethostbyname.assert_called_once_with(CURRENT_HOSTNAME)
def test_ipw_is_changed_true(mocker, ip_watchdog_instance):
    """
    A change must be reported (and the new ip persisted) when the resolved
    address differs from the one stored in redis.
    """
    watchdog = ip_watchdog_instance
    mocker.patch("socket.gethostbyname", side_effect=lambda a: "192.168.2.123")
    watchdog._redis_store.r.get.side_effect = lambda a: CURRENT_IPADDR.encode("utf-8")
    changed, ip = watchdog.ip_changed()
    assert changed
    assert ip == "192.168.2.123"
    watchdog._redis_store.r.get.assert_called_once_with("current_ip")
    # BUG FIX: the original used `assert mock.called_once_with(...)`, which is
    # not a real Mock assertion — attribute access on a Mock auto-creates a
    # truthy child Mock, so the check always passed regardless of the call.
    # Use the real `assert_called_once_with` so the persisted write is checked.
    watchdog._redis_store.r.set.assert_called_once_with("current_ip", b"192.168.2.123")
    socket.gethostbyname.assert_called_once_with(CURRENT_HOSTNAME)
# ========================================
# Communicators
# ========================================

def test_cc_instantiate(redis_super_storage_instance):
    """A ConsumerCommunicator must keep the store and open a requests session."""
    communicator = ConsumerCommunicator(redis_super_storage_instance)
    assert communicator._redis_store == redis_super_storage_instance
    assert isinstance(communicator._session, requests.Session)
def test_pc_instantiate(redis_super_storage_instance):
    """A ProducerCommunicator must keep the store and open a requests session."""
    communicator = ProducerCommunicator(redis_super_storage_instance)
    assert communicator._redis_store == redis_super_storage_instance
    assert isinstance(communicator._session, requests.Session)
# producer communicator
def test_pc_push_ip_update(requests_mock, producer_communicator_instance):
    """push_ip_update must POST the new ip and the local uuid to every producer."""
    communicator = producer_communicator_instance
    stored = {
        b"producer_uuid1": b"127.0.0.1",
        b"producer_uuid2": b"127.0.0.2",
        b"producer_uuid3": b"127.0.0.3",
    }
    communicator._redis_store.r.keys.side_effect = lambda a: list(stored.keys())
    communicator._redis_store.r.get.side_effect = lambda a: stored[a]
    endpoints = [requests_mock.post(f"http://127.0.0.{i}/ip") for i in (1, 2, 3)]
    communicator.push_ip_update(CURRENT_IPADDR)
    for endpoint in endpoints:
        assert endpoint.call_count == 1
    payloads = [endpoint.last_request.json() for endpoint in endpoints]
    assert payloads[0] == payloads[1] == payloads[2]
    assert payloads[0]['ip'] == CURRENT_IPADDR
    assert payloads[0]['uuid'] == LOCAL_UUID
def test_pc_push_ip_update_error_logged(mocker, requests_mock, producer_communicator_instance):
    """A connection failure towards one producer must only log a warning."""
    warning_mock = mocker.patch("logging.warning")
    communicator = producer_communicator_instance
    stored = {
        b"producer_uuid1": b"127.0.0.1",
        b"producer_uuid2": b"127.0.0.2",
        b"producer_uuid3": b"127.0.0.3",
    }
    communicator._redis_store.r.keys.side_effect = lambda a: list(stored.keys())
    communicator._redis_store.r.get.side_effect = lambda a: stored[a]
    first = requests_mock.post("http://127.0.0.1/ip")
    second = requests_mock.post("http://127.0.0.2/ip", exc=requests.exceptions.ConnectTimeout)
    third = requests_mock.post("http://127.0.0.3/ip")
    communicator.push_ip_update(CURRENT_IPADDR)
    for endpoint in (first, second, third):
        assert endpoint.call_count == 1
    assert first.last_request.json() == third.last_request.json()
    assert first.last_request.json()['ip'] == CURRENT_IPADDR
    assert first.last_request.json()['uuid'] == LOCAL_UUID
    warning_mock.assert_called_once()
# customer communicator
def test_cc_targeted_sync(requests_mock, consumer_communicator_instance):
    """targeted_snyc must POST the local uuid to the given consumer address."""
    endpoint = requests_mock.post("http://127.0.0.2/sync", json={"uuid": "testasdasdasd"})
    consumer_communicator_instance.targeted_snyc("127.0.0.2")
    assert endpoint.called
    assert endpoint.last_request.json() == {'uuid': LOCAL_UUID}
def test_cc_targeted_sync_ip_override(requests_mock, consumer_communicator_instance):
    """With force ip override active, the stored ip must ride along the sync."""
    communicator = consumer_communicator_instance
    communicator._force_ip_override = True
    communicator._redis_store.r.get.side_effect = lambda a: CURRENT_IPADDR.encode("utf-8")
    endpoint = requests_mock.post("http://127.0.0.2/sync", json={"uuid": "testasdasdasd"})
    communicator.targeted_snyc("127.0.0.2")
    assert endpoint.called
    assert endpoint.last_request.json() == {'uuid': LOCAL_UUID, "ip": CURRENT_IPADDR}
def test_cc_targeted_sync_error_logged(mocker, requests_mock, consumer_communicator_instance):
    """A failing sync target must be reported with a single warning."""
    warning_mock = mocker.patch("logging.warning")
    requests_mock.post("http://127.0.0.2/sync", exc=requests.exceptions.ConnectTimeout)
    consumer_communicator_instance.targeted_snyc("127.0.0.2")
    warning_mock.assert_called_once()
def test_cc_sync_all(requests_mock, consumer_communicator_instance):
    """sync_all must POST the local uuid to every known consumer."""
    communicator = consumer_communicator_instance
    stored = {
        f"consumer_uuid{i}".encode("utf-8"): json.dumps({
            "uuid": f"consumer_uuid{i}",
            "ip": f"127.0.0.{i}",
            "last_seen": ts
        }).encode("utf-8")
        for i, ts in ((1, 123), (2, 1234), (3, 1235))
    }
    communicator._redis_store.r.keys.side_effect = lambda a: list(stored.keys())
    communicator._redis_store.r.get.side_effect = lambda a: stored[a]
    endpoints = [
        requests_mock.post(f"http://127.0.0.{i}/sync", json={"uuid": f"consumer_uuid{i}"})
        for i in (1, 2, 3)
    ]
    communicator.sync_all()
    for endpoint in endpoints:
        assert endpoint.called
    payloads = [endpoint.last_request.json() for endpoint in endpoints]
    assert payloads[0] == payloads[1] == payloads[2]
    assert payloads[0]['uuid'] == LOCAL_UUID
def test_cc_sync_all_ip_override(requests_mock, consumer_communicator_instance):
    """With force ip override active, sync_all must include the stored ip."""
    communicator = consumer_communicator_instance
    communicator._force_ip_override = True
    consumer_keys = [f"consumer_uuid{i}".encode("utf-8") for i in (1, 2, 3)]
    stored = {
        f"consumer_uuid{i}".encode("utf-8"): json.dumps({
            "uuid": f"consumer_uuid{i}",
            "ip": f"127.0.0.{i}",
            "last_seen": ts
        }).encode("utf-8")
        for i, ts in ((1, 123), (2, 1234), (3, 1235))
    }
    stored["current_ip"] = CURRENT_IPADDR.encode("utf-8")
    communicator._redis_store.r.keys.side_effect = lambda a: consumer_keys
    communicator._redis_store.r.get.side_effect = lambda a: stored[a]
    endpoints = [
        requests_mock.post(f"http://127.0.0.{i}/sync", json={"uuid": f"consumer_uuid{i}"})
        for i in (1, 2, 3)
    ]
    communicator.sync_all()
    for endpoint in endpoints:
        assert endpoint.called
    payloads = [endpoint.last_request.json() for endpoint in endpoints]
    assert payloads[0] == payloads[1] == payloads[2]
    assert payloads[0]['uuid'] == LOCAL_UUID
    assert payloads[0]['ip'] == CURRENT_IPADDR
def test_cc_sync_all_error_logged(mocker, requests_mock, consumer_communicator_instance):
    """One unreachable consumer must not stop sync_all, only log a warning."""
    warning_mock = mocker.patch("logging.warning")
    communicator = consumer_communicator_instance
    stored = {
        f"consumer_uuid{i}".encode("utf-8"): json.dumps({
            "uuid": f"consumer_uuid{i}",
            "ip": f"127.0.0.{i}",
            "last_seen": ts
        }).encode("utf-8")
        for i, ts in ((1, 123), (2, 1234), (3, 1235))
    }
    communicator._redis_store.r.keys.side_effect = lambda a: list(stored.keys())
    communicator._redis_store.r.get.side_effect = lambda a: stored[a]
    first = requests_mock.post("http://127.0.0.1/sync", json={"uuid": "consumer_uuid1"})
    second = requests_mock.post("http://127.0.0.2/sync", exc=requests.exceptions.ConnectTimeout)
    third = requests_mock.post("http://127.0.0.3/sync", json={"uuid": "consumer_uuid3"})
    communicator.sync_all()
    for endpoint in (first, second, third):
        assert endpoint.called
    assert first.last_request.json() == third.last_request.json()
    assert first.last_request.json()['uuid'] == LOCAL_UUID
    warning_mock.assert_called_once()
# ========================================
# App
# ========================================

def test_app_get_initial_ip_list():
    """The INITIAL_SERVERS env var must be split into a list of addresses."""
    expected = ["127.0.0.1", "192.168.0.1", "172.20.0.2"]
    assert app.get_initial_ip_list() == expected