diff --git a/messaqeue/new_task.py b/messaqeue/new_task.py new file mode 100644 index 0000000..d1e4fe0 --- /dev/null +++ b/messaqeue/new_task.py @@ -0,0 +1,20 @@ +#!/usr/bin/env python +import pika +import sys + +connection = pika.BlockingConnection( + pika.ConnectionParameters(host='localhost')) +channel = connection.channel() + +channel.queue_declare(queue='task_queue', durable=True) + +message = ' '.join(sys.argv[1:]) or "Hello World!" +channel.basic_publish( + exchange='', + routing_key='task_queue', + body=message, + properties=pika.BasicProperties( + delivery_mode=2, # make message persistent + )) +print(" [x] Sent %r" % message) +connection.close() diff --git a/messaqeue/recieve.py b/messaqeue/recieve.py new file mode 100644 index 0000000..5675446 --- /dev/null +++ b/messaqeue/recieve.py @@ -0,0 +1,19 @@ +#!/usr/bin/env python +import pika + +connection = pika.BlockingConnection( + pika.ConnectionParameters(host='localhost')) +channel = connection.channel() + +channel.queue_declare(queue='hello') + + +def callback(ch, method, properties, body): + print(" [x] Received %r" % body) + + +channel.basic_consume( + queue='hello', on_message_callback=callback, auto_ack=True) + +print(' [*] Waiting for messages. To exit press CTRL+C') +channel.start_consuming() diff --git a/messaqeue/send.py b/messaqeue/send.py new file mode 100644 index 0000000..bfa5b31 --- /dev/null +++ b/messaqeue/send.py @@ -0,0 +1,14 @@ +#!/usr/bin/env python +import pika + +connection = pika.BlockingConnection(pika.ConnectionParameters('localhost')) +channel = connection.channel() + +channel.queue_declare(queue='hello') + +channel.basic_publish(exchange='', + routing_key='hello', + body='Hello World!') +print(" [x] Sent 'Hello World!'") + +connection.close() diff --git a/messaqeue/worker.py b/messaqeue/worker.py new file mode 100644 index 0000000..55f6ada --- /dev/null +++ b/messaqeue/worker.py @@ -0,0 +1,23 @@ +#!/usr/bin/env python +import pika +import time + +connection = pika.BlockingConnection( + pika.ConnectionParameters(host='localhost')) +channel = connection.channel() + +channel.queue_declare(queue='task_queue', durable=True) +print(' [*] Waiting for messages. To exit press CTRL+C') + + +def callback(ch, method, properties, body): + print(" [x] Received %r" % body) + time.sleep(body.count(b'.')) + print(" [x] Done") + ch.basic_ack(delivery_tag=method.delivery_tag) + + +channel.basic_qos(prefetch_count=1) +channel.basic_consume(queue='task_queue', on_message_callback=callback) + +channel.start_consuming()