Python aiohttp 教程:异步请求、并发爬虫与超时重试完整指南
aiohttp 怎么用?本文从 GET、POST、params、headers、响应读取讲到超时、异常处理、并发控制、连接池和重试封装,适合写 Python 异步爬虫和批量接口请求。
aiohttp 适合解决什么问题
aiohttp 是 Python 生态里常用的异步 HTTP 客户端和服务端库,底层基于 asyncio。在爬虫和接口采集场景里,最常见的用法是作为异步 HTTP 客户端:批量请求接口、并发抓取列表页、下载大量文件,或者把多个慢接口同时发出去等待结果。
如果只是请求一两个页面,requests 更直观;如果要同时请求几十个、几百个 URL,aiohttp 的优势会更明显。它不会让每个请求都同步排队等待,而是把等待网络响应的时间让出来,继续调度其他请求。
可以先记住一个判断标准:
| 场景 | 更适合的库 |
|---|---|
| 少量请求、脚本简单 | requests |
| 批量接口请求、并发爬页面 | aiohttp |
| 需要执行 JavaScript、处理复杂浏览器环境 | Playwright |
如果你还不熟悉同步请求和异常处理,可以先看 Python 爬虫 requests 异常处理完全指南。
安装 aiohttp
基础安装:
pip install aiohttp
如果希望 DNS 解析、字符集检测和压缩处理有更好的性能,可以安装 speedups:
pip install "aiohttp[speedups]"
安装后可以先确认版本:
python -c "import aiohttp; print(aiohttp.__version__)"
第一个 GET 请求
aiohttp 的代码看起来比 requests 多几行,因为它需要运行在事件循环里,并且大多数网络操作都要 await。
import asyncio
import aiohttp
async def main():
async with aiohttp.ClientSession() as session:
async with session.get("https://httpbin.org/get") as response:
print(response.status)
text = await response.text()
print(text[:200])
asyncio.run(main())
这里有两个关键点:
ClientSession表示一个客户端会话,负责复用连接、管理 Cookie、默认请求头等资源await response.text()表示等待响应体读取完成,忘记await拿到的是协程对象,不是文本结果
ClientSession 不要每次请求都新建。正确做法是创建一个 session,在它的生命周期内复用连接。
async with aiohttp.ClientSession() as session:
for url in urls:
async with session.get(url) as response:
...
GET 请求怎么传 params 和 headers
查询参数用 params:
import asyncio
import aiohttp
async def main():
params = {
"keyword": "python",
"page": 1,
"size": 20,
}
async with aiohttp.ClientSession() as session:
async with session.get("https://httpbin.org/get", params=params) as response:
data = await response.json()
print(data["args"])
asyncio.run(main())
等价于请求:
https://httpbin.org/get?keyword=python&page=1&size=20
请求头可以单次传入:
headers = {
"User-Agent": "Mozilla/5.0",
"Accept": "application/json",
}
async with session.get(url, headers=headers) as response:
print(await response.text())
如果每个请求都要使用同一套请求头,更推荐放到 ClientSession 里:
headers = {
"User-Agent": "Mozilla/5.0",
"Accept": "application/json",
}
async with aiohttp.ClientSession(headers=headers) as session:
async with session.get(url) as response:
print(await response.text())
学习请求头时,可以配合 HTTP 请求头详解 一起看,尤其是 User-Agent、Referer、Cookie 和 Content-Type。
POST 请求:data 和 json 怎么选
提交表单数据用 data:
import asyncio
import aiohttp
async def main():
form_data = {
"username": "admin",
"password": "123456",
}
async with aiohttp.ClientSession() as session:
async with session.post("https://httpbin.org/post", data=form_data) as response:
result = await response.json()
print(result["form"])
asyncio.run(main())
这种方式通常对应:
Content-Type: application/x-www-form-urlencoded
提交 JSON 数据用 json:
payload = {
"name": "Alice",
"age": 18,
}
async with session.post("https://httpbin.org/post", json=payload) as response:
result = await response.json()
print(result["json"])
使用 json=payload 时,aiohttp 会自动序列化字典,并设置合适的 Content-Type。
上传文件时使用 aiohttp.FormData。真实项目里要用 with open(...) 管理文件句柄,避免文件没有关闭。
import aiohttp
with open("test.txt", "rb") as file:
data = aiohttp.FormData()
data.add_field(
"file",
file,
filename="test.txt",
content_type="text/plain",
)
async with session.post(url, data=data) as response:
print(await response.text())
常见 HTTP 方法
除了 get() 和 post(),aiohttp 也支持常见 HTTP 方法:
await session.get(url)
await session.post(url)
await session.put(url)
await session.patch(url)
await session.delete(url)
await session.head(url)
await session.options(url)
更新资源可以用 PUT:
payload = {"title": "new title"}
async with session.put(url, json=payload) as response:
response.raise_for_status()
print(await response.text())
局部更新可以用 PATCH:
payload = {"nickname": "Tom"}
async with session.patch(url, json=payload) as response:
response.raise_for_status()
print(await response.text())
删除资源可以用 DELETE:
async with session.delete(url) as response:
response.raise_for_status()
print(response.status)
响应结果怎么读取
aiohttp 的响应对象是 ClientResponse,常用属性和方法如下:
| 需求 | 写法 |
|---|---|
| 状态码 | response.status |
| 响应头 | response.headers |
| 文本 | await response.text() |
| JSON | await response.json() |
| 二进制 | await response.read() |
| 状态码检查 | response.raise_for_status() |
读取文本:
text = await response.text()
如果编码识别不准,可以指定编码:
text = await response.text(encoding="utf-8")
读取 JSON:
data = await response.json()
有些接口返回 JSON,但 Content-Type 不标准,可以临时放宽检查:
data = await response.json(content_type=None)
读取二进制内容:
content = await response.read()
with open("image.png", "wb") as file:
file.write(content)
如果下载大文件,不建议一次性 await response.read() 全部读入内存,应改成分块读取。
async with session.get(url) as response:
response.raise_for_status()
with open("large_file.zip", "wb") as file:
async for chunk in response.content.iter_chunked(1024 * 64):
file.write(chunk)
超时必须显式设置
网络请求一定要设置超时。否则目标服务器一直不响应时,程序可能长时间卡住。
设置总超时:
timeout = aiohttp.ClientTimeout(total=10)
async with aiohttp.ClientSession(timeout=timeout) as session:
async with session.get(url) as response:
print(await response.text())
更细的超时配置:
timeout = aiohttp.ClientTimeout(
total=30,
connect=5,
sock_connect=5,
sock_read=10,
)
含义如下:
| 参数 | 含义 |
|---|---|
total |
整个请求的总耗时上限 |
connect |
从连接池获取连接或建立连接的超时 |
sock_connect |
Socket 建立连接超时 |
sock_read |
Socket 读取数据超时 |
完整写法:
import asyncio
import aiohttp
async def main():
timeout = aiohttp.ClientTimeout(
total=30,
connect=5,
sock_connect=5,
sock_read=10,
)
async with aiohttp.ClientSession(timeout=timeout) as session:
try:
async with session.get("https://httpbin.org/delay/3") as response:
response.raise_for_status()
print(await response.text())
except asyncio.TimeoutError:
print("请求超时")
asyncio.run(main())
异常处理推荐写法
aiohttp 里常见异常包括超时、连接异常、响应状态码异常等。推荐先调用 raise_for_status(),让 4xx、5xx 状态码进入异常处理分支。
import asyncio
import aiohttp
async def fetch(session, url):
try:
async with session.get(url) as response:
response.raise_for_status()
return await response.text()
except asyncio.TimeoutError:
print("请求超时:", url)
except aiohttp.ClientResponseError as exc:
print("HTTP 状态码异常:", exc.status, exc.message)
except aiohttp.ClientError as exc:
print("请求异常:", exc)
return None
ClientError 是 aiohttp 客户端异常的基类,适合兜底处理网络请求相关错误。更细的异常是否单独捕获,取决于你后续是否要做不同动作,比如切换代理、重试、记录失败 URL 或跳过当前任务。
并发请求怎么写
aiohttp 的核心优势是配合 asyncio.gather() 并发请求多个 URL。
import asyncio
import aiohttp
async def fetch(session, url):
async with session.get(url) as response:
response.raise_for_status()
return await response.text()
async def main():
urls = [
"https://httpbin.org/get?page=1",
"https://httpbin.org/get?page=2",
"https://httpbin.org/get?page=3",
]
async with aiohttp.ClientSession() as session:
tasks = [fetch(session, url) for url in urls]
results = await asyncio.gather(*tasks)
for result in results:
print(result[:100])
asyncio.run(main())
这段代码会同时调度多个请求。每个请求等待网络响应时,事件循环可以继续处理其他请求。
控制并发数量
并发不是越高越好。并发太高可能导致本机连接数耗尽、目标服务器拒绝连接、代理失效、请求大量超时,甚至触发风控。
用 asyncio.Semaphore 控制同时运行的任务数量:
import asyncio
import aiohttp
async def fetch(session, url, sem):
async with sem:
async with session.get(url) as response:
response.raise_for_status()
return await response.text()
async def main():
urls = [
f"https://httpbin.org/get?page={page}"
for page in range(1, 101)
]
sem = asyncio.Semaphore(10)
async with aiohttp.ClientSession() as session:
tasks = [fetch(session, url, sem) for url in urls]
results = await asyncio.gather(*tasks)
print(len(results))
asyncio.run(main())
这里表示最多同时执行 10 个请求。
还可以用 TCPConnector 控制连接池:
connector = aiohttp.TCPConnector(
limit=100,
limit_per_host=10,
)
async with aiohttp.ClientSession(connector=connector) as session:
...
limit 是总连接数上限,limit_per_host 是单个域名的连接数上限。实际爬虫里通常会同时使用 Semaphore 和 TCPConnector:前者控制任务并发,后者控制连接资源。
带重试的请求封装
临时网络波动、代理偶发失败、目标服务 5xx 都可能适合重试。但重试不能无上限,也不要对所有错误盲目重试。
import asyncio
import aiohttp
async def fetch_with_retry(session, url, retries=3):
for attempt in range(1, retries + 1):
try:
async with session.get(url) as response:
response.raise_for_status()
return await response.text()
except (aiohttp.ClientError, asyncio.TimeoutError) as exc:
print(f"第 {attempt} 次请求失败:{exc}")
if attempt == retries:
raise
await asyncio.sleep(attempt)
更稳妥的生产写法还会加入:
- 只对超时、连接失败、5xx 这类临时错误重试
- 对 403、404 这类明确失败不重试
- 使用指数退避,避免短时间打爆目标服务
- 记录最终失败的 URL,方便后续补采
aiohttp 爬虫完整模板
下面这个模板包含请求头、URL 参数、超时、连接池、并发控制、异常处理和 JSON 响应读取,适合改造成批量接口采集脚本。
import asyncio
import aiohttp
HEADERS = {
"User-Agent": (
"Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
"AppleWebKit/537.36 (KHTML, like Gecko) "
"Chrome/124.0 Safari/537.36"
),
"Accept": "application/json",
}
async def fetch_page(session, page, sem):
url = "https://httpbin.org/get"
params = {
"page": page,
"size": 20,
}
async with sem:
try:
async with session.get(url, params=params) as response:
response.raise_for_status()
return await response.json()
except asyncio.TimeoutError:
print(f"第 {page} 页请求超时")
except aiohttp.ClientResponseError as exc:
print(f"第 {page} 页状态码异常:{exc.status}")
except aiohttp.ClientError as exc:
print(f"第 {page} 页请求异常:{exc}")
return None
async def main():
timeout = aiohttp.ClientTimeout(
total=20,
connect=5,
sock_read=10,
)
connector = aiohttp.TCPConnector(
limit=100,
limit_per_host=10,
)
sem = asyncio.Semaphore(10)
async with aiohttp.ClientSession(
headers=HEADERS,
timeout=timeout,
connector=connector,
) as session:
tasks = [
fetch_page(session, page, sem)
for page in range(1, 101)
]
results = await asyncio.gather(*tasks)
results = [item for item in results if item is not None]
print("成功数量:", len(results))
if __name__ == "__main__":
asyncio.run(main())
拿到 HTML 之后,如果要继续提取页面数据,可以把响应文本交给 Parsel、lxml 或 BeautifulSoup。选择器提取可以参考 Python Parsel 教程:用 CSS、XPath 提取网页数据。
requests 和 aiohttp 怎么选
requests 的写法更短:
import requests
response = requests.get(url, timeout=10)
response.raise_for_status()
print(response.text)
aiohttp 的写法更适合并发:
async with aiohttp.ClientSession() as session:
async with session.get(url) as response:
response.raise_for_status()
print(await response.text())
选择时可以按下面几个问题判断:
| 问题 | 建议 |
|---|---|
| 只是写一个简单脚本? | 先用 requests |
| URL 数量很多,等待时间主要在网络 I/O? | 用 aiohttp |
| 页面内容由 JavaScript 渲染? | 用 Playwright |
| 需要登录态、Cookie、默认请求头复用? | 两者都可以,aiohttp 用 ClientSession |
| 需要高并发但目标站有限流? | 用 aiohttp,同时限制并发和连接数 |
常见坑
忘记 await
错误写法:
text = response.text()
正确写法:
text = await response.text()
aiohttp 里很多操作都是协程。忘记 await,不会得到真正结果。
每次请求都新建 ClientSession
不推荐:
for url in urls:
async with aiohttp.ClientSession() as session:
...
推荐:
async with aiohttp.ClientSession() as session:
tasks = [fetch(session, url) for url in urls]
await asyncio.gather(*tasks)
并发太高
并发过高通常不是“更快”,而是更容易失败。建议先从 5 到 20 的并发开始测试,再根据目标站响应、代理质量和本机资源逐步调整。
常用控制手段:
sem = asyncio.Semaphore(10)
connector = aiohttp.TCPConnector(limit=100, limit_per_host=10)
没有处理状态码
只写:
text = await response.text()
并不会让 403、404、500 自动变成异常。建议在读取响应体前调用:
response.raise_for_status()
把 aiohttp 当成浏览器
aiohttp 只负责发送 HTTP 请求,不会执行 JavaScript。如果浏览器里能看到数据,但 response.text() 里没有,通常要去找接口,或者使用 Playwright 获取渲染后的页面。
常用模板速查
GET 模板:
import asyncio
import aiohttp
async def main():
timeout = aiohttp.ClientTimeout(total=10)
headers = {"User-Agent": "Mozilla/5.0"}
params = {"page": 1}
async with aiohttp.ClientSession(headers=headers, timeout=timeout) as session:
async with session.get("https://example.com/api", params=params) as response:
response.raise_for_status()
print(await response.text())
asyncio.run(main())
POST JSON 模板:
import asyncio
import aiohttp
async def main():
payload = {
"username": "admin",
"password": "123456",
}
timeout = aiohttp.ClientTimeout(total=10)
async with aiohttp.ClientSession(timeout=timeout) as session:
async with session.post("https://example.com/api/login", json=payload) as response:
response.raise_for_status()
print(await response.json())
asyncio.run(main())
并发请求模板:
import asyncio
import aiohttp
async def fetch(session, url, sem):
async with sem:
async with session.get(url) as response:
response.raise_for_status()
return await response.text()
async def main():
urls = [
f"https://example.com/page/{page}"
for page in range(1, 101)
]
timeout = aiohttp.ClientTimeout(total=15)
sem = asyncio.Semaphore(10)
async with aiohttp.ClientSession(timeout=timeout) as session:
tasks = [fetch(session, url, sem) for url in urls]
results = await asyncio.gather(*tasks)
print(len(results))
asyncio.run(main())
常见问题
aiohttp 必须和 asyncio 一起用吗?
是的。aiohttp 的客户端请求方法是异步协程,通常通过 asyncio.run() 启动入口函数,并在请求、读取响应体等位置使用 await。
aiohttp 会比 requests 一定更快吗?
不一定。少量请求时,requests 通常更简单,速度差异也不明显。aiohttp 的优势主要出现在大量 I/O 等待场景,比如批量请求接口、并发抓取页面、批量下载文件。
aiohttp 并发设置多少合适?
没有固定答案。可以先从 10 左右开始,根据目标站响应速度、错误率、代理质量和本机资源调整。不要一开始就开几百并发,爬虫稳定性通常比瞬时速度更重要。
response.json() 报 Content-Type 错误怎么办?
如果确认响应体确实是 JSON,但服务端返回的 Content-Type 不标准,可以使用:
data = await response.json(content_type=None)
如果内容本身不是合法 JSON,就应该先用 await response.text() 打印片段排查。
aiohttp 能不能带代理?
可以。单个请求里传 proxy:
async with session.get(url, proxy="http://127.0.0.1:7890") as response:
print(await response.text())
代理需要账号密码时:
proxy_auth = aiohttp.BasicAuth("username", "password")
async with session.get(
url,
proxy="http://127.0.0.1:7890",
proxy_auth=proxy_auth,
) as response:
print(await response.text())
Practice