bigchungusdata/kafka/consumer.py

31 lines
674 B
Python
Raw Normal View History

2021-04-24 18:43:28 +02:00
#!/usr/bin/env python3
"""Count how many Kafka messages are received per wall-clock second.

Subscribes to the ``test`` topic on ``localhost:9092``, records the local
arrival time of every message, and keeps consuming until interrupted with
Ctrl-C.  On interruption the consumer is closed and one line per second is
printed in the form ``<MM/DD/YYYY, HH:MM:SS> , <count>``.
"""
from collections import Counter
from datetime import datetime

from kafka import KafkaConsumer

# Local arrival time of every message received, in arrival order.
DATETIMES = []

# consumer_timeout_ms=1000 ends a pass over the consumer after one second
# without messages, so the outer loop below has to restart the iteration.
consumer = KafkaConsumer(
    bootstrap_servers='localhost:9092',
    auto_offset_reset='earliest',
    consumer_timeout_ms=1000,
)
consumer.subscribe(['test'])

try:
    while True:
        # Only the arrival time matters; the message payload is ignored.
        for _message in consumer:
            DATETIMES.append(datetime.now())
except KeyboardInterrupt:
    # Ctrl-C is the intended way to stop consuming and print the report.
    pass
finally:
    # Close exactly once, whether we stop via Ctrl-C or an unexpected error.
    consumer.close()

# Bucket arrival times by second; Counter keeps first-seen (chronological)
# order, matching the order in which the timestamps were recorded.
dt = Counter(
    received_at.strftime("%m/%d/%Y, %H:%M:%S") for received_at in DATETIMES
)
for key, count in dt.items():
    print(key, ",", count)