kaninchen & schlangen: rabbitmq & python
DESCRIPTION
RabbitMQ ist ein Message Broker, der das Advanced Message Queuing Protocol (AMQP) nutzt. Man kann RabbitMQ zum Entkoppeln der verschiedenen Teile einer Applikation nutzen und die Nachrichten asynchron verarbeiten. Das bringt mehr Flexibilität, Skalierbarkeit und Performance.In diesem Vortrag werden verschiedene Python-Bibliotheken zur Nutzung des AMQP und ein Distributed Task Queue, der mit RabbitMQ arbeitet, vorgestellt.TRANSCRIPT
Kaninchen & SchlangenRabbitMQ & Python
Markus Zapke-Gründemann11. DZUG-Tagung zu Zope, Plone und Python
MarkusZapke-Gründemann• Softwareentwickler seit 2001
• Schwerpunkt: Web Application Development mit Python und PHP
• Django, symfony & Zend Framework
• Freier Softwareentwickler und Berater seit 2008
• www.keimlink.de
Übersicht
• Warum Queues?
• AMQP
• RabbitMQ
• Python Bibliotheken
• Carrot
• Celery
Warum Queues?
• Entkoppeln von Informationsproduzenten und -konsumenten
• Asynchrone Verarbeitung
• Load Balancing
• Skalierbarkeit
AMQP
• Advanced Message Queuing Protocol
• Offenes Protokoll
• Plattformunabhängig
• Port 5673/tcp
• http://www.amqp.org/
AMQP
• Offener Standard für Messaging Middleware
• Virtual Hosts
• Exchange (durable oder auto-deleted)
• Binding
• Queue (durable oder auto-deleted)
Producer - Consumer
Quelle: http://www.redhat.com/docs/en-US/Red_Hat_Enterprise_MRG/1.0/html/Messaging_Tutorial/index.html
Fanout Exchange
Quelle: http://www.redhat.com/docs/en-US/Red_Hat_Enterprise_MRG/1.0/html/Messaging_Tutorial/index.html
Direct Exchange
Quelle: http://www.redhat.com/docs/en-US/Red_Hat_Enterprise_MRG/1.0/html/Messaging_Tutorial/index.html
Topic Exchange
Quelle: http://www.redhat.com/docs/en-US/Red_Hat_Enterprise_MRG/1.0/html/Messaging_Tutorial/index.html
RabbitMQ
• AMQP Message Broker
• Erlang
• Open Source
• Mitglied in der AMQP Working Group
• XMPP, SMTP, STOMP und HTTP (mit Adaptern)
• http://www.rabbitmq.com/
Virtual Host Access Controlmit RabbitMQ
$ rabbitmqctl add_user username secret$ rabbitmqctl add_vhost message-vhost$ rabbitmqctl set_permissions -p message-vhost username ".*" ".*" ".*"
Konfiguration
LesenSchreiben
Python Bibliotheken
• amqplib - Python AMQP Client
• carrot - AMQP Messaging Framework
• pika - Python AMQP Client
• txAMQP - Python AMQP Library für Twisted
Carrot
• AMQP Messaging Framework
• High-Level Interface
• Benutzt amqplib
• Serialisierung (Pickle, JSON, YAML)
• Autor: Ask Solem
• http://github.com/ask/carrot/
exchange = 'messaging'queue = 'mbox'routing_key = 'message'
conf.py
connection.pyfrom carrot.connection import BrokerConnection
conn = BrokerConnection(hostname='localhost', userid='username', password='secret', virtual_host='message-vhost')
import datetime
from carrot.messaging import Publisherfrom carrot.utils import gen_unique_id
from conf import exchange, queue, routing_keyfrom connection import conn
publisher = Publisher(connection=conn, exchange=exchange, routing_key=routing_key, queue=queue, serializer='pickle')data = {'message_id': gen_unique_id(), 'timestamp': datetime.datetime.now(), 'message': 'Lorem ipsum dolor sit amet.'}publisher.send(data)
publisher.py
consumer.pyfrom carrot.messaging import Consumer
from conf import exchange, queue, routing_keyfrom connection import conn
class MessageConsumer(Consumer): def receive(self, message_data, message): data = (message.delivery_tag, message_data['message'], message_data['message_id'], message_data['timestamp'].isoformat()) print 'Message %d "%s" with id %s sent at %s.' % data message.ack()
if __name__ == '__main__': consumer = MessageConsumer(connection=conn, queue=queue, exchange=exchange, routing_key=routing_key) consumer.wait()
CeleryDistributed Task Queue• Backends: RabbitMQ, STOMP, Redis, Ghetto
Queue
• Clustering mit RabbitMQ
• Webhooks
• Django-Integration (optional)
• Autor: Ask Solem
• http://github.com/ask/celery/
CeleryDistributed Task Queue
• Serialisierung (Pickle, JSON, YAML)
• Parallele Ausführung
• Zeitgesteuerte Ausführung
• SQLAlchemy, carrot, anyjson
Python Task# Task als Klassefrom celery.task import Taskfrom myproject.models import User
class CreateUserTask(Task): def run(self, username, password): User.create(username, password)
>>> from tasks import CreateUserTask>>> CreateUserTask().delay('john', 'secret')
# Task als Funktion mit Decoratorfrom celery.decorators import taskfrom myproject.models import User
@task()def create_user(username, password): User.create(username, password)
>>> from tasks import create_user>>> create_user.delay('john', 'secret')
Python Task@task() # Benutzt pickle, um das Objekt zu serialisieren.def check_means(user): return user.has_means()
>>> from tasks import check_means>>> result = check_means.delay(user)>>> result.ready() # Gibt True zurück wenn der Task beendet ist.False>>> result.result # Task ist noch nicht beendet, kein Ergebnis verfügbar.None>>> result.get() # Warten bis der Task fertig ist und Ergebnis zurückgeben.93.27>>> result.result # Jetzt ist ein Ergebnis da.93.27>>> result.successful() # War der Task erfolgreich?True
Python TaskKonfiguration
$ celeryd --loglevel=INFO
# celeryconfig.pyBROKER_HOST = "localhost"BROKER_PORT = 5672BROKER_USER = "myuser"BROKER_PASSWORD = "mypassword"BROKER_VHOST = "myvhost"
CELERY_RESULT_BACKEND = "database"CELERY_RESULT_DBURI = "mysql://user:password@host/dbname"
CELERY_IMPORTS = ("tasks", )
Zeitgesteuerter Task# Periodic taskfrom celery.decorators import periodic_taskfrom datetime import timedelta
@periodic_task(run_every=timedelta(seconds=30))def every_30_seconds(): print("Running periodic task!")
# crontabfrom celery.task.schedules import crontabfrom celery.decorators import periodic_task
@periodic_task(run_every=crontab(hour=7, minute=30, day_of_week=1))def every_monday_morning(): print("Execute every Monday at 7:30AM.")
$ celerybeat$ celeryd -B
Fotos der Titelfolie
• Kaninchen: Gidzy http://www.flickr.com/photos/gidzy/3614942864/
• Grüne Python: Frank Wouters - http://www.flickr.com/photos/frank-wouters/540417590/
Lizenz
Dieses Werk ist unter einem Creative Commons Namensnennung-Weitergabe unter gleichen
Bedingungen 3.0 Unported Lizenzvertrag lizenziert. Um die Lizenz anzusehen, gehen Sie bitte zu
http://creativecommons.org/licenses/by-sa/3.0/ oder schicken Sie einen Brief an Creative Commons, 171 Second Street, Suite 300, San Francisco, California
94105, USA.