Python Asynchronous Programming in Practice: From Beginner to Advanced

Preface

Python's asynchronous programming framework (asyncio) is a powerful tool for high-concurrency I/O. This article walks from core concepts to hands-on examples to help you master async programming.

I. Why Asynchronous Programming?

Synchronous vs. Asynchronous

Synchronous (blocking):

import requests

def fetch_url(url):
    response = requests.get(url)  # blocks until the response arrives
    return response.text

# Serial execution: total time = sum of all request times
for url in urls:  # urls: a list of URL strings
    data = fetch_url(url)

Asynchronous (non-blocking):

import asyncio
import aiohttp

async def fetch_url(session, url):
    async with session.get(url) as response:
        return await response.text()

# Concurrent execution: total time ≈ duration of the slowest request
async def main():
    async with aiohttp.ClientSession() as session:
        tasks = [fetch_url(session, url) for url in urls]
        results = await asyncio.gather(*tasks)

II. Core asyncio Concepts

1. Coroutines

import asyncio

async def hello():
    print("Hello")
    await asyncio.sleep(1)  # non-blocking sleep
    print("World")

# run the coroutine
asyncio.run(hello())

2. Tasks

async def task1():
    await asyncio.sleep(2)
    return "Task 1"

async def task2():
    await asyncio.sleep(1)
    return "Task 2"

async def main():
    # schedule both coroutines to run concurrently
    t1 = asyncio.create_task(task1())
    t2 = asyncio.create_task(task2())
    
    # wait for both tasks to finish
    results = await asyncio.gather(t1, t2)
    print(results)  # ['Task 1', 'Task 2']

asyncio.run(main())

3. The Event Loop

# Legacy pattern: managing the loop by hand.
# asyncio.get_event_loop() is deprecated for this use since Python 3.10;
# prefer asyncio.run(main()), which creates and closes the loop for you.
loop = asyncio.get_event_loop()
loop.run_until_complete(main())
loop.close()

III. Hands-On: An Asynchronous Web Crawler

import asyncio
import aiohttp
from bs4 import BeautifulSoup

async def fetch_page(session, url):
    try:
        async with session.get(url, timeout=aiohttp.ClientTimeout(total=10)) as response:
            return await response.text()
    except Exception as e:
        print(f"Error fetching {url}: {e}")
        return None

def parse_page(html):
    # BeautifulSoup parsing is synchronous CPU work, so this
    # does not need to be (and should not be) a coroutine
    soup = BeautifulSoup(html, 'html.parser')
    return soup.title.string if soup.title else None

async def crawl(urls):
    async with aiohttp.ClientSession() as session:
        tasks = [fetch_page(session, url) for url in urls]
        htmls = await asyncio.gather(*tasks)

        titles = []
        for html in htmls:
            if html:
                titles.append(parse_page(html))

        return titles

# usage
urls = [
    'https://example.com/page1',
    'https://example.com/page2',
    'https://example.com/page3'
]

results = asyncio.run(crawl(urls))
print(results)
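
Unbounded gather is fine for a handful of URLs, but a real crawl should cap how many requests are in flight at once. Here is a minimal sketch using asyncio.Semaphore; fetch_simulated is a made-up stand-in (it just sleeps) so the snippet runs without a network, and in real code you would call the fetch_page coroutine above instead:

```python
import asyncio

async def fetch_simulated(url):
    # stand-in for a real aiohttp request
    await asyncio.sleep(0.01)
    return f"<html>{url}</html>"

async def fetch_limited(sem, url):
    async with sem:  # at most max_concurrency coroutines enter here at once
        return await fetch_simulated(url)

async def crawl_limited(urls, max_concurrency=5):
    sem = asyncio.Semaphore(max_concurrency)
    tasks = [fetch_limited(sem, url) for url in urls]
    return await asyncio.gather(*tasks)

urls = [f"https://example.com/page{i}" for i in range(20)]
results = asyncio.run(crawl_limited(urls))
print(len(results))  # 20
```

The semaphore throttles only the request phase, so task creation stays cheap while the server sees at most five concurrent connections.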

IV. Asynchronous Database Operations

import asyncpg

async def fetch_users():
    conn = await asyncpg.connect(
        host='localhost',
        database='mydb',
        user='user',
        password='password'
    )
    
    rows = await conn.fetch('SELECT * FROM users WHERE active = $1', True)
    await conn.close()
    
    return rows

async def insert_user(name, email):
    conn = await asyncpg.connect(...)  # same connection parameters as above
    await conn.execute(
        'INSERT INTO users(name, email) VALUES($1, $2)',
        name, email
    )
    await conn.close()

# run the query and the insert concurrently
async def main():
    tasks = [
        fetch_users(),
        insert_user('Alice', 'alice@example.com')
    ]
    results = await asyncio.gather(*tasks)

asyncio.run(main())

V. An Asynchronous API Server (FastAPI)

from fastapi import FastAPI
import asyncio

app = FastAPI()

async def slow_operation():
    await asyncio.sleep(2)
    return {"result": "done"}

@app.get("/async-endpoint")
async def async_route():
    result = await slow_operation()
    return result

@app.get("/concurrent")
async def concurrent_route():
    tasks = [slow_operation() for _ in range(3)]
    results = await asyncio.gather(*tasks)
    return {"results": results}

VI. Common Pitfalls and Best Practices

1. Avoid Blocking Calls

import time

# ❌ Wrong: calling a blocking, synchronous function in a coroutine
async def bad():
    time.sleep(1)  # blocks the entire event loop!

# ✅ Right: use the asynchronous equivalent
async def good():
    await asyncio.sleep(1)

2. Use asyncio.to_thread for Blocking Code

import asyncio

def blocking_io():
    # blocking I/O that cannot be rewritten as async
    with open('file.txt') as f:
        return f.read()

async def main():
    result = await asyncio.to_thread(blocking_io)
    print(result)

3. Timeout Control

async def fetch_with_timeout(url):
    try:
        async with asyncio.timeout(5):  # Python 3.11+
            return await fetch_data(url)
    except asyncio.TimeoutError:
        return None
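
asyncio.timeout exists only on Python 3.11+; on older versions, asyncio.wait_for does the same job. A runnable sketch (slow_op is a made-up coroutine standing in for fetch_data):

```python
import asyncio

async def slow_op():
    await asyncio.sleep(10)
    return "never reached"

async def main():
    try:
        # cancel slow_op if it takes longer than 0.1 seconds
        return await asyncio.wait_for(slow_op(), timeout=0.1)
    except asyncio.TimeoutError:
        return None

print(asyncio.run(main()))  # None
```

Note that asyncio.TimeoutError became an alias of the built-in TimeoutError in 3.11, so catching it works on both old and new versions.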

4. Exception Handling

async def safe_task(url):
    try:
        return await fetch(url)
    except Exception as e:
        print(f"Error: {e}")
        return None

async def main():
    results = await asyncio.gather(
        safe_task(url1),
        safe_task(url2),
        return_exceptions=True  # one failure will not cancel the other tasks
    )
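
It is worth seeing what return_exceptions=True actually does: failed tasks come back as exception objects in the results list instead of propagating and cancelling the others. A self-contained illustration (ok and boom are made-up coroutines):

```python
import asyncio

async def ok():
    return "fine"

async def boom():
    raise ValueError("something broke")

async def main():
    results = await asyncio.gather(ok(), boom(), return_exceptions=True)
    for r in results:
        if isinstance(r, Exception):
            print(f"task failed: {r}")
        else:
            print(f"task succeeded: {r}")
    return results

results = asyncio.run(main())
```

Results keep the order the coroutines were passed in, so you can pair each result back with its input by index.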

VII. Performance Comparison

Benchmark: fetching 100 web pages

• Synchronous: ~45 s

• Multithreaded: ~8 s

• Asynchronous: ~2 s
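
Exact figures depend on the network and the target site, but the shape of the result is easy to reproduce offline: with I/O simulated by asyncio.sleep, twenty 50 ms "requests" take about a second serially and about 50 ms concurrently. A rough micro-benchmark sketch (all delays are arbitrary):

```python
import asyncio
import time

async def fake_request():
    await asyncio.sleep(0.05)  # simulated network latency

async def serial(n):
    # one request at a time: total ≈ n * 0.05 s
    for _ in range(n):
        await fake_request()

async def concurrent(n):
    # all requests overlap: total ≈ 0.05 s
    await asyncio.gather(*(fake_request() for _ in range(n)))

n = 20
start = time.perf_counter()
asyncio.run(serial(n))
serial_time = time.perf_counter() - start

start = time.perf_counter()
asyncio.run(concurrent(n))
concurrent_time = time.perf_counter() - start

print(f"serial: {serial_time:.2f}s, concurrent: {concurrent_time:.2f}s")
```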

Conclusion

Asynchronous programming shines for I/O-bound workloads (network requests, database queries, file operations). Mastering asyncio can significantly improve application performance. Start with small examples and work your way up to real projects.


💬 Comments (10)

Anna Schmidt
2026-02-24
This is the best asyncio tutorial I've found! The performance comparison really shows the power of async programming.
Liu Ming
2026-02-24
Great article! I finally understand async/await. I could never quite tell coroutines and threads apart before.
Marco Rossi
2026-02-24
Question: Can I use asyncio with Django? Or is it better to use FastAPI for async applications?
Zhang Wei (author)
2026-02-24
@Marco Django 3.1+ supports async views, but FastAPI is designed for async from the ground up. For new projects, I'd recommend FastAPI.
Wang Fang
2026-02-24
The crawler example is so practical! Our scraping speed improved more than 10x.
Hassan Ali
2026-02-25
The asyncpg example is great! Much faster than psycopg2. We migrated our API and response time improved 3x.
Li Hua
2026-02-25
What's the difference between asyncio.gather and asyncio.wait? When should I use each?
Zhang Wei (author)
2026-02-25
@Li Hua gather is simpler and returns a list of results. wait is more flexible: you can control the wait condition (ALL_COMPLETED, FIRST_COMPLETED, etc.). gather is enough for most cases.
Chen Wei
2026-02-25
Bookmarked! Planning to refactor our microservices to use async Python. This will be my reference guide.
Zhao Jing
2026-02-26
Thanks for sharing! Could you do a follow-up on combining asyncio with multiprocessing?
