Kovács Bence
644c2df888
All checks were successful
continuous-integration/drone/push Build is passing
57 lines
1.3 KiB
Python
57 lines
1.3 KiB
Python
#!/usr/bin/env python
|
|
|
|
"""
|
|
Unit tests for the producer-endpoint module.
|
|
"""
|
|
|
|
import os
|
|
import json
|
|
import pytest
|
|
import redis
|
|
from pytest_redis import factories
|
|
import app
|
|
|
|
|
|
generateduuid = 'c959ad81-58f9-4445-aab4-8f3d68aee1ad'
|
|
redis_proc = factories.redis_proc(host='cache', port=6379)
|
|
redis_db = factories.redisdb('redis_nooproc')
|
|
redstuff = redis.Redis.from_url(url=os.environ['REDIS_URL'])
|
|
redstuff.set('currentConsumer', json.dumps({'uuid': generateduuid, 'Host': "127.2.2.2"}).encode('utf-8'))
|
|
|
|
@pytest.fixture
|
|
def client():
|
|
"""Client fixture"""
|
|
with app.app.test_client() as client:
|
|
yield client
|
|
|
|
|
|
def test_set_currentconsumer_success(client):
|
|
|
|
r = client.post("/ip", json={'uuid': generateduuid, 'ip': "127.1.2.3"})
|
|
|
|
json_in_redis = json.loads(redstuff.get('currentConsumer'))
|
|
|
|
assert r.status_code == 204
|
|
assert json_in_redis == {'uuid': generateduuid, 'Host': "127.1.2.3"}
|
|
|
|
|
|
def test_set_currentconsumer_server_error(client):
|
|
|
|
r = client.post("/ip", json="unexpected")
|
|
|
|
assert r.status_code == 500
|
|
|
|
|
|
def test_set_currentconsumer_notfound(client):
|
|
|
|
r = client.post("/test")
|
|
|
|
assert r.status_code == 404
|
|
|
|
|
|
def test_set_currentconsumer_method_not_allowed(client):
|
|
|
|
r = client.get("/ip")
|
|
|
|
assert r.status_code == 405
|