8 Stimmen

Python's asyncore, um Daten periodisch mit einem variablen Timeout zu senden. Gibt es einen besseren Weg?

Ich wollte einen Server schreiben, zu dem ein Client eine Verbindung herstellen und regelmäßige Updates ohne Polling erhalten konnte. Das Problem, das ich mit Asyncore hatte, ist, dass Sie, wenn Sie keinen true-Wert zurückgeben, wenn dispatcher.writable() aufgerufen wird, warten müssen, bis der Asyncore-Loop abgelaufen ist (Standardwert ist 30s).

Die beiden Möglichkeiten, die ich versucht habe, dieses Problem zu umgehen, sind 1) die Timeout-Zeit auf einen niedrigen Wert zu reduzieren oder 2) Verbindungen abzufragen, wann sie als nächstes aktualisiert werden, und einen angemessenen Timeout-Wert zu generieren. Wenn Sie sich jedoch auf das 'Select Law' in 'man 2 select_tut' beziehen, heißt es: "Sie sollten immer versuchen, select() ohne Timeout zu verwenden."

Gibt es einen besseren Weg, dies zu tun? Vielleicht Twisted? Ich wollte versuchen, zusätzliche Threads zu vermeiden. Ich werde hier das Beispiel mit variablem Timeout einbeziehen:

#!/usr/bin/python

import time
import socket
import asyncore

# in Sekunden
UPDATE_PERIOD = 4.0

class Channel(asyncore.dispatcher):

    def __init__(self, sock, sck_map):
        asyncore.dispatcher.__init__(self, sock=sock, map=sck_map)
        self.last_update = 0.0  # sollte sofort aktualisiert werden
        self.send_buf = ''
        self.recv_buf = ''

    def writable(self):
        return len(self.send_buf) > 0

    def handle_write(self):
        nbytes = self.send(self.send_buf)
        self.send_buf = self.send_buf[nbytes:]

    def handle_read(self):
        print 'lesen'
        print 'empfangen:', self.recv(4096)

    def handle_close(self):
        print 'schließen'
        self.close()

    # für variable Timeout hinzugefügt
    def update(self):
        if time.time() >= self.next_update():
            self.send_buf += 'hallo %f\n'%(time.time())
            self.last_update = time.time()

    def next_update(self):
        return self.last_update + UPDATE_PERIOD

class Server(asyncore.dispatcher):

    def __init__(self, port, sck_map):
        asyncore.dispatcher.__init__(self, map=sck_map)
        self.port = port
        self.sck_map = sck_map
        self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
        self.bind(("", port))
        self.listen(16)
        print "hören auf Port", self.port

    def handle_accept(self):
        (conn, addr) = self.accept()
        Channel(sock=conn, sck_map=self.sck_map)

    # für variable Timeout hinzugefügt
    def update(self):
        pass

    def next_update(self):
        return None

sck_map = {}

server = Server(9090, sck_map)
while True:
    next_update = time.time() + 30.0
    for c in sck_map.values():
        c.update()  # <-- Schreibpuffer füllen
        n = c.next_update()
        #print 'n:',n
        if n is not None:
            next_update = min(next_update, n)
    _timeout = max(0.1, next_update - time.time())

    asyncore.loop(timeout=_timeout, count=1, map=sck_map)

4voto

Martin v. Löwis Punkte 120025

Das "select law" gilt nicht für Ihren Fall, da Sie nicht nur client-ausgelöste (reine Server-) Aktivitäten haben, sondern auch zeitgesteuerte Aktivitäten - dafür ist genau der auswählen Timeout da. Was das Gesetz wirklich sagen sollte, ist "wenn Sie ein Timeout festlegen, stellen Sie sicher, dass Sie tatsächlich etwas Nützliches tun müssen, wenn das Timeout eintritt". Das Gesetz soll gegen busy-waiting schützen; Ihr Code macht kein busy-wait.

Ich würde _timeout nicht auf das Maximum von 0,1 und der nächsten Aktualisierungszeit setzen, sondern auf das Maximum von 0,0 und dem nächsten Timeout. Mit anderen Worten, wenn sich ein Aktualisierungsintervall abgelaufen ist, während Sie Updates gemacht haben, sollten Sie dieses spezifische Update sofort durchführen.

Anstatt jedes Mal jeden Kanal zu fragen, ob er aktualisiert werden soll, könnten Sie alle Kanäle in einer Prioritätswarteschlange (sortiert nach der nächsten Aktualisierungszeit) speichern und dann nur Updates für die frühesten Kanäle ausführen (bis Sie einen finden, dessen Aktualisierungszeit noch nicht erreicht ist). Dafür können Sie das heapq-Modul verwenden.

Sie können auch ein paar Systemaufrufe sparen, indem Sie jeden Kanal nicht jedes Mal nach der aktuellen Zeit fragen lassen, sondern die aktuelle Zeit nur einmal abfragen und sie an .update übergeben.

4voto

demiurgus Punkte 41

Vielleicht können Sie dies mit sched.scheduler tun, wie hier (n.b. nicht getestet):

import sched, asyncore, time

# Erstellen eines Schedulers mit einer Verzögerungsfunktion, die asyncore.loop aufruft
scheduler = sched.scheduler(time.time, lambda t: _poll_loop(t, time.time()) )

# Fügen Sie die Update-Timeouts mit scheduler.enter hinzu
# ...

def _poll_loop(timeout, start_time):  
  asyncore.loop(timeout, count=1)
  finish_time = time.time()
  timeleft = finish_time - start_time
  if timeleft > timeout:  # es gab eine Nachricht und die Timeout-Verzögerung ist noch nicht abgelaufen
    _poll_loop(timeleft, finish_time) # daher noch etwas warten und den Socket abfragen

def main_loop():
  while True:
    if scheduler.empty():
      asyncore.loop(30.0, count=1) # einfach Standard-Timeout, verwenden Sie, was Ihnen passt
      # fügen Sie hier andere Arbeiten hinzu, die möglicherweise geplante Ereignisse erstellen
    else:
      scheduler.run()

2voto

Helmut Grohne Punkte 5722

Dies ist im Grunde genommen die Lösung von demiurgus, bei der die groben Kanten abgerundet wurden. Es behält seine Grundidee bei, verhindert jedoch Laufzeitfehler und Endlosschleifen und ist getestet. [Bearbeitung: behobene Probleme beim Ändern des Schedulers während der Verzögerung]

class asynschedcore(sched.scheduler):
    """Kombiniere sched.scheduler und asyncore.loop."""
    # Beim Empfang eines Signals startet asyncore freundlicherweise select neu. Allerdings könnte der Signalhandler die Scheduler-Instanz ändern. Dieser Wert bestimmt die maximale Zeit in Sekunden, die in asynschedcore.loop verbracht werden soll, bevor der Scheduler erneut überprüft wird.
    maxloop = 30
    def __init__(self, map=None):
        sched.scheduler.__init__(self, time.time, self._delay)
        if map is None:
            self._asynmap = asyncore.socket_map
        else:
            self._asynmap = map
        self._abort_delay = False

    def _maybe_abort_delay(self):
        if not self._abort_delay:
            return False
        # Das Zurückkehren aus dieser Funktion führt dazu, dass das nächste Ereignis ausgeführt wird, daher könnte es zu früh ausgeführt werden. Dies kann vermieden werden, indem der Anfang der Warteschlange geändert wird. Beachten Sie auch, dass enterabs _abort_delay auf True setzt.
        self.enterabs(0, 0, lambda:None, ())
        self._abort_delay = False
        return True

    def _delay(self, timeout):
        if self._maybe_abort_delay():
            return
        if 0 == timeout:
            # Sollten wir auch diesen Hack unterstützen?
            # asyncore.loop(0, map=self._asynmap, count=1)
            return
        now = time.time()
        finish = now + timeout
        while now < finish and self._asynmap:
            asyncore.loop(min(finish - now, self.maxloop), map=self._asynmap,
                          count=1)
            if self._maybe_abort_delay():
                return
            now = time.time()
        if now < finish:
            time.sleep(finish - now)

    def enterabs(self, abstime, priority, action, argument):
        # Wir könnten ein Ereignis vor dem aktuell nächsten Ereignis einfügen.
        self._abort_delay = True
        return sched.scheduler.enterabs(self, abstime, priority, action,
                                        argument)

    # Es ist nicht notwendig, enter zu überschreiben, da es mit enter implementiert wurde.

    def cancel(self, event):
        # Wir könnten das nächste Ereignis abbrechen.
        self._abort_delay = True
        return sched.scheduler.cancel(self, event)

    def run(self):
        """Wird ausgeführt, solange entweder ein Ereignis geplant ist oder sich Sockets im Map befinden."""
        while True:
            if not self.empty():
                sched.scheduler.run(self)
            elif self._asynmap:
                asyncore.loop(self.maxloop, map=self._asynmap, count=1)
            else:
                break

1voto

mthurlin Punkte 24849

Ich würde Twisted verwenden, es ist schon lange her, seit ich asyncore benutzt habe, aber ich denke, das sollte das Twisted-Äquivalent sein (nicht getestet, aus dem Gedächtnis geschrieben):

from twisted.internet import reactor, protocol
import time

UPDATE_PERIOD = 4.0

class MyClient(protocol.Protocol):

    def connectionMade(self):
        self.updateCall = reactor.callLater(UPDATE_PERIOD, self.update)

    def connectionLost(self, reason):
        self.updateCall.cancel()

    def update(self):
        self.transport.write("hello %f\n" % (time.time(),))

    def dataReceived(self, data):
        print "recv:", data

f = protocol.ServerFactory()
f.protocol = MyClient

reactor.listenTCP(9090, f)
reactor.run()

0voto

Demolishun Punkte 1552

Vielleicht verstehe ich nicht, was der OP erreichen wollte, aber ich habe das Problem einfach gelöst, indem ich einen Thread verwendet habe, der eine schwache Referenz von jedem Channel (asyncore.dispatcher) Objekt erhält. Dieser Thread bestimmt sein eigenes Timing und sendet dem Channel periodisch ein Update, indem er eine Queue in diesem Channel verwendet. Er erhält die Queue vom Channel-Objekt, indem er getQueue aufruft.

Der Grund, warum ich eine schwache Referenz verwende, ist, weil Clients flüchtig sind. Wenn der Channel stirbt, gibt die schwache Referenz None zurück. Auf diese Weise hält der Timing-Thread keine alten Objekte am Leben, weil er auf diese referenziert.

Ich weiß, dass der OP Threads vermeiden wollte, aber diese Lösung ist sehr einfach. Es erstellt nur einen Thread und spricht mit allen Channels, die vom Server-Objekt hinzugefügt werden, um sie zur Liste der zu überwachenden Objekte hinzuzufügen.

CodeJaeger.com

CodeJaeger ist eine Gemeinschaft für Programmierer, die täglich Hilfe erhalten..
Wir haben viele Inhalte, und Sie können auch Ihre eigenen Fragen stellen oder die Fragen anderer Leute lösen.

Powered by:

X