133 lines
4.6 KiB
Python
133 lines
4.6 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
This module contains the RedisSuperStorage class, which is responsible for storing and loading the data structures used by the consumer.
|
|
"""
|
|
import json
import os
import time
from typing import Optional

import redis
|
|
|
|
|
|
class RedisSuperStorage:
    """
    This class is the brain of the scheduler app, and probably half the brain of the whole consumer system.

    This class provides access to the data structures used in the consumer system through various getter and
    setter methods. Kind of like an ORM would do.

    Sometimes it is necessary to access the redis database more than once while acquiring or setting some data.
    In these cases some basic error handling is implemented to avoid most of the problems that could be caused
    by non-atomic operations (for example a key expiring between listing it and reading it).
    """

    def __init__(self, redis_url: str, timeout: int):
        """
        During the object creation the Redis connection is attempted to be established.

        :param redis_url: URL of the redis database in the following form: `redis://localhost:6379/0`
        :param timeout: Timeout (in seconds) to be set on keys, where it matters. (See the documentation for
            each method.) Expected to be a positive number, as it is passed to Redis as the `EX` option of `SET`.
        """
        self.r = redis.from_url(redis_url)
        self._timeout = timeout

    def _iter_live_values(self, pattern: str):
        """
        Yield `(key, raw_value)` pairs for every key matching `pattern` that still holds a non-empty value.

        The keys are walked with `scan_iter` (incremental `SCAN`) instead of the blocking `KEYS` command.
        A key may expire between being listed and being read; such keys (and keys with empty values) are skipped.

        :param pattern: Glob-style pattern of the keys to look up, e.g. `consumer_*`.
        """
        for key in self.r.scan_iter(match=pattern):
            raw_value = self.r.get(key)

            # The key may have timed out since the scan returned it.
            if raw_value:
                yield key, raw_value

    def get_consumer_list(self) -> dict:
        """
        This function gets the list of consumers from the Redis database.

        Consumers are saved to the database either by the API endpoint, or by calling `update_consumer` of this
        class. The consumers are volatile, they have a timeout set when they are updated.
        If they are not updated within that timeout they are considered invalid, and are not returned by
        this method.

        Each consumer is represented by the following structure::

            {
                "uuid" : "str",
                "ip" : "str",
                "last_seen": float
            }

        Meaning of the fields:
            * `uuid` - The UUID of the remote consumer
            * `ip` - The IP address of the remote consumer
            * `last_seen` - The UNIX timestamp when the consumer was last seen (as stored by `update_consumer`,
              this is the value of `time.time()`).

        :return: Despite its name, this function returns a dictionary in which the keys are the uuids of
            each consumer.
        """
        consumers = {}

        for _key, raw_value in self._iter_live_values('consumer_*'):
            info = json.loads(raw_value.decode('utf-8'))

            # Empty records (e.g. `{}`) carry no consumer and are ignored.
            if info:
                consumers[info['uuid']] = info

        return consumers

    def get_producer_list(self) -> dict:
        """
        This method returns the producer ip addresses, nothing more.
        The producer ip addresses are volatile, they have a timeout set when they are updated.
        If they are not updated within that timeout they are considered invalid, and are not returned by
        this method.

        Producers are added to the redis database by the API endpoint with a timeout set on them.

        :return: Despite its name this function returns a dict, similar to `get_consumer_list`.
            The keys are the (decoded) keys stored in redis, the values are the ip addresses.
        """
        producers = {}

        for key, raw_value in self._iter_live_values('producer_*'):
            producers[key.decode('utf-8')] = raw_value.decode('utf-8')

        return producers

    def update_consumer(self, uuid: str, ip: str):
        """
        Updates (or creates) the information of a specific consumer in the redis database.
        The default timeout is set on the key in the same command that stores it.

        :param uuid: The uuid of the consumer to be updated.
        :param ip: The ip address of that consumer.
        """
        cust_key = f"consumer_{uuid}"

        info = {
            "uuid": uuid,
            "ip": ip,
            "last_seen": time.time(),
        }

        # Store the value and its expiry with a single SET ... EX command. A separate EXPIRE call could be
        # lost (crash, dropped connection) and would leave behind a consumer record that never times out.
        self.r.set(cust_key, json.dumps(info).encode('utf-8'), ex=self._timeout)

    def get_current_ip(self) -> Optional[str]:
        """
        This is a basic getter, which reads a single value from the Redis database.

        :return: The ip address of the consumer stored in the redis database, or whatever falsy value redis
            returned (normally `None`) when no ip address is stored.
        """
        ip = self.r.get('current_ip')

        if ip:
            ip = ip.decode('utf-8')

        return ip

    def set_current_ip(self, ip: str):
        """
        This is the most basic setter in the whole object. It stores a single value, which is the ip address
        of the consumer.

        The current ip in the redis storage does not time out.

        :param ip: IP address to be set.
        """
        self.r.set('current_ip', ip.encode('utf-8'))

    # Attribute-style access to the stored ip address, backed by the getter and setter above.
    current_ip = property(get_current_ip, set_current_ip)
|