Ich öffne eine Datei, die 100.000 URLs enthält. Ich muss eine HTTP-Anfrage an jede URL senden und den Statuscode ausgeben. Ich verwende Python 2.6 und habe mir bisher die vielen verwirrenden Möglichkeiten angeschaut, wie Python Threading/Concurrency implementiert. Ich habe mir sogar die Python Übereinstimmung Bibliothek, kann aber nicht herausfinden, wie man dieses Programm richtig schreibt. Ist jemand auf ein ähnliches Problem gestoßen? Ich schätze, ich muss generell wissen, wie man Tausende von Aufgaben in Python so schnell wie möglich ausführt - ich nehme an, das bedeutet "gleichzeitig".
Antworten
Zu viele Anzeigen?Dieser verdrehte asynchrone Web-Client ist ziemlich schnell.
#!/usr/bin/python2.7
from twisted.internet import reactor
from twisted.internet.defer import Deferred, DeferredList, DeferredLock
from twisted.internet.defer import inlineCallbacks
from twisted.web.client import Agent, HTTPConnectionPool
from twisted.web.http_headers import Headers
from pprint import pprint
from collections import defaultdict
from urlparse import urlparse
from random import randrange
import fileinput
pool = HTTPConnectionPool(reactor)
pool.maxPersistentPerHost = 16
agent = Agent(reactor, pool)
locks = defaultdict(DeferredLock)
codes = {}
def getLock(url, simultaneous = 1):
return locks[urlparse(url).netloc, randrange(simultaneous)]
@inlineCallbacks
def getMapping(url):
# Limit ourselves to 4 simultaneous connections per host
# Tweak this number, but it should be no larger than pool.maxPersistentPerHost
lock = getLock(url,4)
yield lock.acquire()
try:
resp = yield agent.request('HEAD', url)
codes[url] = resp.code
except Exception as e:
codes[url] = str(e)
finally:
lock.release()
dl = DeferredList(getMapping(url.strip()) for url in fileinput.input())
dl.addCallback(lambda _: reactor.stop())
reactor.run()
pprint(codes)
Erstellen. epoll
Objekt,
viele Client-TCP-Sockets öffnen,
ihre Sendepuffer so anpassen, dass sie etwas größer sind als der Anfragekopf,
eine Kopfzeile der Anfrage senden - sie sollte sofort erfolgen, indem sie einfach in einen Puffer gelegt wird, Socket registrieren in epoll
Objekt,
faire .poll
auf epoll
obect,
Lesen der ersten 3 Bytes von jedem Socket aus .poll
,
schreiben sie an sys.stdout
gefolgt von \n
(nicht spülen), schließen Sie den Client-Socket.
Begrenzung der Anzahl der gleichzeitig geöffneten Sockets - Behandlung von Fehlern bei der Erstellung von Sockets. Erstellen Sie einen neuen Socket nur, wenn ein anderer geschlossen ist.
OS-Grenzwerte anpassen.
Versuchen Sie, sich in einige (nicht viele) Prozesse aufzuteilen: Dies kann helfen, die CPU etwas effektiver zu nutzen.
In Ihrem Fall ist Threading wahrscheinlich der richtige Weg, da Sie wahrscheinlich die meiste Zeit damit verbringen, auf eine Antwort zu warten. Es gibt hilfreiche Module wie Warteschlange in der Standardbibliothek, die helfen könnten.
Ich habe schon einmal etwas Ähnliches mit dem parallelen Herunterladen von Dateien gemacht, und es war gut genug für mich, aber es war nicht in dem Ausmaß, von dem Sie sprechen.
Wenn Ihre Aufgabe mehr CPU-gebunden war, sollten Sie sich die Multiprozessorbetrieb Modul, das es Ihnen ermöglicht, mehr CPUs/Kerne/Threads zu nutzen (mehr Prozesse, die sich nicht gegenseitig blockieren, da die Sperrung pro Prozess erfolgt)
Ich habe festgestellt, dass die Verwendung des tornado
Paket ist der schnellste und einfachste Weg, dies zu erreichen:
from tornado import ioloop, httpclient, gen
def main(urls):
"""
Asynchronously download the HTML contents of a list of URLs.
:param urls: A list of URLs to download.
:return: List of response objects, one for each URL.
"""
@gen.coroutine
def fetch_and_handle():
httpclient.AsyncHTTPClient.configure(None, defaults=dict(user_agent='MyUserAgent'))
http_client = httpclient.AsyncHTTPClient()
waiter = gen.WaitIterator(*[http_client.fetch(url, raise_error=False, method='HEAD')
for url in urls])
results = []
# Wait for the jobs to complete
while not waiter.done():
try:
response = yield waiter.next()
except httpclient.HTTPError as e:
print(f'Non-200 HTTP response returned: {e}')
continue
except Exception as e:
print(f'An unexpected error occurred querying: {e}')
continue
else:
print(f'URL \'{response.request.url}\' has status code <{response.code}>')
results.append(response)
return results
loop = ioloop.IOLoop.current()
web_pages = loop.run_sync(fetch_and_handle)
return web_pages
my_urls = ['url1.com', 'url2.com', 'url100000.com']
responses = main(my_urls)
print(responses[0])
Mit einer Thread-Pool ist eine gute Option, mit der sich dies recht einfach bewerkstelligen lässt. Leider gibt es in Python keine Standardbibliothek, die Thread-Pools extrem einfach macht. Aber hier ist eine anständige Bibliothek, die Sie beginnen sollte: http://www.chrisarndt.de/projects/threadpool/
Code-Beispiel von deren Website:
pool = ThreadPool(poolsize)
requests = makeRequests(some_callable, list_of_args, callback)
[pool.putRequest(req) for req in requests]
pool.wait()
Ich hoffe, das hilft.