8 Stimmen

Verwendung von Tornado mit Pika für asynchrone Warteschlangenüberwachung

Ich habe einen AMQP-Server ( RabbitMQ ), die ich in einem Buch veröffentlichen und lesen möchte. Tornado-Webserver . Um dies zu tun, dachte ich, ich würde eine asynchrone amqp Python-Bibliothek verwenden; insbesondere Pika (eine Variante davon, die angeblich Tornado unterstützt).

Ich habe Code geschrieben, der erfolgreich aus der Warteschlange zu lesen scheint, außer, dass am Ende der Anforderung, ich eine Ausnahme erhalten (der Browser kehrt gut):

[E 101219 01:07:35 web:868] Uncaught exception GET / (127.0.0.1)
    HTTPRequest(protocol='http', host='localhost:5000', method='GET', uri='/', version='HTTP/1.1', remote_ip='127.0.0.1', remote_ip='127.0.0.1', body='', headers={'Host': 'localhost:5000', 'Accept-Language': 'en-us,en;q=0.5', 'Accept-Encoding': 'gzip,deflate', 'Keep-Alive': '115', 'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8', 'User-Agent': 'Mozilla/5.0 (X11; U; Linux x86_64; en-US; rv:1.9.2.13) Gecko/20101206 Ubuntu/10.10 (maverick) Firefox/3.6.13', 'Accept-Charset': 'ISO-8859-1,utf-8;q=0.7,*;q=0.7', 'Connection': 'keep-alive', 'Cache-Control': 'max-age=0', 'If-None-Match': '"58f554b64ed24495235171596351069588d0260e"'})
    Traceback (most recent call last):
      File "/home/dave/devel/lib/python2.6/site-packages/tornado/web.py", line 810, in _stack_context
        yield
      File "/home/dave/devel/lib/python2.6/site-packages/tornado/stack_context.py", line 77, in StackContext
        yield
      File "/usr/lib/python2.6/contextlib.py", line 113, in nested
        yield vars
      File "/home/dave/lib/python2.6/site-packages/tornado/stack_context.py", line 126, in wrapped
        callback(*args, **kwargs)
      File "/home/dave/devel/src/pika/pika/tornado_adapter.py", line 42, in _handle_events
        self._handle_read()
      File "/home/dave/devel/src/pika/pika/tornado_adapter.py", line 66, in _handle_read
        self.on_data_available(chunk)
      File "/home/dave/devel/src/pika/pika/connection.py", line 521, in on_data_available
        self.channels[frame.channel_number].frame_handler(frame)
    KeyError: 1

Ich bin mir nicht ganz sicher, ob ich diese Bibliothek richtig verwende, es kann also sein, dass ich etwas eklatant falsch mache. Der grundlegende Fluss von meinem Code ist:

  1. Anfrage kommt rein
  2. Verbindung zu RabbitMQ mit TornadoConnection erstellen; einen Callback angeben
  3. In connection callback einen Kanal erstellen, meine Warteschlange deklarieren/binden und basic_consume aufrufen; einen callback angeben
  4. In consume callback schließen Sie den Kanal und rufen die Funktion finish von Tornado auf.
  5. Siehe Ausnahme.

Ich habe ein paar Fragen:

  1. Ist dieser Fluss überhaupt korrekt? Ich bin nicht sicher, was der Zweck der Verbindung Callback ist, außer dass es nicht funktioniert, wenn ich es nicht verwenden.
  2. Sollte ich eine AMQP-Verbindung pro Webanforderung erstellen? Die Dokumentation von RabbitMQ legt nahe, dass ich das nicht tun sollte, sondern dass ich nur Kanäle erstellen sollte. Aber wo würde ich die Verbindung erstellen und wie kann ich versuchen, die Verbindung wiederherzustellen, wenn sie kurzzeitig ausfällt?
  3. Wenn ich eine AMQP-Verbindung pro Webanforderung herstelle, wo sollte ich sie schließen? Der Aufruf von amqp.close() in meinem Callback scheint die Dinge noch mehr zu vermasseln.

Ich werde versuchen, etwas später einen Beispielcode hochzuladen, aber die Schritte, die ich oben beschrieben habe, legen die verbrauchende Seite der Dinge ziemlich vollständig dar. Ich habe Probleme mit der Veröffentlichung Seite als gut, aber der Verbrauch von Warteschlangen ist dringender.

0 Stimmen

Den Code selbst zu sehen ist viel besser als eine verbale Erklärung zu lesen.

8voto

jonesy Punkte 3380

Es wäre hilfreich, etwas Quellcode zu sehen, aber ich verwende dasselbe tornadounterstützende Pika-Modul ohne Probleme in mehr als einem Produktionsprojekt.

Sie wollen nicht für jede Anfrage eine Verbindung herstellen. Erstellen Sie eine Klasse, die alle Ihre AMQP-Operationen umhüllt, und instanziieren Sie sie als Singleton auf der Ebene der Tornado-Anwendung, die für alle Anfragen (und Request-Handler) verwendet werden kann. Ich tue dies in einer 'runapp()'-Funktion, die einige Dinge wie diese tut und dann den Haupt-Tornado ioloop startet.

Hier ist eine Klasse namens 'Events'. Es ist eine Teilimplementierung (insbesondere definiere ich hier nicht 'self.handle_event'. Das bleibt Ihnen überlassen.

class Event(object):
  def __init__(self, config):
    self.host = 'localhost'
    self.port = '5672'
    self.vhost = '/'
    self.user = 'foo'
    self.exchange = 'myx'
    self.queue = 'myq'
    self.recv_routing_key = 'msgs4me'
    self.passwd = 'bar'

    self.connected = False 
    self.connect()

  def connect(self):

    credentials = pika.PlainCredentials(self.user, self.passwd)

    parameters = pika.ConnectionParameters(host = self.host,
                                         port = self.port,
                                         virtual_host = self.vhost,
                                         credentials = credentials)

    srs = pika.connection.SimpleReconnectionStrategy()

    logging.debug('Events: Connecting to AMQP Broker: %s:%i' % (self.host,
                                                              self.port))
    self.connection = tornado_adapter.TornadoConnection(parameters,
                                                      wait_for_open = False,
                                                      reconnection_strategy = srs,
                                                      callback = self.on_connected)

  def on_connected(self):

    # Open the channel
    logging.debug("Events: Opening a channel")
    self.channel = self.connection.channel()

    # Declare our exchange
    logging.debug("Events: Declaring the %s exchange" %  self.exchange)
    self.channel.exchange_declare(exchange = self.exchange,
                                type = "fanout",
                                auto_delete = False,
                                durable = True)

    # Declare our queue for this process
    logging.debug("Events: Declaring the %s queue" %  self.queue)
    self.channel.queue_declare(queue = self.queue,
                             auto_delete = False,
                             exclusive = False,
                             durable = True)

    # Bind to the exchange
    self.channel.queue_bind(exchange = self.exchange,
                          queue = self.queue,
                          routing_key = self.recv_routing_key)

    self.channel.basic_consume(consumer = self.handle_event, queue = self.queue, no_ack = True)

    # We should be connected if we made it this far
    self.connected = True

Und dann habe ich das in eine Datei namens "events.py". Meine RequestHandler und jeder Backend-Code verwenden alle ein "common.py"-Modul, das Code verpackt, der für beide nützlich ist (meine RequestHandler rufen keine amqp-Modul-Methoden direkt auf - dasselbe gilt auch für db, cache usw.), also definiere ich "events=None" auf der Modulebene in common.py und instanziere das Event-Objekt etwa so:

import events

def runapp(config):
    if myapp.common.events is None: 
       myapp.common.events = myapp.events.Event(config)
    logging.debug("MYAPP.COMMON.EVENTS: %s", myapp.common.events)
    http_server = tornado.httpserver.HTTPServer(app,
                                            xheaders=config['HTTPServer']['xheaders'],
                                            no_keep_alive=config['HTTPServer']['no_keep_alive'])
    http_server.listen(port) 
    main_loop = tornado.ioloop.IOLoop.instance()
    logging.debug("MAIN IOLOOP: %s", main_loop)
    main_loop.start()

Frohes neues Jahr :-D

0voto

Marcelo Cantos Punkte 173498

Jemand hat berichtet, dass es gelungen ist, Tornado und Pika zusammenzuführen aquí . Soweit ich das beurteilen kann, ist es nicht so einfach, Pika von Tornado aus aufzurufen, da beide Bibliotheken ihre eigenen Ereignisschleifen haben wollen.

0 Stimmen

Ja, ich verwende eine Variante von Pika, die speziell Tornado unterstützt. Ich denke, ich habe alles in Ordnung gebracht. Ich werde später eine Lösung posten, wenn ich sicher bin.

CodeJaeger.com

CodeJaeger ist eine Gemeinschaft für Programmierer, die täglich Hilfe erhalten..
Wir haben viele Inhalte, und Sie können auch Ihre eigenen Fragen stellen oder die Fragen anderer Leute lösen.

Powered by:

X