Python aiohttp 教程:异步请求、并发爬虫与超时重试完整指南

aiohttp 怎么用?本文从 GET、POST、params、headers、响应读取讲到超时、异常处理、并发控制、连接池和重试封装,适合写 Python 异步爬虫和批量接口请求。

7 分钟阅读 requests异常处理Python爬虫timeoutasyncioaiohttp

aiohttp 适合解决什么问题

aiohttp 是 Python 生态里常用的异步 HTTP 客户端和服务端库,底层基于 asyncio。在爬虫和接口采集场景里,最常见的用法是作为异步 HTTP 客户端:批量请求接口、并发抓取列表页、下载大量文件,或者把多个慢接口同时发出去等待结果。

如果只是请求一两个页面,requests 更直观;如果要同时请求几十个、几百个 URL,aiohttp 的优势会更明显。它不会让每个请求都同步排队等待,而是把等待网络响应的时间让出来,继续调度其他请求。

可以先记住一个判断标准:

场景 更适合的库
少量请求、脚本简单 requests
批量接口请求、并发爬页面 aiohttp
需要执行 JavaScript、处理复杂浏览器环境 Playwright

如果你还不熟悉同步请求和异常处理,可以先看 Python 爬虫 requests 异常处理完全指南

安装 aiohttp

基础安装:

pip install aiohttp

如果希望 DNS 解析、字符集检测和压缩处理有更好的性能,可以安装 speedups:

pip install "aiohttp[speedups]"

安装后可以先确认版本:

python -c "import aiohttp; print(aiohttp.__version__)"

第一个 GET 请求

aiohttp 的代码看起来比 requests 多几行,因为它需要运行在事件循环里,并且大多数网络操作都要 await

import asyncio

import aiohttp


async def main():
    async with aiohttp.ClientSession() as session:
        async with session.get("https://httpbin.org/get") as response:
            print(response.status)
            text = await response.text()
            print(text[:200])


asyncio.run(main())

这里有两个关键点:

  • ClientSession 表示一个客户端会话,负责复用连接、管理 Cookie、默认请求头等资源
  • await response.text() 表示等待响应体读取完成,忘记 await 拿到的是协程对象,不是文本结果

ClientSession 不要每次请求都新建。正确做法是创建一个 session,在它的生命周期内复用连接。

async with aiohttp.ClientSession() as session:
    for url in urls:
        async with session.get(url) as response:
            ...

GET 请求怎么传 params 和 headers

查询参数用 params

import asyncio

import aiohttp


async def main():
    params = {
        "keyword": "python",
        "page": 1,
        "size": 20,
    }

    async with aiohttp.ClientSession() as session:
        async with session.get("https://httpbin.org/get", params=params) as response:
            data = await response.json()
            print(data["args"])


asyncio.run(main())

等价于请求:

https://httpbin.org/get?keyword=python&page=1&size=20

请求头可以单次传入:

headers = {
    "User-Agent": "Mozilla/5.0",
    "Accept": "application/json",
}

async with session.get(url, headers=headers) as response:
    print(await response.text())

如果每个请求都要使用同一套请求头,更推荐放到 ClientSession 里:

headers = {
    "User-Agent": "Mozilla/5.0",
    "Accept": "application/json",
}

async with aiohttp.ClientSession(headers=headers) as session:
    async with session.get(url) as response:
        print(await response.text())

学习请求头时,可以配合 HTTP 请求头详解 一起看,尤其是 User-AgentRefererCookieContent-Type

POST 请求:data 和 json 怎么选

提交表单数据用 data

import asyncio

import aiohttp


async def main():
    form_data = {
        "username": "admin",
        "password": "123456",
    }

    async with aiohttp.ClientSession() as session:
        async with session.post("https://httpbin.org/post", data=form_data) as response:
            result = await response.json()
            print(result["form"])


asyncio.run(main())

这种方式通常对应:

Content-Type: application/x-www-form-urlencoded

提交 JSON 数据用 json

payload = {
    "name": "Alice",
    "age": 18,
}

async with session.post("https://httpbin.org/post", json=payload) as response:
    result = await response.json()
    print(result["json"])

使用 json=payload 时,aiohttp 会自动序列化字典,并设置合适的 Content-Type

上传文件时使用 aiohttp.FormData。真实项目里要用 with open(...) 管理文件句柄,避免文件没有关闭。

import aiohttp


with open("test.txt", "rb") as file:
    data = aiohttp.FormData()
    data.add_field(
        "file",
        file,
        filename="test.txt",
        content_type="text/plain",
    )

    async with session.post(url, data=data) as response:
        print(await response.text())

常见 HTTP 方法

除了 get()post()aiohttp 也支持常见 HTTP 方法:

await session.get(url)
await session.post(url)
await session.put(url)
await session.patch(url)
await session.delete(url)
await session.head(url)
await session.options(url)

更新资源可以用 PUT:

payload = {"title": "new title"}

async with session.put(url, json=payload) as response:
    response.raise_for_status()
    print(await response.text())

局部更新可以用 PATCH:

payload = {"nickname": "Tom"}

async with session.patch(url, json=payload) as response:
    response.raise_for_status()
    print(await response.text())

删除资源可以用 DELETE:

async with session.delete(url) as response:
    response.raise_for_status()
    print(response.status)

响应结果怎么读取

aiohttp 的响应对象是 ClientResponse,常用属性和方法如下:

需求 写法
状态码 response.status
响应头 response.headers
文本 await response.text()
JSON await response.json()
二进制 await response.read()
状态码检查 response.raise_for_status()

读取文本:

text = await response.text()

如果编码识别不准,可以指定编码:

text = await response.text(encoding="utf-8")

读取 JSON:

data = await response.json()

有些接口返回 JSON,但 Content-Type 不标准,可以临时放宽检查:

data = await response.json(content_type=None)

读取二进制内容:

content = await response.read()

with open("image.png", "wb") as file:
    file.write(content)

如果下载大文件,不建议一次性 await response.read() 全部读入内存,应改成分块读取。

async with session.get(url) as response:
    response.raise_for_status()

    with open("large_file.zip", "wb") as file:
        async for chunk in response.content.iter_chunked(1024 * 64):
            file.write(chunk)

超时必须显式设置

网络请求一定要设置超时。否则目标服务器一直不响应时,程序可能长时间卡住。

设置总超时:

timeout = aiohttp.ClientTimeout(total=10)

async with aiohttp.ClientSession(timeout=timeout) as session:
    async with session.get(url) as response:
        print(await response.text())

更细的超时配置:

timeout = aiohttp.ClientTimeout(
    total=30,
    connect=5,
    sock_connect=5,
    sock_read=10,
)

含义如下:

参数 含义
total 整个请求的总耗时上限
connect 从连接池获取连接或建立连接的超时
sock_connect Socket 建立连接超时
sock_read Socket 读取数据超时

完整写法:

import asyncio

import aiohttp


async def main():
    timeout = aiohttp.ClientTimeout(
        total=30,
        connect=5,
        sock_connect=5,
        sock_read=10,
    )

    async with aiohttp.ClientSession(timeout=timeout) as session:
        try:
            async with session.get("https://httpbin.org/delay/3") as response:
                response.raise_for_status()
                print(await response.text())
        except asyncio.TimeoutError:
            print("请求超时")


asyncio.run(main())

异常处理推荐写法

aiohttp 里常见异常包括超时、连接异常、响应状态码异常等。推荐先调用 raise_for_status(),让 4xx、5xx 状态码进入异常处理分支。

import asyncio

import aiohttp


async def fetch(session, url):
    try:
        async with session.get(url) as response:
            response.raise_for_status()
            return await response.text()

    except asyncio.TimeoutError:
        print("请求超时:", url)

    except aiohttp.ClientResponseError as exc:
        print("HTTP 状态码异常:", exc.status, exc.message)

    except aiohttp.ClientError as exc:
        print("请求异常:", exc)

    return None

ClientError 是 aiohttp 客户端异常的基类,适合兜底处理网络请求相关错误。更细的异常是否单独捕获,取决于你后续是否要做不同动作,比如切换代理、重试、记录失败 URL 或跳过当前任务。

并发请求怎么写

aiohttp 的核心优势是配合 asyncio.gather() 并发请求多个 URL。

import asyncio

import aiohttp


async def fetch(session, url):
    async with session.get(url) as response:
        response.raise_for_status()
        return await response.text()


async def main():
    urls = [
        "https://httpbin.org/get?page=1",
        "https://httpbin.org/get?page=2",
        "https://httpbin.org/get?page=3",
    ]

    async with aiohttp.ClientSession() as session:
        tasks = [fetch(session, url) for url in urls]
        results = await asyncio.gather(*tasks)

    for result in results:
        print(result[:100])


asyncio.run(main())

这段代码会同时调度多个请求。每个请求等待网络响应时,事件循环可以继续处理其他请求。

控制并发数量

并发不是越高越好。并发太高可能导致本机连接数耗尽、目标服务器拒绝连接、代理失效、请求大量超时,甚至触发风控。

asyncio.Semaphore 控制同时运行的任务数量:

import asyncio

import aiohttp


async def fetch(session, url, sem):
    async with sem:
        async with session.get(url) as response:
            response.raise_for_status()
            return await response.text()


async def main():
    urls = [
        f"https://httpbin.org/get?page={page}"
        for page in range(1, 101)
    ]

    sem = asyncio.Semaphore(10)

    async with aiohttp.ClientSession() as session:
        tasks = [fetch(session, url, sem) for url in urls]
        results = await asyncio.gather(*tasks)

    print(len(results))


asyncio.run(main())

这里表示最多同时执行 10 个请求。

还可以用 TCPConnector 控制连接池:

connector = aiohttp.TCPConnector(
    limit=100,
    limit_per_host=10,
)

async with aiohttp.ClientSession(connector=connector) as session:
    ...

limit 是总连接数上限,limit_per_host 是单个域名的连接数上限。实际爬虫里通常会同时使用 SemaphoreTCPConnector:前者控制任务并发,后者控制连接资源。

带重试的请求封装

临时网络波动、代理偶发失败、目标服务 5xx 都可能适合重试。但重试不能无上限,也不要对所有错误盲目重试。

import asyncio

import aiohttp


async def fetch_with_retry(session, url, retries=3):
    for attempt in range(1, retries + 1):
        try:
            async with session.get(url) as response:
                response.raise_for_status()
                return await response.text()

        except (aiohttp.ClientError, asyncio.TimeoutError) as exc:
            print(f"第 {attempt} 次请求失败:{exc}")

            if attempt == retries:
                raise

            await asyncio.sleep(attempt)

更稳妥的生产写法还会加入:

  • 只对超时、连接失败、5xx 这类临时错误重试
  • 对 403、404 这类明确失败不重试
  • 使用指数退避,避免短时间打爆目标服务
  • 记录最终失败的 URL,方便后续补采

aiohttp 爬虫完整模板

下面这个模板包含请求头、URL 参数、超时、连接池、并发控制、异常处理和 JSON 响应读取,适合改造成批量接口采集脚本。

import asyncio

import aiohttp


HEADERS = {
    "User-Agent": (
        "Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
        "AppleWebKit/537.36 (KHTML, like Gecko) "
        "Chrome/124.0 Safari/537.36"
    ),
    "Accept": "application/json",
}


async def fetch_page(session, page, sem):
    url = "https://httpbin.org/get"
    params = {
        "page": page,
        "size": 20,
    }

    async with sem:
        try:
            async with session.get(url, params=params) as response:
                response.raise_for_status()
                return await response.json()

        except asyncio.TimeoutError:
            print(f"第 {page} 页请求超时")

        except aiohttp.ClientResponseError as exc:
            print(f"第 {page} 页状态码异常:{exc.status}")

        except aiohttp.ClientError as exc:
            print(f"第 {page} 页请求异常:{exc}")

    return None


async def main():
    timeout = aiohttp.ClientTimeout(
        total=20,
        connect=5,
        sock_read=10,
    )

    connector = aiohttp.TCPConnector(
        limit=100,
        limit_per_host=10,
    )

    sem = asyncio.Semaphore(10)

    async with aiohttp.ClientSession(
        headers=HEADERS,
        timeout=timeout,
        connector=connector,
    ) as session:
        tasks = [
            fetch_page(session, page, sem)
            for page in range(1, 101)
        ]

        results = await asyncio.gather(*tasks)

    results = [item for item in results if item is not None]
    print("成功数量:", len(results))


if __name__ == "__main__":
    asyncio.run(main())

拿到 HTML 之后,如果要继续提取页面数据,可以把响应文本交给 Parsel、lxml 或 BeautifulSoup。选择器提取可以参考 Python Parsel 教程:用 CSS、XPath 提取网页数据

requests 和 aiohttp 怎么选

requests 的写法更短:

import requests


response = requests.get(url, timeout=10)
response.raise_for_status()
print(response.text)

aiohttp 的写法更适合并发:

async with aiohttp.ClientSession() as session:
    async with session.get(url) as response:
        response.raise_for_status()
        print(await response.text())

选择时可以按下面几个问题判断:

问题 建议
只是写一个简单脚本? 先用 requests
URL 数量很多,等待时间主要在网络 I/O? aiohttp
页面内容由 JavaScript 渲染? Playwright
需要登录态、Cookie、默认请求头复用? 两者都可以,aiohttpClientSession
需要高并发但目标站有限流? aiohttp,同时限制并发和连接数

常见坑

忘记 await

错误写法:

text = response.text()

正确写法:

text = await response.text()

aiohttp 里很多操作都是协程。忘记 await,不会得到真正结果。

每次请求都新建 ClientSession

不推荐:

for url in urls:
    async with aiohttp.ClientSession() as session:
        ...

推荐:

async with aiohttp.ClientSession() as session:
    tasks = [fetch(session, url) for url in urls]
    await asyncio.gather(*tasks)

并发太高

并发过高通常不是“更快”,而是更容易失败。建议先从 5 到 20 的并发开始测试,再根据目标站响应、代理质量和本机资源逐步调整。

常用控制手段:

sem = asyncio.Semaphore(10)
connector = aiohttp.TCPConnector(limit=100, limit_per_host=10)

没有处理状态码

只写:

text = await response.text()

并不会让 403、404、500 自动变成异常。建议在读取响应体前调用:

response.raise_for_status()

把 aiohttp 当成浏览器

aiohttp 只负责发送 HTTP 请求,不会执行 JavaScript。如果浏览器里能看到数据,但 response.text() 里没有,通常要去找接口,或者使用 Playwright 获取渲染后的页面。

常用模板速查

GET 模板:

import asyncio

import aiohttp


async def main():
    timeout = aiohttp.ClientTimeout(total=10)
    headers = {"User-Agent": "Mozilla/5.0"}
    params = {"page": 1}

    async with aiohttp.ClientSession(headers=headers, timeout=timeout) as session:
        async with session.get("https://example.com/api", params=params) as response:
            response.raise_for_status()
            print(await response.text())


asyncio.run(main())

POST JSON 模板:

import asyncio

import aiohttp


async def main():
    payload = {
        "username": "admin",
        "password": "123456",
    }

    timeout = aiohttp.ClientTimeout(total=10)

    async with aiohttp.ClientSession(timeout=timeout) as session:
        async with session.post("https://example.com/api/login", json=payload) as response:
            response.raise_for_status()
            print(await response.json())


asyncio.run(main())

并发请求模板:

import asyncio

import aiohttp


async def fetch(session, url, sem):
    async with sem:
        async with session.get(url) as response:
            response.raise_for_status()
            return await response.text()


async def main():
    urls = [
        f"https://example.com/page/{page}"
        for page in range(1, 101)
    ]

    timeout = aiohttp.ClientTimeout(total=15)
    sem = asyncio.Semaphore(10)

    async with aiohttp.ClientSession(timeout=timeout) as session:
        tasks = [fetch(session, url, sem) for url in urls]
        results = await asyncio.gather(*tasks)

    print(len(results))


asyncio.run(main())

常见问题

aiohttp 必须和 asyncio 一起用吗?

是的。aiohttp 的客户端请求方法是异步协程,通常通过 asyncio.run() 启动入口函数,并在请求、读取响应体等位置使用 await

aiohttp 会比 requests 一定更快吗?

不一定。少量请求时,requests 通常更简单,速度差异也不明显。aiohttp 的优势主要出现在大量 I/O 等待场景,比如批量请求接口、并发抓取页面、批量下载文件。

aiohttp 并发设置多少合适?

没有固定答案。可以先从 10 左右开始,根据目标站响应速度、错误率、代理质量和本机资源调整。不要一开始就开几百并发,爬虫稳定性通常比瞬时速度更重要。

response.json() 报 Content-Type 错误怎么办?

如果确认响应体确实是 JSON,但服务端返回的 Content-Type 不标准,可以使用:

data = await response.json(content_type=None)

如果内容本身不是合法 JSON,就应该先用 await response.text() 打印片段排查。

aiohttp 能不能带代理?

可以。单个请求里传 proxy

async with session.get(url, proxy="http://127.0.0.1:7890") as response:
    print(await response.text())

代理需要账号密码时:

proxy_auth = aiohttp.BasicAuth("username", "password")

async with session.get(
    url,
    proxy="http://127.0.0.1:7890",
    proxy_auth=proxy_auth,
) as response:
    print(await response.text())

Practice

读完这一节,去靶场里验证一下。

去挑战广场练习