Python 併發總結,多線程,多進程,異步IO

1 測量函數運行時間

import time
def profile(func):
    def wrapper(*args, **kwargs):
        import time
        start = time.time()
        func(*args, **kwargs)
        end   = time.time()
        print 'COST: {}'.format(end - start)
    return wrapper
 
@profile
def fib(n):
    if n<= 2:
        return 1
    return fib(n-1) + fib(n-2)
 
fib(35)
 

 

2 啟動多個線程,並等待完成   2.1 使用threading.enumerate()

import threading
for i in range(2):
    t = threading.Thread(target=fib, args=(35,))
    t.start()
main_thread = threading.currentThread()
 
for t in threading.enumerate():
    if t is main_thread:
        continue
    t.join()

 

2.2 先保存啟動的線程

threads = []
for i in range(5):
    t = Thread(target=foo, args=(i,))
    threads.append(t)
    t.start()
for t in threads:
    t.join()

  3 使用信號量,限制同時能有幾個線程訪問臨界區

from threading import Semaphore
import time
 
sema = Semaphore(3)
 
def foo(tid):
    with sema:
        print('{} acquire sema'.format(tid))
        wt = random() * 2
        time.sleep(wt)
        print('{} release sema'.format(tid))
 

 

4 鎖,相當於信號量為1的情況

from threading import Thread Lock
value = 0
lock = Lock()
def getlock():
    global lock
    with lock:
        new = value + 1
        time.sleep(0.001)
        value = new

 

  5 可重入鎖RLock     acquire() 可以不被阻塞的被同一個線程調用多次,release()需要和acquire()調用次數匹配才能釋放鎖 6 條件 Condition 一個線程發出信號,另一個線程等待信號 常用於生產者-消費者模型

import time
import threading
 
def consumer(cond):
    t = threading.currentThread()
    with cond:
        cond.wait()
        print("{}: Resource is available to sonsumer".format(t.name))
 
def producer(cond):
    t = threading.currentThread()
    with cond:
        print("{}: Making resource available".format(t.name))
        cond.notifyAll()
 
condition = threading.Condition()
c1 = threading.Thread(name='c1', target=consumer, args=(condition,))
c2 = threading.Thread(name='c2', target=consumer, args=(condition,))
p = threading.Thread(name='p', target=producer, args=(condition,))
 
c1.start()
c2.start()
p.start()

 

  7 事件 Event 感覺和Condition 差不多

import time
import threading
from random import randint
 
TIMEOUT = 2
 
def consumer(event, l):
    t = threading.currentThread()
    while 1:
        event_is_set = event.wait(TIMEOUT)
        if event_is_set:
            try:
                integer = l.pop()
                print '{} popped from list by {}'.format(integer, t.name)
                event.clear()  # 重置事件狀態
            except IndexError:  # 為了讓剛啟動時容錯
                pass
 
def producer(event, l):
    t = threading.currentThread()
    while 1:
        integer = randint(10, 100)
        l.append(integer)
        print '{} appended to list by {}'.format(integer, t.name)
        event.set()  # 設置事件
        time.sleep(1)
 
event = threading.Event()
l = []
 
threads = []
 
for name in ('consumer1', 'consumer2'):
    t = threading.Thread(name=name, target=consumer, args=(event, l))
    t.start()
    threads.append(t)
 
p = threading.Thread(name='producer1', target=producer, args=(event, l))
p.start()
threads.append(p)
 
 
for t in threads:
    t.join()

 

  8 線程隊列  線程隊列有task_done() 和 join() 標準庫里的例子 往隊列內放結束標誌,注意do_work阻塞可能無法結束,需要用超時

import queue
def worker():
    while True:
        item = q.get()
        if item is None:
            break
        do_work(item)
        q.task_done()
q = queue.Queue()
threads = []
for i in range(num_worker_threads):
    t = threading.Thread(target=worker)
    t.start()
    threads.append(t)
for item in source():
    q.put(item)
q.join()
for i in range(num_worker_threads):
    q.put(None)
for t in threads:
    t.join()

 

  9 優先級隊列 PriorityQueue

import threading
from random import randint
from queue import PriorityQueue
 
q = PriorityQueue()
 
def double(n):
    return n * 2
 
def producer():
    count = 0
    while 1:
        if count > 5:
            break
        pri = randint(0, 100)
        print('put :{}'.format(pri))
        q.put((pri, double, pri))  # (priority, func, args)
        count += 1
 
def consumer():
    while 1:
        if q.empty():
            break
        pri, task, arg = q.get()
        print('[PRI:{}] {} * 2 = {}'.format(pri, arg, task(arg)))
        q.task_done()
        time.sleep(0.1)
 
t = threading.Thread(target=producer)
t.start()
time.sleep(1)
t = threading.Thread(target=consumer)
t.start()

 

  10 線程池 當線程執行相同的任務時用線程池 10.1 multiprocessing.pool 中的線程池

from multiprocessing.pool import ThreadPool
pool = ThreadPool(5)
pool.map(lambda x: x**2, range(5))

 

10.2 multiprocessing.dummy

from multiprocessing.dummy import Pool

 

10.3 concurrent.futures.ThreadPoolExecutor

from concurrent.futures improt ThreadPoolExecutor
from concurrent.futures import as_completed
import urllib.request
 
URLS = ['http://www.baidu.com', 'http://www.hao123.com']
 
def load_url(url, timeout):
    with urllib.request.urlopen(url, timeout=timeout) as conn:
        return conn.read()
 
with ThreadPoolExecutor(max_workers=5) as executor:
    future_to_url = {executor.submit(load_url, url, 60): url for url in URLS}
    for future in as_completed(future_to_url):
        url = future_to_url[future]
        try:
            data = future.result()
        execpt Exception as exc:
            print("%r generated an exception: %s" % (url, exc))
        else:
            print("%r page is %d bytes" % (url, len(data)))
 

 

11 啟動多進程,等待多個進程結束

import multiprocessing
jobs = []
for i in range(2):
    p = multiprocessing.Process(target=fib, args=(12,))
    p.start()
    jobs.append(p)
for p in jobs:
    p.join()
 

 

12 進程池 12.1 multiprocessing.Pool

from multiprocessing import Pool
pool = Pool(2)
pool.map(fib, [36] * 2)

 

  12.2 concurrent.futures.ProcessPoolExecutor

from concurrent.futures import ProcessPoolExecutor
import math
 
PRIMES = [ 112272535095293, 112582705942171]
 
def is_prime(n):
    if n < 2:
        return False
    if n == 2:
        return True
    if n % 2 == 0:
        return False
    sqrt_n = int(math.floor(math.sqrt(n)))
    for i in range(3, sqrt_n + 1, 2):
        if n % i == 0:
            return False
    return True
 
if __name__ == "__main__":
    with ProcessPoolExecutor() as executor:
        for number, prime in zip(PRIMES, executor.map(is_prime, PRIMES)):
            print("%d is prime: %s" % (number, prime))
 

 

  13 asyncio   13.1 最基本的示例,單個任務

import asyncio
 
async def hello():
    print("Hello world!")
    await asyncio.sleep(1)
    print("Hello again")
 
loop = asyncio.get_event_loop()
loop.run_until_complete(hello())
loop.close()

 

13.2 最基本的示例,多個任務

import asyncio
 
async def hello():
    print("Hello world!")
    await asyncio.sleep(1)
    print("Hello again")
 
loop = asyncio.get_event_loop()
tasks = [hello(), hello()]
loop.run_until_complete(asyncio.wait(tasks))
loop.close()

 

  13.3 結合httpx 執行多個任務並接收返回結果 httpx 接口和 requests基本一致

import asyncio
import httpx
 
 
async def get_url():
    r = await httpx.get("http://www.baidu.com")
    return r.status_code
 
 
loop = asyncio.get_event_loop()
tasks = [get_url() for i in range(10)]
results = loop.run_until_complete(asyncio.gather(*tasks))
loop.close()
 
 
for num, result in zip(range(10), results):
    print(num, result)
 

 

   本站聲明:網站內容來源於博客園,如有侵權,請聯繫我們,我們將及時處理

【精選推薦文章】

如何讓商品強力曝光呢? 網頁設計公司幫您建置最吸引人的網站,提高曝光率!!

想要讓你的商品在網路上成為最夯、最多人討論的話題?

網頁設計公司推薦更多不同的設計風格,搶佔消費者視覺第一線

不管是台北網頁設計公司台中網頁設計公司,全省皆有專員為您服務

想知道最厲害的台北網頁設計公司推薦台中網頁設計公司推薦專業設計師"嚨底家"!!

您可能也會喜歡…