Python

非同期・並列処理

Python

asyncio・threading・multiprocessing

asyncio の基本

async/await・coroutine・イベントループ

asyncio_basics.py python
import asyncio

# coroutine 関数(async def で定義)
async def fetch_data(url: str) -> dict:
    await asyncio.sleep(1)  # ノンブロッキング待機
    return {'url': url, 'data': '...'}

# 単一実行
async def main():
    result = await fetch_data('https://api.example.com')
    print(result)

asyncio.run(main())  # イベントループを作成して実行

# 並列実行(gather)
async def main():
    # 複数のコルーチンを並列実行
    results = await asyncio.gather(
        fetch_data('https://api.example.com/users'),
        fetch_data('https://api.example.com/posts'),
        fetch_data('https://api.example.com/comments'),
    )
    return results  # [result1, result2, result3]

# エラーハンドリング
async def safe_gather():
    results = await asyncio.gather(
        fetch_data('url1'),
        fetch_data('url2'),
        return_exceptions=True,  # 例外をリストの要素として返す
    )
    for r in results:
        if isinstance(r, Exception):
            print(f'エラー: {r}')
        else:
            print(r)

# タイムアウト
async def with_timeout():
    try:
        result = await asyncio.wait_for(fetch_data('url'), timeout=5.0)
    except asyncio.TimeoutError:
        print('タイムアウト')

asyncio 応用

Task・Queue・セマフォ・aiohttp

asyncio_advanced.py python
import asyncio

# Task: バックグラウンドで実行
async def main():
    # create_task でバックグラウンド実行
    task = asyncio.create_task(fetch_data('url'))

    # 他の処理をしている間にバックグラウンドで実行される
    await do_other_things()

    # 結果を待つ
    result = await task

# 並列タスク管理
async def fetch_all(urls: list[str]):
    tasks = [asyncio.create_task(fetch_data(url)) for url in urls]
    results = await asyncio.gather(*tasks)
    return results

# セマフォ(同時実行数を制限)
async def limited_fetch(urls: list[str], concurrency: int = 10):
    sem = asyncio.Semaphore(concurrency)

    async def fetch_one(url: str):
        async with sem:  # 最大 concurrency 個まで同時実行
            return await fetch_data(url)

    return await asyncio.gather(*[fetch_one(url) for url in urls])

# asyncio.Queue(プロデューサー・コンシューマー)
async def producer_consumer():
    queue = asyncio.Queue(maxsize=10)

    async def producer():
        for i in range(20):
            await queue.put(i)  # キューが満杯なら待機
        await queue.put(None)   # 終了シグナル

    async def consumer():
        while True:
            item = await queue.get()
            if item is None: break
            print(f'処理: {item}')
            queue.task_done()

    await asyncio.gather(producer(), consumer())

# aiohttp(非同期HTTP)
import aiohttp

async def async_fetch(session: aiohttp.ClientSession, url: str):
    async with session.get(url) as response:
        return await response.json()

async def main():
    async with aiohttp.ClientSession() as session:
        tasks = [async_fetch(session, url) for url in urls]
        results = await asyncio.gather(*tasks)

threading / multiprocessing

I/O並列・CPU並列・concurrent.futures

parallel.py python
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
import threading

# ThreadPoolExecutor(I/Oバウンドタスク向け)
# → DBクエリ・HTTP通信・ファイルI/Oなど
def fetch_url(url: str) -> str:
    import urllib.request
    with urllib.request.urlopen(url) as r:
        return r.read().decode()

with ThreadPoolExecutor(max_workers=10) as executor:
    # map: 順序を保って結果を取得
    results = list(executor.map(fetch_url, urls))

    # submit: 個別に Future を取得
    futures = [executor.submit(fetch_url, url) for url in urls]
    results = [f.result() for f in futures]

# ProcessPoolExecutor(CPUバウンドタスク向け)
# → 計算処理・画像処理・データ変換など
def cpu_heavy(n: int) -> int:
    return sum(i**2 for i in range(n))

with ProcessPoolExecutor() as executor:
    results = list(executor.map(cpu_heavy, [10**6, 10**6, 10**6]))

# GIL について
# Python の GIL(Global Interpreter Lock)により
# threading はCPUバウンドでは並列にならない
# → CPUバウンドには multiprocessing を使う
# → I/Oバウンドには threading か asyncio を使う

# threading.Lock でスレッドセーフな操作
lock = threading.Lock()
counter = 0

def increment():
    global counter
    with lock:  # クリティカルセクション
        counter += 1

# threading.Event でスレッド間の同期
event = threading.Event()

def worker():
    event.wait()  # イベントが set されるまで待機
    print('開始!')

t = threading.Thread(target=worker)
t.start()
event.set()  # ワーカーを開始させる
t.join()     # ワーカーの終了を待つ