非同期・並列処理
Python
asyncio・threading・multiprocessing
asyncio の基本
async/await・coroutine・イベントループ
import asyncio
# coroutine 関数(async def で定義)
async def fetch_data(url: str) -> dict:
await asyncio.sleep(1) # ノンブロッキング待機
return {'url': url, 'data': '...'}
# 単一実行
async def main():
result = await fetch_data('https://api.example.com')
print(result)
asyncio.run(main()) # イベントループを作成して実行
# 並列実行(gather)
async def main():
# 複数のコルーチンを並列実行
results = await asyncio.gather(
fetch_data('https://api.example.com/users'),
fetch_data('https://api.example.com/posts'),
fetch_data('https://api.example.com/comments'),
)
return results # [result1, result2, result3]
# エラーハンドリング
async def safe_gather():
results = await asyncio.gather(
fetch_data('url1'),
fetch_data('url2'),
return_exceptions=True, # 例外をリストの要素として返す
)
for r in results:
if isinstance(r, Exception):
print(f'エラー: {r}')
else:
print(r)
# タイムアウト
async def with_timeout():
try:
result = await asyncio.wait_for(fetch_data('url'), timeout=5.0)
except asyncio.TimeoutError:
print('タイムアウト')asyncio 応用
Task・Queue・セマフォ・aiohttp
import asyncio
# Task: バックグラウンドで実行
async def main():
# create_task でバックグラウンド実行
task = asyncio.create_task(fetch_data('url'))
# 他の処理をしている間にバックグラウンドで実行される
await do_other_things()
# 結果を待つ
result = await task
# 並列タスク管理
async def fetch_all(urls: list[str]):
tasks = [asyncio.create_task(fetch_data(url)) for url in urls]
results = await asyncio.gather(*tasks)
return results
# セマフォ(同時実行数を制限)
async def limited_fetch(urls: list[str], concurrency: int = 10):
sem = asyncio.Semaphore(concurrency)
async def fetch_one(url: str):
async with sem: # 最大 concurrency 個まで同時実行
return await fetch_data(url)
return await asyncio.gather(*[fetch_one(url) for url in urls])
# asyncio.Queue(プロデューサー・コンシューマー)
async def producer_consumer():
queue = asyncio.Queue(maxsize=10)
async def producer():
for i in range(20):
await queue.put(i) # キューが満杯なら待機
await queue.put(None) # 終了シグナル
async def consumer():
while True:
item = await queue.get()
if item is None: break
print(f'処理: {item}')
queue.task_done()
await asyncio.gather(producer(), consumer())
# aiohttp(非同期HTTP)
import aiohttp
async def async_fetch(session: aiohttp.ClientSession, url: str):
async with session.get(url) as response:
return await response.json()
async def main():
async with aiohttp.ClientSession() as session:
tasks = [async_fetch(session, url) for url in urls]
results = await asyncio.gather(*tasks)threading / multiprocessing
I/O並列・CPU並列・concurrent.futures
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
import threading
# ThreadPoolExecutor(I/Oバウンドタスク向け)
# → DBクエリ・HTTP通信・ファイルI/Oなど
def fetch_url(url: str) -> str:
import urllib.request
with urllib.request.urlopen(url) as r:
return r.read().decode()
with ThreadPoolExecutor(max_workers=10) as executor:
# map: 順序を保って結果を取得
results = list(executor.map(fetch_url, urls))
# submit: 個別に Future を取得
futures = [executor.submit(fetch_url, url) for url in urls]
results = [f.result() for f in futures]
# ProcessPoolExecutor(CPUバウンドタスク向け)
# → 計算処理・画像処理・データ変換など
def cpu_heavy(n: int) -> int:
return sum(i**2 for i in range(n))
with ProcessPoolExecutor() as executor:
results = list(executor.map(cpu_heavy, [10**6, 10**6, 10**6]))
# GIL について
# Python の GIL(Global Interpreter Lock)により
# threading はCPUバウンドでは並列にならない
# → CPUバウンドには multiprocessing を使う
# → I/Oバウンドには threading か asyncio を使う
# threading.Lock でスレッドセーフな操作
lock = threading.Lock()
counter = 0
def increment():
global counter
with lock: # クリティカルセクション
counter += 1
# threading.Event でスレッド間の同期
event = threading.Event()
def worker():
event.wait() # イベントが set されるまで待機
print('開始!')
t = threading.Thread(target=worker)
t.start()
event.set() # ワーカーを開始させる
t.join() # ワーカーの終了を待つ