Python 异步编程实战:从入门到精通
📅 2026-02-24 | ⏱️ 15分钟 | 👁 328 次阅读 | 💬 10 条评论
前言
Python 异步编程(asyncio)是处理高并发 I/O 的利器。本文从基础概念到实战案例,帮助你彻底掌握异步编程。
一、为什么需要异步编程?
同步 vs 异步
同步(阻塞):
import requests

def fetch_url(url):
    """Fetch *url* synchronously and return the response body as text."""
    # requests.get() blocks the calling thread until the full response arrives.
    return requests.get(url).text

# Serial execution: total time = sum of every request's latency.
for url in urls:
    data = fetch_url(url)
异步(非阻塞):
import asyncio
import aiohttp

async def fetch_url(session, url):
    """Fetch *url* through *session* and return the body as text."""
    async with session.get(url) as response:
        return await response.text()

# Concurrent execution: total time ≈ latency of the slowest single request.
async def main():
    async with aiohttp.ClientSession() as session:
        # gather() drives all fetches at once on the same event loop.
        results = await asyncio.gather(
            *(fetch_url(session, url) for url in urls)
        )
二、asyncio 核心概念
1. 协程(Coroutine)
async def hello():
    """Print two words separated by a one-second asynchronous pause."""
    print("Hello")
    # Yields control to the event loop instead of blocking the thread.
    await asyncio.sleep(1)
    print("World")

# asyncio.run() creates a loop, runs the coroutine, and tears the loop down.
asyncio.run(hello())
2. Task 任务
async def task1():
    """Simulate a two-second job and report its name."""
    await asyncio.sleep(2)
    return "Task 1"

async def task2():
    """Simulate a one-second job and report its name."""
    await asyncio.sleep(1)
    return "Task 2"

async def main():
    # create_task() schedules each coroutine immediately, so the two
    # sleeps overlap instead of running back-to-back.
    pending = [asyncio.create_task(task1()), asyncio.create_task(task2())]
    # gather() preserves argument order regardless of completion order.
    results = await asyncio.gather(*pending)
    print(results)  # ['Task 1', 'Task 2']

asyncio.run(main())
3. 事件循环
# Manual event-loop management — this is roughly what asyncio.run() does
# for you. NOTE: asyncio.get_event_loop() is deprecated for *creating*
# a loop since Python 3.10; create one explicitly and always close it,
# even if run_until_complete() raises.
loop = asyncio.new_event_loop()
try:
    loop.run_until_complete(main())
finally:
    loop.close()
三、实战:异步 Web 爬虫
import asyncio
import aiohttp
from bs4 import BeautifulSoup

async def fetch_page(session, url):
    """Download *url* and return the HTML text, or None on any error."""
    try:
        # aiohttp expects a ClientTimeout object here; passing a bare
        # number for `timeout` is deprecated.
        timeout = aiohttp.ClientTimeout(total=10)
        async with session.get(url, timeout=timeout) as response:
            return await response.text()
    except Exception as e:
        # Best-effort crawl: report the failure and keep going.
        print(f"Error fetching {url}: {e}")
        return None

async def parse_page(html):
    """Return the page <title> text, or None when the page has no title.

    NOTE: parsing is CPU-bound and contains no await, so `async` adds
    nothing here; for large documents consider asyncio.to_thread() so
    the parse does not block the event loop.
    """
    soup = BeautifulSoup(html, 'html.parser')
    return soup.title.string if soup.title else None

async def crawl(urls):
    """Fetch all *urls* concurrently and return the list of their titles."""
    async with aiohttp.ClientSession() as session:
        htmls = await asyncio.gather(
            *(fetch_page(session, url) for url in urls)
        )
    # Failed downloads came back as None and are skipped.
    titles = []
    for html in htmls:
        if html:
            titles.append(await parse_page(html))
    return titles

# Usage
urls = [
    'https://example.com/page1',
    'https://example.com/page2',
    'https://example.com/page3'
]
results = asyncio.run(crawl(urls))
print(results)
四、异步数据库操作
import asyncpg

async def fetch_users():
    """Return all active user rows, opening and closing its own connection."""
    conn = await asyncpg.connect(
        host='localhost',
        database='mydb',
        user='user',
        password='password'
    )
    try:
        # $1-style placeholders parameterize the query safely (no SQL injection).
        return await conn.fetch('SELECT * FROM users WHERE active = $1', True)
    finally:
        # Close even if the query raises, so the connection never leaks.
        await conn.close()

async def insert_user(name, email):
    """Insert one user row. (`...` stands in for real connection settings.)"""
    conn = await asyncpg.connect(...)
    try:
        await conn.execute(
            'INSERT INTO users(name, email) VALUES($1, $2)',
            name, email
        )
    finally:
        await conn.close()

# Run the read and the write concurrently on separate connections.
async def main():
    tasks = [
        fetch_users(),
        insert_user('Alice', 'alice@example.com')
    ]
    results = await asyncio.gather(*tasks)

asyncio.run(main())
五、异步 API 服务器(FastAPI)
from fastapi import FastAPI
import asyncio

app = FastAPI()

async def slow_operation():
    """Pretend to perform two seconds of I/O-bound work."""
    await asyncio.sleep(2)
    return {"result": "done"}

@app.get("/async-endpoint")
async def async_route():
    # While this handler awaits, the server keeps serving other requests.
    return await slow_operation()

@app.get("/concurrent")
async def concurrent_route():
    # The three calls overlap, so the endpoint still answers in ~2 seconds.
    done = await asyncio.gather(*(slow_operation() for _ in range(3)))
    return {"results": done}
六、常见陷阱与最佳实践
1. 避免阻塞操作
# ❌ Wrong: calling a synchronous, blocking library inside a coroutine.
async def bad():
    import time
    time.sleep(1)  # freezes the entire event loop for a full second!

# ✅ Right: await the asynchronous equivalent instead.
async def good():
    await asyncio.sleep(1)
2. 使用 asyncio.to_thread 处理阻塞代码
import asyncio

def blocking_io():
    """A blocking call that has no asynchronous equivalent."""
    with open('file.txt') as f:
        return f.read()

async def main():
    # to_thread() runs the blocking function in a worker thread, so the
    # event loop stays responsive while the file is being read.
    result = await asyncio.to_thread(blocking_io)
    print(result)
3. 超时控制
async def fetch_with_timeout(url):
    """Return fetch_data(url), or None if it takes longer than 5 seconds."""
    try:
        async with asyncio.timeout(5):  # Python 3.11+
            return await fetch_data(url)
    except TimeoutError:
        # On 3.11+ asyncio.TimeoutError is an alias of builtin TimeoutError.
        return None
4. 异常处理
async def safe_task(url):
    """Run fetch(url); on any failure, log it and return None instead of raising."""
    try:
        return await fetch(url)
    except Exception as e:
        print(f"Error: {e}")
        return None

async def main():
    # return_exceptions=True keeps one failing task from cancelling the
    # others; exceptions are delivered as values inside `results`.
    results = await asyncio.gather(
        safe_task(url1),
        safe_task(url2),
        return_exceptions=True
    )
七、性能对比
测试:获取100个网页
• 同步方式:~45秒
• 多线程:~8秒
• 异步方式:~2秒
总结
异步编程适合 I/O 密集型任务(网络请求、数据库查询、文件操作)。掌握 asyncio 能显著提升应用性能。建议从简单示例开始,逐步应用到实际项目中。
💬 评论 (10)
发表评论