From 63c913571de784782029bcc7b4946fde86665897 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Torma=20Krist=C3=B3f?= Date: Tue, 31 Mar 2020 14:13:11 +0200 Subject: [PATCH] add messagequeue examples --- messaqeue/new_task.py | 20 ++++++++++++++++++++ messaqeue/recieve.py | 19 +++++++++++++++++++ messaqeue/send.py | 14 ++++++++++++++ messaqeue/worker.py | 23 +++++++++++++++++++++++ 4 files changed, 76 insertions(+) create mode 100644 messaqeue/new_task.py create mode 100644 messaqeue/recieve.py create mode 100644 messaqeue/send.py create mode 100644 messaqeue/worker.py diff --git a/messaqeue/new_task.py b/messaqeue/new_task.py new file mode 100644 index 0000000..d1e4fe0 --- /dev/null +++ b/messaqeue/new_task.py @@ -0,0 +1,20 @@ +#!/usr/bin/env python +import pika +import sys + +connection = pika.BlockingConnection( + pika.ConnectionParameters(host='localhost')) +channel = connection.channel() + +channel.queue_declare(queue='task_queue', durable=True) + +message = ' '.join(sys.argv[1:]) or "Hello World!" +channel.basic_publish( + exchange='', + routing_key='task_queue', + body=message, + properties=pika.BasicProperties( + delivery_mode=2, # make message persistent + )) +print(" [x] Sent %r" % message) +connection.close() diff --git a/messaqeue/recieve.py b/messaqeue/recieve.py new file mode 100644 index 0000000..5675446 --- /dev/null +++ b/messaqeue/recieve.py @@ -0,0 +1,19 @@ +#!/usr/bin/env python +import pika + +connection = pika.BlockingConnection( + pika.ConnectionParameters(host='localhost')) +channel = connection.channel() + +channel.queue_declare(queue='hello') + + +def callback(ch, method, properties, body): + print(" [x] Received %r" % body) + + +channel.basic_consume( + queue='hello', on_message_callback=callback, auto_ack=True) + +print(' [*] Waiting for messages. To exit press CTRL+C') +channel.start_consuming() diff --git a/messaqeue/send.py b/messaqeue/send.py new file mode 100644 index 0000000..bfa5b31 --- /dev/null +++ b/messaqeue/send.py @@ -0,0 +1,14 @@ +#!/usr/bin/env python +import pika + +connection = pika.BlockingConnection(pika.ConnectionParameters('localhost')) +channel = connection.channel() + +channel.queue_declare(queue='hello') + +channel.basic_publish(exchange='', + routing_key='hello', + body='Hello World!') +print(" [x] Sent 'Hello World!'") + +connection.close() diff --git a/messaqeue/worker.py b/messaqeue/worker.py new file mode 100644 index 0000000..55f6ada --- /dev/null +++ b/messaqeue/worker.py @@ -0,0 +1,23 @@ +#!/usr/bin/env python +import pika +import time + +connection = pika.BlockingConnection( + pika.ConnectionParameters(host='localhost')) +channel = connection.channel() + +channel.queue_declare(queue='task_queue', durable=True) +print(' [*] Waiting for messages. To exit press CTRL+C') + + +def callback(ch, method, properties, body): + print(" [x] Received %r" % body) + time.sleep(body.count(b'.')) + print(" [x] Done") + ch.basic_ack(delivery_tag=method.delivery_tag) + + +channel.basic_qos(prefetch_count=1) +channel.basic_consume(queue='task_queue', on_message_callback=callback) + +channel.start_consuming()