12 Stimmen

Tornado-Sellerie-Integrationshacks

Da niemand eine Lösung für die diese Stelle sowie die Tatsache, dass ich dringend eine Lösung brauche, hier ist meine Situation und einige abstrakte Lösungen/Ideen zur Diskussion.

Mein Stapel:

  1. Tornado
  2. Sellerie
  3. MongoDB
  4. Redis
  5. RabbitMQ

Mein Problem: Einen Weg finden, wie Tornado eine Celery-Aufgabe versenden kann (gelöst) und dann asynchron das Ergebnis sammeln kann (irgendwelche Ideen?).

Szenario 1: (Anfrage/Antwort-Hack plus Webhook)

  • Tornado empfängt eine (Benutzer-)Anfrage, speichert dann im lokalen Speicher (oder in Redis) eine { jobID : (Benutzer-)Anfrage}, um sich zu merken, wohin die Antwort weitergeleitet werden soll, und löst eine celery-Aufgabe mit jobID aus
  • Wenn celery die Aufgabe abgeschlossen hat, führt es einen Webhook an einer URL aus und teilt tornado mit, dass dieser JobID abgeschlossen ist (plus die Ergebnisse)
  • Tornado ruft die (Benutzer)anfrage ab und leitet eine Antwort an den (Benutzer) weiter

Kann das passieren? Hat es irgendeine Logik?

Szenario 2: (Tornado plus Langpolling)

  • Tornado schickt die Celery-Aufgabe los und gibt einige primäre json-Daten an den Client zurück (jQuery)
  • jQuery tut einige lange-Polling nach Erhalt der primären json, sagen wir, alle x Mikrosekunden, und Tornado Antworten nach einigen Datenbank-Flag. Wenn die Celery-Aufgabe abgeschlossen ist, wird dieses Datenbank-Flag auf True gesetzt, und die jQuery-Schleife ist beendet.

Ist das effizient?

Irgendwelche anderen Ideen/Schemata?

9voto

hymloth Punkte 6642

Meine Lösung besteht darin, von Tornado auf Sellerie umzuschalten:

class CeleryHandler(tornado.web.RequestHandlerr):

    @tornado.web.asynchronous
    def get(self):    

        task = yourCeleryTask.delay(**kwargs)

        def check_celery_task():
            if task.ready():
                self.write({'success':True} )
                self.set_header("Content-Type", "application/json")  
                self.finish()
            else:   
                tornado.ioloop.IOLoop.instance().add_timeout(datetime.timedelta(0.00001), check_celery_task)

        tornado.ioloop.IOLoop.instance().add_timeout(datetime.timedelta(0.00001), check_celery_task)

Hier ist Beitrag darüber.

8voto

Sharoon Thomas Punkte 1716

Hier ist unsere Lösung für dieses Problem. Da wir in unserer Anwendung in mehreren Handlern nach Ergebnissen suchen, haben wir die Sellerie-Lookup-Klasse zu einer Mixin-Klasse gemacht.

Dadurch wird auch der Code mit dem tornado.gen-Muster besser lesbar.

from functools import partial

class CeleryResultMixin(object):
    """
    Adds a callback function which could wait for the result asynchronously
    """
    def wait_for_result(self, task, callback):
        if task.ready():
            callback(task.result)
        else:
            # TODO: Is this going to be too demanding on the result backend ?
            # Probably there should be a timeout before each add_callback
            tornado.ioloop.IOLoop.instance().add_callback(
                partial(self.wait_for_result, task, callback)
            )

class ARemoteTaskHandler(CeleryResultMixin, tornado.web.RequestHandler):
    """Execute a task asynchronously over a celery worker.
    Wait for the result without blocking
    When the result is available send it back
    """
    @tornado.web.asynchronous
    @tornado.web.authenticated
    @tornado.gen.engine
    def post(self):
        """Test the provided Magento connection
        """
        task = expensive_task.delay(
            self.get_argument('somearg'),
        )

        result = yield tornado.gen.Task(self.wait_for_result, task)

        self.write({
            'success': True,
            'result': result.some_value
        })
        self.finish()

4voto

Eren Güven Punkte 2214

Ich bin über diese Frage gestolpert, und das wiederholte Aufrufen der Ergebnisse im Backend erschien mir nicht optimal. Also implementierte ich ein Mixin ähnlich wie Ihr Szenario 1 mit Unix-Sockets.

Es benachrichtigt Tornado, sobald die Aufgabe beendet ist (genauer gesagt, sobald die nächste Aufgabe in der Kette läuft) und trifft nur einmal auf das Ergebnis-Backend. Hier ist die Link .

3voto

Jim Horng Punkte 1497

Jetzt, https://github.com/mher/tornado-celery kommt zur Rettung...

class GenAsyncHandler(web.RequestHandler):
    @asynchronous
    @gen.coroutine
    def get(self):
        response = yield gen.Task(tasks.sleep.apply_async, args=[3])
        self.write(str(response.result))
        self.finish()

CodeJaeger.com

CodeJaeger ist eine Gemeinschaft für Programmierer, die täglich Hilfe erhalten..
Wir haben viele Inhalte, und Sie können auch Ihre eigenen Fragen stellen oder die Fragen anderer Leute lösen.

Powered by:

X