Python 异步编程详解:asyncio 高并发实战

异步编程是 Python 高性能开发的核心技能。作为冉冉博客的 Python 教程系列,今天深入讲解 asyncio 的使用方法。

一、异步编程基础

同步 vs 异步的区别:

# Synchronous: the waits run one after another, so total time is ~3 s
import time
def sync_task(delay=1):
    """Run three blocking waits back to back.

    Args:
        delay: seconds per wait (default 1, matching the original example;
            parameterized so the cost of the demo is adjustable).
    """
    time.sleep(delay)  # each call blocks the whole thread
    time.sleep(delay)
    time.sleep(delay)

# Asynchronous: the three waits overlap, so total time is ~1 s (not 3)
import asyncio
async def async_task(delay=1):
    """Sleep three times concurrently.

    Bug fix: the original awaited three sleeps sequentially, which takes
    3 s and contradicts the "runs in 1 s" claim above — asyncio.gather
    makes the waits actually overlap.

    Args:
        delay: seconds per sleep (default 1, as in the original).
    """
    await asyncio.gather(
        asyncio.sleep(delay),
        asyncio.sleep(delay),
        asyncio.sleep(delay),
    )

二、asyncio 核心概念

import asyncio

# Define a coroutine: it suspends at the await, letting the loop run others
async def hello():
    """Print 'Hello', yield to the event loop for one second, print 'World'."""
    print('Hello')
    await asyncio.sleep(1)  # suspension point — other tasks may run here
    print('World')

# Run the coroutine on a fresh event loop (the standard async entry point)
asyncio.run(hello())

# Run several copies of the coroutine concurrently
async def main():
    """Launch three hello() coroutines and wait until all complete."""
    await asyncio.gather(*[hello() for _ in range(3)])
asyncio.run(main())

三、await 与任务管理

async def fetch(url):
    """Simulate a one-second network request and return a fake payload."""
    await asyncio.sleep(1)  # stand-in for real network I/O
    return f'Data from {url}'

async def main():
    """Schedule two fetches eagerly, then await both results together."""
    # create_task submits each coroutine to the running loop immediately
    pending = [asyncio.create_task(fetch(u)) for u in ('url1', 'url2')]

    # gather resolves once every task has finished
    results = await asyncio.gather(*pending)
    print(results)

# Cancel the request if it exceeds a 5-second deadline
async def with_timeout():
    """Fetch with a timeout, reporting expiry instead of propagating it."""
    try:
        await asyncio.wait_for(fetch('url'), timeout=5)
    except asyncio.TimeoutError:
        # wait_for cancels the inner task before raising
        print('请求超时')

四、异步上下文管理器

class AsyncDB:
    """Async context manager tying a DB connection to a `async with` block."""

    async def __aenter__(self):
        # Acquire the connection when the block is entered
        self.conn = await asyncpg.connect(...)
        return self

    async def __aexit__(self, *exc_info):
        # Release the connection even if the body raised
        await self.conn.close()

# Usage: the connection is opened and closed automatically
async def main():
    """Query the users table inside a managed connection."""
    async with AsyncDB() as db:
        return await db.query('SELECT * FROM users')

五、异步迭代器与生成器

# Async generator: produces each result as soon as its fetch completes
async def async_gen(urls):
    """Fetch every URL in order, yielding one payload at a time."""
    for u in urls:
        yield await fetch(u)

# Consume the async generator with `async for`
async def main():
    """Print every item produced by async_gen."""
    async for chunk in async_gen(['url1', 'url2']):
        print(chunk)

# __aiter__ / __anext__ protocol
class AsyncPager:
    """Async iterator over pages `page`..`max_page` inclusive.

    Bug fix: the original incremented ``self.page`` *before* fetching, so
    starting at page 1 it fetched pages 2..11 — skipping the first page and
    overrunning the stated limit of 10. The limit is also generalized from
    the hard-coded 10 into a backward-compatible ``max_page`` parameter.
    """

    def __init__(self, page, max_page=10):
        self.page = page          # next page to fetch
        self.max_page = max_page  # last page to fetch (inclusive)

    def __aiter__(self):
        return self

    async def __anext__(self):
        if self.page > self.max_page:
            raise StopAsyncIteration
        current = self.page
        self.page += 1  # advance only after recording which page to fetch
        return await fetch_page(current)

六、asyncio 高级用法

# A semaphore caps how many coroutines enter the guarded section at once
semaphore = asyncio.Semaphore(5)

async def limited_task():
    """Fetch a URL while holding one of the 5 semaphore slots."""
    await semaphore.acquire()
    try:
        await fetch('url')
    finally:
        # always return the slot, even if the fetch raised
        semaphore.release()

# A lock serializes coroutines that touch a shared resource
lock = asyncio.Lock()

async def critical_section():
    """Run the (currently empty) critical section while holding the lock."""
    async with lock:
        pass  # mutually exclusive work goes here

# An Event lets one coroutine signal any number of waiters
event = asyncio.Event()

async def producer():
    """Do the work, then wake every coroutine blocked on the event."""
    await produce()
    event.set()

async def consumer():
    """Block until the producer signals, then report."""
    await event.wait()  # suspends until event.set() is called
    print('收到通知')

七、异步 HTTP 请求

import aiohttp
import asyncio

async def fetch_all(urls):
    """Fetch every URL concurrently, sharing a single HTTP session."""
    async with aiohttp.ClientSession() as session:
        return await asyncio.gather(*(fetch(session, u) for u in urls))

async def fetch(session, url):
    """GET one URL and decode its JSON body."""
    async with session.get(url) as response:
        return await response.json()

# Usage: fire both requests concurrently and collect the JSON payloads
urls = ['https://api.example.com/1', 'https://api.example.com/2']
results = asyncio.run(fetch_all(urls))

八、实战:异步爬虫

import aiohttp
import asyncio
from bs4 import BeautifulSoup

async def scrape(url, session):
    """Download one page and return the text of its <title> element."""
    async with session.get(url) as response:
        body = await response.text()
        return BeautifulSoup(body, 'html.parser').title.string

async def main(urls):
    """Scrape all URLs, capping the pool at 10 concurrent TCP connections."""
    connector = aiohttp.TCPConnector(limit=10)  # throttles simultaneous sockets
    async with aiohttp.ClientSession(connector=connector) as session:
        return await asyncio.gather(*(scrape(u, session) for u in urls))

# Crawl ~1000 pages (NOTE(review): large_url_list is not defined in this
# article — presumably supplied by the reader; confirm before running)
asyncio.run(main(large_url_list))

asyncio 是 Python 高性能编程的核心。掌握这些技能,能让你的程序性能提升数倍。更多 Python 实战教程,欢迎持续关注冉冉博客。

© 版权声明
THE END
喜欢就支持一下吧
点赞13 分享
评论 抢沙发

请登录后发表评论

    暂无评论内容