在冉冉博客的运营过程中,我积累了不少用 Python 生成器提升定时任务效率的经验。今天整理成教程分享给大家,希望能提升你的开发效率。
一、生成器基础与内存优化
Python 生成器是处理大数据集的利器。相比普通函数,生成器使用惰性计算,只在需要时才产生值,内存占用可以降低数十倍。冉冉博客的日志分析脚本使用生成器后,内存占用从 800MB 降到 15MB。
1.1 生成器 vs 列表
# 列表 - 一次性加载全部数据到内存
def get_urls_list():
urls = []
for i in range(1000000):
urls.append(f"https://example.com/item/{i}")
return urls
# 生成器 - 惰性计算,按需产生数据
def get_urls_generator():
for i in range(1000000):
yield f"https://example.com/item/{i}"
# 内存对比
list_urls = get_urls_list() # 占用 ~800MB
gen_urls = get_urls_generator() # 占用 ~100 字节
# 使用方式完全相同
for url in gen_urls:
process(url)
1.2 生成器表达式
# 生成器表达式 - 语法类似列表推导式
squares_gen = (x**2 for x in range(10000000))
# 使用 next() 获取值
print(next(squares_gen)) # 0
print(next(squares_gen)) # 1
# 用 list() 物化(谨慎使用,避免内存爆炸)
squares_list = list(squares_gen) # 不推荐!
# sum 等函数可以直接消费生成器
total = sum(x**2 for x in range(10000000))
print(total)
二、yield 进阶与协程
理解 yield 的工作机制后,可以用它实现协程、管道处理、无限序列等高级模式。
2.1 生成器管道模式
# 数据处理管道:多个生成器串联
def fetch_pages(urls):
"""模拟从网络获取页面"""
for url in urls:
yield f"HTML_{url}"
def parse_links(pages):
"""解析页面中的链接"""
for html in pages:
links = [f"link_{i}_from_{html}" for i in range(5)]
yield from links
def filter_valid(links):
"""过滤有效链接"""
for link in links:
if "valid" in link or "page" in link:
yield link
# 使用管道
urls = [f"/item/{i}" for i in range(1000)]
pipeline = filter_valid(parse_links(fetch_pages(urls)))
for valid_link in pipeline:
save_to_db(valid_link)
2.2 协程与异步生成器
# Python 3.6+ 协程
def async_worker():
while True:
item = yield # 接收上游数据
result = await process(item)
# 发送结果到下游
yield result
# 异步生成器(Python 3.6+)
async def fetch_all(urls):
async for url in urls:
yield await fetch(url)
# asyncio 异步管道
import asyncio
async def main():
urls = [f"https://api.example.com/{i}" for i in range(100)]
# 并发抓取
results = await asyncio.gather(*[
fetch(url) for url in urls
])
# 使用异步生成器流式处理
async for result in stream_results(results):
await save(result)
asyncio.run(main())
三、定时任务中的生成器应用
定时任务通常需要处理大量数据,生成器能让批处理更高效、更稳定。
3.1 批处理模式
import time
from itertools import islice
def batch_process(items, batch_size=1000):
"""批处理生成器 - 将数据切分成批次"""
it = iter(items)
while True:
batch = list(islice(it, batch_size))
if not batch:
break
yield batch
def process_large_file(filepath):
"""逐批处理大文件"""
with open(filepath, 'r') as f:
for batch in batch_process(f, batch_size=5000):
# 每批 5000 行
records = [parse_line(line) for line in batch]
yield records
# 定时任务使用
def daily_task():
records_gen = process_large_file('/data/access.log')
total = 0
for batch in records_gen:
db.bulk_insert(batch)
total += len(batch)
print(f"已处理 {total} 条记录")
# APScheduler 定时执行
from apscheduler.schedulers.blocking import BlockingScheduler
scheduler = BlockingScheduler()
scheduler.add_job(daily_task, 'cron', hour=3, minute=0)
scheduler.start()
3.2 断点续传与检查点
import json
class CheckpointGenerator:
"""带检查点的生成器 - 支持中断恢复"""
def __init__(self, filepath, checkpoint_file):
self.filepath = filepath
self.checkpoint_file = checkpoint_file
self.checkpoint = self._load_checkpoint()
def _load_checkpoint(self):
try:
with open(self.checkpoint_file, 'r') as f:
return json.load(f)
except FileNotFoundError:
return {'processed': 0}
def _save_checkpoint(self, line_num):
with open(self.checkpoint_file, 'w') as f:
json.dump({'processed': line_num}, f)
def process_with_resume(self):
with open(self.filepath, 'r') as f:
for line_num, line in enumerate(f, 1):
if line_num <= self.checkpoint['processed']:
continue
yield self._process_line(line)
if line_num % 1000 == 0:
self._save_checkpoint(line_num)
# 使用
processor = CheckpointGenerator('huge_file.csv', '.checkpoint.json')
for record in processor.process_with_resume():
save_to_db(record)
四、调度器与任务队列集成
生成器与任务调度器结合,能实现优雅的异步批处理架构。
4.1 APScheduler 集成
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.triggers.cron import CronTrigger
def run_etl_pipeline():
"""ETL 管道 - 从数据库读取、处理、写回"""
generator = fetch_records_from_db(batch_size=5000)
for batch in generator:
transformed = transform_data(batch)
load_to_warehouse(transformed)
print(f"批次完成: {len(transformed)} 条")
print("ETL 管道执行完成")
scheduler = BackgroundScheduler()
# 每天凌晨3点执行
scheduler.add_job(
run_etl_pipeline,
CronTrigger(hour=3, minute=0),
id='daily_etl',
replace_existing=True,
misfire_grace_time=3600 # 允许1小时内错过执行
)
scheduler.start()
print("调度器已启动,等待任务执行...")
4.2 Celery 异步任务
from celery import Celery
from celery.signals import worker_process_init
app = Celery('tasks', broker='redis://localhost:6379')
@app.task(bind=True, max_retries=3)
def process_large_dataset(self, filepath):
"""Celery 异步处理大文件"""
generator = read_file_generator(filepath, chunk_size=10000)
results = []
for chunk in generator:
try:
result = process_chunk(chunk)
results.extend(result)
except Exception as e:
self.retry(exc=e, countdown=60)
return {'total': len(results), 'status': 'success'}
# 使用
task = process_large_dataset.delay('/data/huge_file.csv')
print(f"任务ID: {task.id}")
# 监控
while not task.ready():
time.sleep(5)
print(f"结果: {task.result}")
五、性能对比与最佳实践
生成器虽然强大,但也要用对场景。下面是冉冉博客实际项目中的性能对比数据。
5.1 性能基准测试
import time
import tracemalloc
def benchmark(func, *args, **kwargs):
tracemalloc.start()
start = time.perf_counter()
result = func(*args, **kwargs)
if hasattr(result, '__iter__'):
result = list(result) # 物化生成器
end = time.perf_counter()
current, peak = tracemalloc.get_traced_memory()
tracemalloc.stop()
return {
'time': end - start,
'memory_current': current / 1024 / 1024,
'memory_peak': peak / 1024 / 1024
}
# 测试:处理 1000 万行数据
# 列表方法
result = benchmark(process_list, range(10_000_000))
print(f"列表: {result['time']:.2f}s, 峰值内存: {result['memory_peak']:.1f}MB")
# 生成器方法
result = benchmark(process_generator, range(10_000_000))
print(f"生成器: {result['time']:.2f}s, 峰值内存: {result['memory_peak']:.1f}MB")
5.2 最佳实践总结
# 1. 什么时候用生成器
# ✓ 处理未知长度或超大数据集
# ✓ 流水线式数据处理
# ✓ 实时数据流(网络、传感器)
# ✗ 数据集很小(<1000条),列表更简单
# ✗ 需要随机访问(生成器只能顺序迭代)
# 2. 生成器使用规范
def my_generator(items):
for item in items:
if is_valid(item):
yield item
# 不要在生成器中 return 值后的代码
# return 后的代码不会执行
# 3. 调试技巧
# 使用 tuple() 或 list() 物化来查看完整结果
result = list(my_generator(large_dataset))
print(f"共 {len(result)} 条结果")
# 4. 生成器链式调用
pipeline = (
source_generator()
| filter_transform
| aggregate_stats
| format_output
)
for item in pipeline:
print(item)
以上就是用 Python 生成器提升定时任务效率的完整分享。这些技巧都来自冉冉博客的实战经验,希望能帮到你。有问题欢迎留言交流!
© 版权声明
文章版权归作者所有,未经允许请勿转载。
THE END
















暂无评论内容