
# -*- coding: utf-8 -*-
# @Time : 2025/10/30 17:30
# @Author : zhaoxiangpeng
# @File : mongo_pipeline.py
from __future__ import annotations
import logging
from datetime import datetime
from typing import TYPE_CHECKING, Tuple, Generator
from pymongo import MongoClient
from itemadapter import ItemAdapter
from pymongo.errors import (
    DuplicateKeyError,
    BulkWriteError,
)
from science_article_add.db_utils.buffer_component import SimpleBuffer
from science_article_add.db_utils.mongo import MongoDBUtils, update_document
if TYPE_CHECKING:
    from scrapy.crawler import Crawler
    from scrapy.statscollectors import StatsCollector
mongo_logger = logging.getLogger('pymongo')
mongo_logger.setLevel(logging.WARNING)
logger = logging.getLogger(__name__)
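
# Both pipelines read their configuration from the crawler settings via
# `from_crawler`. A minimal sketch of the relevant settings.py entries is shown
# below; the dotted pipeline path assumes this module is importable as
# `science_article_add.pipelines.mongo_pipeline`, which is an assumption, not
# taken from this file.
#
#     MONGO_URI = "mongodb://localhost:27017"   # required
#     MONGO_DATABASE = "items"                  # default used below
#     BUFFER_MAX_SIZE = 100                     # only used by MongoPipelineMulti
#     ITEM_PIPELINES = {
#         "science_article_add.pipelines.mongo_pipeline.MongoPipelineMulti": 300,
#     }
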
class MongoPipeline(MongoDBUtils):
    def __init__(self, mongo_uri, mongo_db, stats: StatsCollector):
        super().__init__(mongo_uri, mongo_db)
        self.stats: StatsCollector = stats
        self.insert_failure_update_enable = True

    @classmethod
    def from_crawler(cls, crawler: Crawler):
        return cls(
            mongo_uri=crawler.settings.get("MONGO_URI"),
            mongo_db=crawler.settings.get("MONGO_DATABASE", "items"),
            stats=crawler.stats,
        )

    def open_spider(self, spider):
        self.client = MongoClient(self.mongo_uri)
        self.db = self.client[self.mongo_db]

    def process_item(self, item, spider):
        # Determine the item type and pick the matching collection
        adapter = ItemAdapter(item)
        item_type = self._get_item_type(item)
        collection = self.db.get_collection(item_type)
        d = adapter.asdict()
        try:
            insert_result = collection.insert_one(d)
        except DuplicateKeyError as duplicate_error:
            if self.insert_failure_update_enable:
                write_error = duplicate_error.details
                key_pattern = write_error.get('keyPattern')
                key_value = write_error.get('keyValue')
                logger.debug("dupKey: %s, keyValue: %s", key_pattern, key_value)
                # Drop the unique-key fields from the update body, then upsert on the duplicated key
                for k in key_pattern.keys():
                    d.pop(k, None)
                up_result = collection.update_one(filter=key_value, update={"$set": d}, upsert=True)
        except Exception:
            raise
        return item

    def close_spider(self, spider):
        self.client.close()

    @staticmethod
    def _get_item_type(item) -> str:
        """Return the collection name for an item."""
        if hasattr(item, '__tablename__'):
            return item.__tablename__
        return 'items_null_table'
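
# `_get_item_type` maps an item to a MongoDB collection through its
# `__tablename__` attribute. A minimal sketch of an item class that would work
# with both pipelines is shown below; the class and field names are
# hypothetical illustrations, not taken from this project.
#
#     import scrapy
#
#     class ArticleItem(scrapy.Item):
#         __tablename__ = "articles"     # collection the pipelines write to
#         third_id = scrapy.Field()      # used to batch-update `updated_at`
#         title = scrapy.Field()
#         updated_at = scrapy.Field()
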
class MongoPipelineMulti(MongoDBUtils):
    def __init__(self, mongo_uri, mongo_db, buffer_max_size=None):
        super().__init__(mongo_uri, mongo_db)
        self.buffer = SimpleBuffer(buffer_max_size=buffer_max_size, flush_interval=10)

    @classmethod
    def from_crawler(cls, crawler):
        return cls(
            mongo_uri=crawler.settings.get("MONGO_URI"),
            mongo_db=crawler.settings.get("MONGO_DATABASE", "items"),
            buffer_max_size=crawler.settings.get("BUFFER_MAX_SIZE", 100),
        )

    def open_spider(self, spider):
        self.client = MongoClient(self.mongo_uri)
        self.db = self.client[self.mongo_db]

    def process_item(self, item, spider):
        # Determine the item type
        item_type = self._get_item_type(item)
        # Add the item to the buffer
        should_flush = self.buffer.add_item(item, item_type)
        # If the buffer is full, flush it to the database
        if should_flush:
            self._flush_buffer(item_type)
        return item

    def close_spider(self, spider):
        # Flush any remaining buffered items when the spider closes
        for item_type in self.buffer.buffers.keys():
            if self.buffer.get_buffer_size(item_type) > 0:
                self._flush_buffer(item_type)

    def _flush_buffer(self, item_type: str):
        """Flush the buffer for one item type to the database."""
        items = self.buffer.get_buffer(item_type)
        item_count = len(items)
        if not items:
            return
        affect_count = 0
        try:
            affect_count = item_count
            # Bulk-insert the buffered items
            insert_result = self._insert2db(items, tablename=item_type)
            affect_count = len(insert_result.inserted_ids)
        except BulkWriteError as bulk_write_e:
            # Write errors are reported per document; turn duplicate-key errors
            # into targeted updates instead of failing the whole batch.
            write_errors = bulk_write_e.details.get('writeErrors')
            current_time = datetime.now()
            up_time_requests = []
            errors = self._build__update(write_errors)
            collection = self.db.get_collection(item_type)
            for new_item in errors:
                filter_query, update_query = new_item
                up_result = collection.update_one(filter=filter_query, update=update_query)
                affect_count -= 1
                if up_result.matched_count == up_result.modified_count == 1:
                    # Collect the third-party id of each document that actually changed,
                    # so its timestamp can be bumped in one batch below.
                    third_id = filter_query.get("third_id")
                    if third_id:
                        up_time_requests.append(third_id)
            if up_time_requests:
                logger.info("Setting batch update time to: %s", current_time)
                collection.update_many(
                    {"third_id": {"$in": up_time_requests}},
                    {"$set": {"updated_at": current_time}}
                )
        except Exception as e:
            logger.error("❌ Insert failed: %s", e)
            # Failed batches are not retried; the buffer is still cleared below.
        finally:
            # Clear the buffer for this item type
            self.buffer.clear_buffer(item_type)
            logger.info('✅ Stored to %s: %s rows, %s inserted, %s updated',
                        item_type, item_count, affect_count, item_count - affect_count)

    def _build__update(self, write_errors) -> Generator[Tuple[dict, dict], None, None]:
        for write_error in write_errors:
            update_one = None, None
            if write_error.get('code') == 11000:
                # 11000 is MongoDB's duplicate-key error code
                update_one = self._build_dup_error(write_error)
            if update_one[0] is not None:
                yield update_one

    @staticmethod
    def _build_dup_error(write_error) -> tuple[None, None] | tuple[dict, dict]:
        """May be overridden to implement custom duplicate handling."""
        original_doc = write_error.get('op')  # the document that failed to insert
        key_pattern = write_error.get('keyPattern')
        original_doc.pop("_id", None)  # drop the _id generated by the failed insert
        filter_query = {}
        update_query = {key: val for key, val in original_doc.items() if val}
        update_query.pop('updated_at', None)  # drop the volatile timestamp so it does not disturb the update
        for key in key_pattern.keys():
            filter_query.update({key: update_query.pop(key, None)})
        if not update_query:
            return None, None
        # Build a conditional update: only documents that actually change are modified;
        # their third_id is then collected so update_time can be bumped in one batch.
        filter_query, update_query = update_document(filter_query, update_query, replace=False)
        return filter_query, update_query

    def _get_item_type(self, item) -> str:
        """Return the collection name for an item."""
        return item.__class__.__tablename__
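
# MongoPipelineMulti relies on a small surface of the buffer component. The
# calls used above imply roughly the interface sketched here; this is an
# illustrative reconstruction from usage in this file, not the actual
# implementation in science_article_add.db_utils.buffer_component
# (likewise, `_insert2db` is assumed to be provided by MongoDBUtils and to
# return a pymongo InsertManyResult).
#
#     class SimpleBuffer:
#         def __init__(self, buffer_max_size=None, flush_interval=10):
#             self.buffers: dict[str, list] = {}   # pending items per item_type
#
#         def add_item(self, item, item_type) -> bool:
#             """Queue the item; return True when the buffer should be flushed."""
#
#         def get_buffer(self, item_type) -> list: ...
#         def get_buffer_size(self, item_type) -> int: ...
#         def clear_buffer(self, item_type) -> None: ...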