python高性能代碼之多線程優化
以常見的端口掃描器為實例端口掃描器的原理很簡單,操作socket來判斷連接狀態確定主機端口的開放情況。
import socket
def scan(port):
s = socket.socket()
if s.connect_ex(('localhost', port)) == 0:
print port, 'open'
s.close()
if __name__ == '__main__':
map(scan,range(1,65536))
這是一個socket掃描器的基本代碼。
但是如果直接運行會等待很長時間都沒有反應,這是因為socket是阻塞的,到等待每個連接超時后才會進入下一個連接。
給這段代碼加一個超時
s.settimeout(0.1)
完整的代碼如下
import socket
def scan(port):
s = socket.socket()
s = settimeont(0.1)
if s.connect_ex(('localhost', port)) == 0:
print port, 'open'
s.close()
if __name__ == '__main__':
map(scan,range(1,65536))
本文的重點不在于掃描器功能部分。而重點在于代碼質量的提升和優化從而提升代碼的運行效率。
多線程版本:
import socket
import threading
def scan(port):
s = socket.socket()
s.settimeout(0.1)
if s.connect_ex(('localhost', port)) == 0:
print port, 'open'
s.close()
if __name__ == '__main__':
threads = [threading.Thread(target=scan, args=(i,)) for i in xrange(1,65536)]
map(lambda x:x.start(),threads)
Run起來,速度確實快了不少,但是拋出了異常:thread.error: can't start new thread
這個進程開啟了65535個線程,有兩種可能,一種是超過最大線程數了,一種是超過最大socket句柄數了。在linux可以通過ulimit來修改。
如果不修改最大限制,怎么用多線程不報錯呢?
加個queue,變成生產者-消費者模式,開固定線程。
多線程+隊列版本:
import socket
import threading
from Queue import Queue
def scan(port):
s = socket.socket()
s.settimeout(0.1)
if s.connect_ex(('localhost', port)) == 0:
print port, 'open'
s.close()
def worker():
while not q.empty():
port = q.get()
try:
scan(port)
finally:
q.task_done()
if __name__ == '__main__':
q = Queue()
map(q.put,xrange(1,65535))
threads = [threading.Thread(target=worker) for i in xrange(500)]
map(lambda x:x.start(),threads)
q.join()
開500個線程,不停的從隊列中取出任務來進行...
multiprocessing + 隊列版本:
總不能開65535個進程吧?還是用生產者消費者模式
import multiprocessing
def scan(port):
s = socket.socket()
s.settimeout(0.1)
if s.connect_ex(('localhost', port)) == 0:
print port, 'open'
s.close()
def worker(q):
while not q.empty():
port = q.get()
try:
scan(port)
finally:
q.task_done()
if __name__ == '__main__':
q = multiprocessing.JoinableQueue()
map(q.put,xrange(1,65535))
jobs = [multiprocessing.Process(target=worker, args=(q,)) for i in xrange(100)]
map(lambda x:x.start(),jobs)
注意這里把隊列作為一個參數傳入到worker中去,因為是process safe的queue,不然會報錯。
還有用的是JoinableQueue(),顧名思義就是可以join()的。
gevent的spawn版本:
from gevent import monkey; monkey.patch_all();
import gevent
import socket
...
if __name__ == '__main__':
threads = [gevent.spawn(scan, i) for i in xrange(1,65536)]
gevent.joinall(threads)
注意monkey patch必須在被patch的東西之前import,不然會Exception KeyError.比如不能先import threading,再monkey patch.
gevent的Pool版本:
from gevent import monkey; monkey.patch_all();
import socket
from gevent.pool import Pool
...
if __name__ == '__main__':
pool = Pool(500)
pool.map(scan,xrange(1,65536))
pool.join()
concurrent.futures版本:
import socket
from Queue import Queue
from concurrent.futures import ThreadPoolExecutor
...
if __name__ == '__main__':
q = Queue()
map(q.put,xrange(1,65536))
with ThreadPoolExecutor(max_workers=500) as executor:
for i in range(500):
executor.submit(worker,q)
來自:http://www.cnblogs.com/lfoder/p/5883143.html
本文由用戶 gpob3582 自行上傳分享,僅供網友學習交流。所有權歸原作者,若您的權利被侵害,請聯系管理員。
轉載本站原創文章,請注明出處,并保留原始鏈接、圖片水印。
本站是一個以用戶分享為主的開源技術平臺,歡迎各類分享!