386 Stimmen

Was ist der schnellste Weg, 100.000 HTTP-Anfragen in Python zu senden?

Ich öffne eine Datei, die 100.000 URLs enthält. Ich muss eine HTTP-Anfrage an jede URL senden und den Statuscode ausgeben. Ich verwende Python 2.6 und habe mir bisher die vielen verwirrenden Möglichkeiten angeschaut, wie Python Threading/Concurrency implementiert. Ich habe mir sogar die Python Übereinstimmung Bibliothek, kann aber nicht herausfinden, wie man dieses Programm richtig schreibt. Ist jemand auf ein ähnliches Problem gestoßen? Ich schätze, ich muss generell wissen, wie man Tausende von Aufgaben in Python so schnell wie möglich ausführt - ich nehme an, das bedeutet "gleichzeitig".

26voto

使用する Nachforschungen Es ist eine Kombination aus Anfragen + Gevent-Modul.

GRequests ermöglicht es Ihnen, Requests mit Gevent zu verwenden, um asynchrone HTTP-Requests einfach durchzuführen.

Die Verwendung ist einfach:

import grequests

urls = [
   'http://www.heroku.com',
   'http://tablib.org',
   'http://httpbin.org',
   'http://python-requests.org',
   'http://kennethreitz.com'
]

Erstellen Sie einen Satz von nicht gesendeten Anfragen:

>>> rs = (grequests.get(u) for u in urls)

Senden Sie sie alle gleichzeitig:

>>> grequests.map(rs)
[<Response [200]>, <Response [200]>, <Response [200]>, <Response [200]>, <Response [200]>]

26voto

user5994461 Punkte 3652

(Notiz an mich selbst für das nächste Projekt)

Python 3-Lösung nur mit requests . Es ist die einfachste und schnellste Lösung, ohne Multiprocessing oder komplizierte asynchrone Bibliotheken.

Der wichtigste Aspekt ist die Wiederverwendung von Verbindungen, insbesondere für HTTPS (TLS erfordert einen zusätzlichen Hin- und Rückweg zum Öffnen). Beachten Sie, dass eine Verbindung spezifisch für eine Subdomain ist. Wenn Sie viele Seiten auf vielen Domains scrapen, können Sie die Liste der URLs sortieren, um die Wiederverwendung von Verbindungen zu maximieren (es wird effektiv nach Domain sortiert).

Er wird so schnell wie jeder asynchrone Code sein, wenn genügend Threads vorhanden sind. (Anfragen geben die Python-GIL frei, wenn auf die Antwort gewartet wird).

[Produktionsfähiger Code mit etwas Protokollierung und Fehlerbehandlung]

import logging
import requests
import time
from concurrent.futures import ThreadPoolExecutor, as_completed

# source: https://stackoverflow.com/a/68583332/5994461

THREAD_POOL = 16

# This is how to create a reusable connection pool with python requests.
session = requests.Session()
session.mount(
    'https://',
    requests.adapters.HTTPAdapter(pool_maxsize=THREAD_POOL,
                                  max_retries=3,
                                  pool_block=True)
)

def get(url):
    response = session.get(url)
    logging.info("request was completed in %s seconds [%s]", response.elapsed.total_seconds(), response.url)
    if response.status_code != 200:
        logging.error("request failed, error code %s [%s]", response.status_code, response.url)
    if 500 <= response.status_code < 600:
        # server is overloaded? give it a break
        time.sleep(5)
    return response

def download(urls):
    with ThreadPoolExecutor(max_workers=THREAD_POOL) as executor:
        # wrap in a list() to wait for all requests to complete
        for response in list(executor.map(get, urls)):
            if response.status_code == 200:
                print(response.content)

def main():
    logging.basicConfig(
        format='%(asctime)s.%(msecs)03d %(levelname)-8s %(message)s',
        level=logging.INFO,
        datefmt='%Y-%m-%d %H:%M:%S'
    )

    urls = [
        "https://httpstat.us/200",
        "https://httpstat.us/200",
        "https://httpstat.us/200",
        "https://httpstat.us/404",
        "https://httpstat.us/503"
    ]

    download(urls)

if __name__ == "__main__":
    main()

11voto

Erik Garrison Punkte 1647

Ein guter Ansatz zur Lösung dieses Problems besteht darin, zunächst den Code zu schreiben, der erforderlich ist, um ein Ergebnis zu erhalten, und dann Threading-Code einzubauen, um die Anwendung zu parallelisieren.

In einer perfekten Welt würde dies einfach bedeuten, dass gleichzeitig 100.000 Threads gestartet werden, die ihre Ergebnisse in ein Wörterbuch oder eine Liste für die spätere Verarbeitung ausgeben, aber in der Praxis ist die Anzahl der parallelen HTTP-Anfragen, die Sie auf diese Weise stellen können, begrenzt. Auf lokaler Ebene gibt es Grenzen, wie viele Sockets Sie gleichzeitig öffnen können und wie viele Threads Ihr Python-Interpreter zulässt. Aus der Ferne kann die Anzahl der gleichzeitigen Verbindungen begrenzt sein, wenn alle Anfragen an einen oder mehrere Server gerichtet sind. Diese Beschränkungen machen es wahrscheinlich erforderlich, dass Sie das Skript so schreiben, dass nur ein kleiner Teil der URLs gleichzeitig abgefragt wird (100, wie ein anderer Poster erwähnte, ist wahrscheinlich eine angemessene Größe des Thread-Pools, obwohl Sie vielleicht feststellen, dass Sie viel mehr erfolgreich einsetzen können).

Sie können dieses Entwurfsmuster befolgen, um das obige Problem zu lösen:

  1. Starten Sie einen Thread, der neue Anfrage-Threads startet, bis die Anzahl der derzeit laufenden Threads (Sie können sie über threading.active_count() verfolgen oder indem Sie die Thread-Objekte in eine Datenstruktur schieben) >= Ihre maximale Anzahl gleichzeitiger Anfragen (z. B. 100) ist, und dann für eine kurze Zeitspanne schläft. Dieser Thread sollte beendet werden, wenn es keine URLs mehr zu verarbeiten gibt. Der Thread wird also immer wieder aufwachen, neue Threads starten und schlafen, bis Sie fertig sind.
  2. Die Abfrage-Threads sollen ihre Ergebnisse in einer Datenstruktur speichern, damit sie später abgerufen und ausgegeben werden können. Wenn es sich bei der Struktur, in der Sie die Ergebnisse speichern, um eine list ou dict in CPython, können Sie sicheres Anhängen oder Einfügen einzigartiger Elemente aus Ihren Threads ohne Sperren aber wenn Sie in eine Datei schreiben oder eine komplexere, Thread-übergreifende Dateninteraktion benötigen sollten Sie eine gegenseitige Ausschlusssperre verwenden, um diesen Zustand vor Beschädigung zu schützen .

Ich würde vorschlagen, dass Sie die Einfädeln Modul. Sie können es verwenden, um laufende Threads zu starten und zu verfolgen. Pythons Threading-Unterstützung ist dürftig, aber die Beschreibung Ihres Problems lässt vermuten, dass sie für Ihre Bedürfnisse völlig ausreichend ist.

Wenn Sie schließlich eine ziemlich einfache Anwendung einer in Python geschriebenen parallelen Netzwerkanwendung sehen möchten, sehen Sie sich ssh.py . Es ist eine kleine Bibliothek, die Python-Threading verwendet, um viele SSH-Verbindungen zu parallelisieren. Das Design ist nahe genug an Ihren Anforderungen, so dass es eine gute Ressource für Sie sein könnte.

9voto

Rakis Punkte 7629

Wenn Sie die bestmögliche Leistung erzielen wollen, sollten Sie die Verwendung von asynchroner E/A anstelle von Threads in Betracht ziehen. Der Overhead, der mit Tausenden von Betriebssystem-Threads verbunden ist, ist nicht trivial und die Kontextumschaltung innerhalb des Python-Interpreters trägt noch mehr dazu bei. Mit Threads lässt sich die Aufgabe sicherlich bewältigen, aber ich vermute, dass ein asynchroner Weg eine bessere Gesamtleistung erbringen wird.

Insbesondere würde ich den asynchronen Web-Client in der Twisted-Bibliothek empfehlen ( http://www.twistedmatrix.com ). Es hat eine zugegebenermaßen steile Lernkurve, aber es ist recht einfach zu bedienen, sobald man den Stil der asynchronen Programmierung von Twisted gut im Griff hat.

Ein HowTo zur asynchronen Web-Client-API von Twisted ist verfügbar unter:

http://twistedmatrix.com/documents/current/web/howto/client.html

7voto

Tarnay Kálmán Punkte 6772

Eine Lösung:

from twisted.internet import reactor, threads
from urlparse import urlparse
import httplib
import itertools

concurrent = 200
finished=itertools.count(1)
reactor.suggestThreadPoolSize(concurrent)

def getStatus(ourl):
    url = urlparse(ourl)
    conn = httplib.HTTPConnection(url.netloc)   
    conn.request("HEAD", url.path)
    res = conn.getresponse()
    return res.status

def processResponse(response,url):
    print response, url
    processedOne()

def processError(error,url):
    print "error", url#, error
    processedOne()

def processedOne():
    if finished.next()==added:
        reactor.stop()

def addTask(url):
    req = threads.deferToThread(getStatus, url)
    req.addCallback(processResponse, url)
    req.addErrback(processError, url)   

added=0
for url in open('urllist.txt'):
    added+=1
    addTask(url.strip())

try:
    reactor.run()
except KeyboardInterrupt:
    reactor.stop()

Testzeit:

[kalmi@ubi1:~] wc -l urllist.txt
10000 urllist.txt
[kalmi@ubi1:~] time python f.py > /dev/null 

real    1m10.682s
user    0m16.020s
sys 0m10.330s
[kalmi@ubi1:~] head -n 6 urllist.txt
http://www.google.com
http://www.bix.hu
http://www.godaddy.com
http://www.google.com
http://www.bix.hu
http://www.godaddy.com
[kalmi@ubi1:~] python f.py | head -n 6
200 http://www.bix.hu
200 http://www.bix.hu
200 http://www.bix.hu
200 http://www.bix.hu
200 http://www.bix.hu
200 http://www.bix.hu

Pingtime:

bix.hu is ~10 ms away from me
godaddy.com: ~170 ms
google.com: ~30 ms

CodeJaeger.com

CodeJaeger ist eine Gemeinschaft für Programmierer, die täglich Hilfe erhalten..
Wir haben viele Inhalte, und Sie können auch Ihre eigenen Fragen stellen oder die Fragen anderer Leute lösen.

Powered by:

X