From 8bab3d9116c4aaebca118b750c3d63bab05abfbd Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Torma=20Krist=C3=B3f?= Date: Sat, 24 Apr 2021 19:01:26 +0200 Subject: [PATCH] cleanup and kafka producer --- kafka/publisher.py | 14 ++++++++++++++ rabbit/consumer.py | 1 - rabbit/publisher.py | 3 ++- 3 files changed, 16 insertions(+), 2 deletions(-) diff --git a/kafka/publisher.py b/kafka/publisher.py index e5a0d9b..6b9b0c1 100644 --- a/kafka/publisher.py +++ b/kafka/publisher.py @@ -1 +1,15 @@ #!/usr/bin/env python3 +from kafka import KafkaProducer + + +producer = KafkaProducer(bootstrap_servers='localhost:9092') + + +try: + n = 10 + string = "\"" + "a" * n + "\"" + binstring = string.encode('UTF-8') + while True: + producer.send('test', binstring) +except Exception: + producer.close() diff --git a/rabbit/consumer.py b/rabbit/consumer.py index 62a20a5..ff9c6c3 100644 --- a/rabbit/consumer.py +++ b/rabbit/consumer.py @@ -12,7 +12,6 @@ def callback(ch, method, properties, body): try: - n = 10 credentials = pika.PlainCredentials("rabbit", "rabbit") connection = pika.BlockingConnection(pika.ConnectionParameters(host="localhost", credentials=credentials, heartbeat=0, socket_timeout=5)) channel = connection.channel() diff --git a/rabbit/publisher.py b/rabbit/publisher.py index b1fd22b..36c5d4d 100644 --- a/rabbit/publisher.py +++ b/rabbit/publisher.py @@ -7,7 +7,8 @@ try: channel = connection.channel() channel.exchange_declare(exchange="test",exchange_type='direct') string = "\"" + "a" * n + "\"" + binstring = string.encode('UTF-8') while True: - channel.basic_publish(exchange="test",routing_key="test",body=string.encode('UTF-8')) + channel.basic_publish(exchange="test",routing_key="test",body=binstring) except Exception: connection.close()