main
zhaoxiangpeng 1 week ago
parent 7da4ac90c6
commit a0a8d05c61

@ -0,0 +1,119 @@
# pipelines/buffer_component.py
from typing import Dict, List, Any, Optional
from itemadapter import ItemAdapter
import time
class SimpleBuffer:
"""
简单的缓冲区组件只负责缓存管理不处理数据库插入
"""
def __init__(self, buffer_max_size: int = 100, flush_interval: int = 30):
self.buffer_max_size = buffer_max_size
self.flush_interval = flush_interval
# 缓冲区
self.buffers: Dict[str, List[Dict]] = {}
self.total_size = 0
# 时间控制
self.last_flush_time = time.time()
# 统计
self.stats = {
'items_added': 0,
'buffers_flushed': 0,
'last_operation': None
}
def add_item(self, item: Any, item_type: str) -> bool:
"""
添加Item到缓冲区
Args:
item: 要添加的Item
item_type: Item类型标识
Returns:
bool: 是否触发了刷新
"""
# 初始化该类型的缓冲区
if item_type not in self.buffers:
self.buffers[item_type] = []
# 转换Item为字典
item_dict = self._item_to_dict(item)
# 添加到缓冲区
self.buffers[item_type].append(item_dict)
self.total_size += 1
self.stats['items_added'] += 1
# 检查是否需要刷新
should_flush = (
len(self.buffers[item_type]) >= self.buffer_max_size or
self._should_flush_by_time()
)
if should_flush:
self.last_flush_time = time.time()
return should_flush
def get_buffer(self, item_type: str) -> List[Dict]:
"""获取指定类型的缓冲区数据"""
return self.buffers.get(item_type, [])
def get_all_buffers(self) -> Dict[str, List[Dict]]:
"""获取所有缓冲区数据"""
return self.buffers.copy()
def clear_buffer(self, item_type: str):
"""清空指定类型的缓冲区"""
if item_type in self.buffers:
self.total_size -= len(self.buffers[item_type])
self.buffers[item_type].clear()
self.stats['buffers_flushed'] += 1
def clear_all_buffers(self):
"""清空所有缓冲区"""
for item_type in list(self.buffers.keys()):
self.clear_buffer(item_type)
def get_buffer_size(self, item_type: str) -> int:
"""获取指定类型缓冲区的大小"""
return len(self.buffers.get(item_type, []))
def get_total_size(self) -> int:
"""获取总缓冲区大小"""
return self.total_size
def should_flush(self, item_type: str) -> bool:
"""检查是否需要刷新"""
return (
self.get_buffer_size(item_type) >= self.buffer_max_size or
self._should_flush_by_time()
)
def _should_flush_by_time(self) -> bool:
"""基于时间检查是否需要刷新"""
return time.time() - self.last_flush_time >= self.flush_interval
def _item_to_dict(self, item: Any) -> Dict[str, Any]:
"""Item转字典"""
if hasattr(item, 'items'): # 已经是字典或类似字典的对象
return dict(item)
else:
adapter = ItemAdapter(item)
return dict(adapter)
def get_stats(self) -> Dict[str, Any]:
"""获取统计信息"""
buffer_sizes = {k: len(v) for k, v in self.buffers.items()}
return {
**self.stats,
'buffer_sizes': buffer_sizes,
'total_buffered': self.total_size,
'buffer_types': list(self.buffers.keys())
}

File diff suppressed because one or more lines are too long
Loading…
Cancel
Save