This repository has been archived on 2020-09-24. You can view files and clone it, but cannot push or open issues or pull requests.
consumer-scheduler/redis_super_storage.py

68 lines
1.6 KiB
Python

#!/usr/bin/env python3
"""
This module contains the RedisSuperStorage class, which is responsible for storing and loading the data structures used by the consumer.
"""
import json
import os
import time
from typing import Optional

import redis
class RedisSuperStorage:
    """Store and load the scheduler's shared state in Redis.

    Keys touched by this class:

    * ``consumer_<uuid>`` -- UTF-8 encoded JSON object with the fields
      ``uuid``, ``ip`` and ``last_seen``; written with a time-to-live of
      ``timeout`` seconds.
    * ``producer_*`` -- read only (this class never writes them); each
      value is decoded as UTF-8 text, presumably the producer's IP.
    * ``current_ip`` -- UTF-8 encoded text, stored without expiry.
    """

    def __init__(self, redis_url: str, timeout: int):
        """Create the Redis client.

        Args:
            redis_url: Connection URL handed to ``redis.from_url``.
            timeout: Time-to-live, in seconds, applied to every consumer
                record written by :meth:`update_consumer`.
        """
        self.r = redis.from_url(redis_url)
        self._timeout = timeout

    def get_consumer_list(self) -> dict:
        """Return all stored consumer records, keyed by consumer uuid.

        Returns:
            A dict mapping each record's ``uuid`` field to the decoded JSON
            record itself.
        """
        consumers = {}
        # SCAN walks the keyspace incrementally instead of blocking the
        # server the way KEYS does. SCAN may report a key more than once;
        # the dict absorbs such duplicates.
        for key in self.r.scan_iter(match='consumer_*'):
            raw = self.r.get(key)
            if not raw:
                # The key expired between the scan and the read.
                continue
            info = json.loads(raw.decode('utf-8'))
            if info:
                consumers[info['uuid']] = info
        return consumers

    def get_producer_list(self) -> dict:
        """Return the value of every ``producer_*`` key.

        Returns:
            A dict mapping the full Redis key name (as ``str``) to its
            UTF-8 decoded value. Keys whose value is missing or empty are
            left out.
        """
        producers = {}
        for key in self.r.scan_iter(match='producer_*'):
            raw = self.r.get(key)
            if raw:
                producers[key.decode('utf-8')] = raw.decode('utf-8')
        return producers

    def update_consumer(self, uuid: str, ip: str):
        """Create or refresh the record of a consumer.

        The record is stamped with the current time and expires after the
        ``timeout`` given to the constructor unless it is refreshed again.

        Args:
            uuid: Identifier of the consumer; also part of the Redis key.
            ip: Address to store in the record.
        """
        info = {
            "uuid": uuid,
            "ip": ip,
            "last_seen": time.time(),
        }
        payload = json.dumps(info).encode('utf-8')
        # Write the value and its TTL in one command: with a separate
        # EXPIRE call, a failure between the two commands would leave a
        # record that never expires.
        self.r.set(f"consumer_{uuid}", payload, ex=self._timeout)

    def get_current_ip(self) -> Optional[str]:
        """Return the value stored under ``current_ip``.

        Returns:
            The stored value decoded as UTF-8, or ``None`` when the key
            does not exist.
        """
        raw = self.r.get('current_ip')
        if raw is None:
            return None
        return raw.decode('utf-8')

    def set_current_ip(self, ip: str):
        """Store ``ip`` under the ``current_ip`` key (no expiry)."""
        self.r.set('current_ip', ip.encode('utf-8'))

    current_ip = property(
        get_current_ip,
        set_current_ip,
        doc="Value of the ``current_ip`` key; ``None`` when it is not set.",
    )