Python 併發總結,多線程,多進程,異步IO
1 測量函數運行時間
import time def profile(func): def wrapper(*args, **kwargs): import time start = time.time() func(*args, **kwargs) end = time.time() print 'COST: {}'.format(end - start) return wrapper @profile def fib(n): if n<= 2: return 1 return fib(n-1) + fib(n-2) fib(35)
2 啟動多個線程,並等待完成 2.1 使用threading.enumerate()
import threading for i in range(2): t = threading.Thread(target=fib, args=(35,)) t.start() main_thread = threading.currentThread() for t in threading.enumerate(): if t is main_thread: continue t.join()
2.2 先保存啟動的線程
threads = [] for i in range(5): t = Thread(target=foo, args=(i,)) threads.append(t) t.start() for t in threads: t.join()
3 使用信號量,限制同時能有幾個線程訪問臨界區
from threading import Semaphore import time sema = Semaphore(3) def foo(tid): with sema: print('{} acquire sema'.format(tid)) wt = random() * 2 time.sleep(wt) print('{} release sema'.format(tid))
4 鎖,相當於信號量為1的情況
from threading import Thread Lock value = 0 lock = Lock() def getlock(): global lock with lock: new = value + 1 time.sleep(0.001) value = new
5 可重入鎖RLock acquire() 可以不被阻塞的被同一個線程調用多次,release()需要和acquire()調用次數匹配才能釋放鎖 6 條件 Condition 一個線程發出信號,另一個線程等待信號 常用於生產者-消費者模型
import time import threading def consumer(cond): t = threading.currentThread() with cond: cond.wait() print("{}: Resource is available to sonsumer".format(t.name)) def producer(cond): t = threading.currentThread() with cond: print("{}: Making resource available".format(t.name)) cond.notifyAll() condition = threading.Condition() c1 = threading.Thread(name='c1', target=consumer, args=(condition,)) c2 = threading.Thread(name='c2', target=consumer, args=(condition,)) p = threading.Thread(name='p', target=producer, args=(condition,)) c1.start() c2.start() p.start()
7 事件 Event 感覺和Condition 差不多
import time import threading from random import randint TIMEOUT = 2 def consumer(event, l): t = threading.currentThread() while 1: event_is_set = event.wait(TIMEOUT) if event_is_set: try: integer = l.pop() print '{} popped from list by {}'.format(integer, t.name) event.clear() # 重置事件狀態 except IndexError: # 為了讓剛啟動時容錯 pass def producer(event, l): t = threading.currentThread() while 1: integer = randint(10, 100) l.append(integer) print '{} appended to list by {}'.format(integer, t.name) event.set() # 設置事件 time.sleep(1) event = threading.Event() l = [] threads = [] for name in ('consumer1', 'consumer2'): t = threading.Thread(name=name, target=consumer, args=(event, l)) t.start() threads.append(t) p = threading.Thread(name='producer1', target=producer, args=(event, l)) p.start() threads.append(p) for t in threads: t.join()
8 線程隊列 線程隊列有task_done() 和 join() 標準庫里的例子 往隊列內放結束標誌,注意do_work阻塞可能無法結束,需要用超時
import queue def worker(): while True: item = q.get() if item is None: break do_work(item) q.task_done() q = queue.Queue() threads = [] for i in range(num_worker_threads): t = threading.Thread(target=worker) t.start() threads.append(t) for item in source(): q.put(item) q.join() for i in range(num_worker_threads): q.put(None) for t in threads: t.join()
9 優先級隊列 PriorityQueue
import threading from random import randint from queue import PriorityQueue q = PriorityQueue() def double(n): return n * 2 def producer(): count = 0 while 1: if count > 5: break pri = randint(0, 100) print('put :{}'.format(pri)) q.put((pri, double, pri)) # (priority, func, args) count += 1 def consumer(): while 1: if q.empty(): break pri, task, arg = q.get() print('[PRI:{}] {} * 2 = {}'.format(pri, arg, task(arg))) q.task_done() time.sleep(0.1) t = threading.Thread(target=producer) t.start() time.sleep(1) t = threading.Thread(target=consumer) t.start()
10 線程池 當線程執行相同的任務時用線程池 10.1 multiprocessing.pool 中的線程池
from multiprocessing.pool import ThreadPool pool = ThreadPool(5) pool.map(lambda x: x**2, range(5))
10.2 multiprocessing.dummy
from multiprocessing.dummy import Pool
10.3 concurrent.futures.ThreadPoolExecutor
from concurrent.futures improt ThreadPoolExecutor from concurrent.futures import as_completed import urllib.request URLS = ['http://www.baidu.com', 'http://www.hao123.com'] def load_url(url, timeout): with urllib.request.urlopen(url, timeout=timeout) as conn: return conn.read() with ThreadPoolExecutor(max_workers=5) as executor: future_to_url = {executor.submit(load_url, url, 60): url for url in URLS} for future in as_completed(future_to_url): url = future_to_url[future] try: data = future.result() execpt Exception as exc: print("%r generated an exception: %s" % (url, exc)) else: print("%r page is %d bytes" % (url, len(data)))
11 啟動多進程,等待多個進程結束
import multiprocessing jobs = [] for i in range(2): p = multiprocessing.Process(target=fib, args=(12,)) p.start() jobs.append(p) for p in jobs: p.join()
12 進程池 12.1 multiprocessing.Pool
from multiprocessing import Pool pool = Pool(2) pool.map(fib, [36] * 2)
12.2 concurrent.futures.ProcessPoolExecutor
from concurrent.futures import ProcessPoolExecutor import math PRIMES = [ 112272535095293, 112582705942171] def is_prime(n): if n < 2: return False if n == 2: return True if n % 2 == 0: return False sqrt_n = int(math.floor(math.sqrt(n))) for i in range(3, sqrt_n + 1, 2): if n % i == 0: return False return True if __name__ == "__main__": with ProcessPoolExecutor() as executor: for number, prime in zip(PRIMES, executor.map(is_prime, PRIMES)): print("%d is prime: %s" % (number, prime))
13 asyncio 13.1 最基本的示例,單個任務
import asyncio async def hello(): print("Hello world!") await asyncio.sleep(1) print("Hello again") loop = asyncio.get_event_loop() loop.run_until_complete(hello()) loop.close()
13.2 最基本的示例,多個任務
import asyncio async def hello(): print("Hello world!") await asyncio.sleep(1) print("Hello again") loop = asyncio.get_event_loop() tasks = [hello(), hello()] loop.run_until_complete(asyncio.wait(tasks)) loop.close()
13.3 結合httpx 執行多個任務並接收返回結果 httpx 接口和 requests基本一致
import asyncio import httpx async def get_url(): r = await httpx.get("http://www.baidu.com") return r.status_code loop = asyncio.get_event_loop() tasks = [get_url() for i in range(10)] results = loop.run_until_complete(asyncio.gather(*tasks)) loop.close() for num, result in zip(range(10), results): print(num, result)
本站聲明:網站內容來源於博客園,如有侵權,請聯繫我們,我們將及時處理
【精選推薦文章】
如何讓商品強力曝光呢? 網頁設計公司幫您建置最吸引人的網站,提高曝光率!!
想要讓你的商品在網路上成為最夯、最多人討論的話題?
網頁設計公司推薦更多不同的設計風格,搶佔消費者視覺第一線
不管是台北網頁設計公司、台中網頁設計公司,全省皆有專員為您服務
想知道最厲害的台北網頁設計公司推薦、台中網頁設計公司推薦專業設計師"嚨底家"!!