From 78a76ba9f24b3a55bcbdc9dea5b6ad39509b3f76 Mon Sep 17 00:00:00 2001
From: zhaoxiangpeng <1943364377@qq.com>
Date: Wed, 5 Nov 2025 17:34:42 +0800
Subject: [PATCH] add: data ingestion pipelines
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

---
 .../science_article_add/pipelines/__init__.py |  10 ++
 .../science_article_add/pipelines/duptodo.py  | 112 +++++++++++++++
 .../pipelines/mongo_pipeline.py               | 131 ++++++++++++++++++
 .../science_article_add/pipelines/wos.py      |   8 +-
 4 files changed, 258 insertions(+), 3 deletions(-)
 create mode 100644 science_article_add/science_article_add/pipelines/duptodo.py
 create mode 100644 science_article_add/science_article_add/pipelines/mongo_pipeline.py

diff --git a/science_article_add/science_article_add/pipelines/__init__.py b/science_article_add/science_article_add/pipelines/__init__.py
index acd0b30..0361c6b 100644
--- a/science_article_add/science_article_add/pipelines/__init__.py
+++ b/science_article_add/science_article_add/pipelines/__init__.py
@@ -57,3 +57,13 @@ class ScienceArticleAddPipeline(ScienceAddBufferPipeline):
         if self.buffer_size >= self.buffer_max_size:
             self.buffer.clear()
         return item
+
+
+class BufferPipeline:
+    def __init__(self, buffer_max_size: int = 100):
+        self.buffer = {
+            "type1": [],
+            "type2": [],
+        }
+        self.buffer_size = 0
+        self.buffer_max_size = buffer_max_size
diff --git a/science_article_add/science_article_add/pipelines/duptodo.py b/science_article_add/science_article_add/pipelines/duptodo.py
new file mode 100644
index 0000000..83c630a
--- /dev/null
+++ b/science_article_add/science_article_add/pipelines/duptodo.py
@@ -0,0 +1,112 @@
+# -*- coding: utf-8 -*-
+# @Time : 2025/11/3 14:16
+# @Author : zhaoxiangpeng
+# @File : duptodo_pipeline.py
+from __future__ import annotations
+import logging
+from typing import TYPE_CHECKING
+from pymongo import MongoClient
+from scrapy.exceptions import DropItem
+from science_article_add.db_utils.buffer_component import SimpleBuffer
+from science_article_add.items import IdRelationItem
+from science_article_add.db_utils.mongo import MongoDBUtils
+
+if TYPE_CHECKING:
+    from pymongo.collection import Collection
+
+logger = logging.getLogger(__name__)
+
+
+class DupTodoPipeline(MongoDBUtils):
+    def __init__(self, crawler, mongo_uri, mongo_db, buffer_max_size=None):
+        super().__init__(mongo_uri, mongo_db)
+        self.buffer = SimpleBuffer(buffer_max_size=buffer_max_size, flush_interval=10)
+        self.stats = crawler.stats
+
+    @classmethod
+    def from_crawler(cls, crawler):
+        return cls(
+            crawler=crawler,
+            mongo_uri=crawler.settings.get("MONGO_URI"),
+            mongo_db=crawler.settings.get("MONGO_DATABASE", "items"),
+            buffer_max_size=crawler.settings.get("BUFFER_MAX_SIZE", 100),
+        )
+
+    def open_spider(self, spider):
+        self.client = MongoClient(self.mongo_uri)
+        self.db = self.client[self.mongo_db]
+
+    def close_spider(self, spider):
+        # flush any remaining buffered data when the spider closes
+        for item_type in self.buffer.buffers.keys():
+            if self.buffer.get_buffer_size(item_type) > 0:
+                self._flush_buffer(item_type, spider)
+
+    def process_item(self, item, spider):
+        if isinstance(item, IdRelationItem):
+            # determine the item type (target todo collection)
+            item_type = self._get_item_type(spider)
+            # add the item to the buffer
+            should_flush = self.buffer.add_item(item, item_type)
+            # flush to the database once the buffer is due
+            if should_flush:
+                self._flush_buffer(item_type, spider)
+        return item
+
+    def _flush_buffer(self, item_type: str, spider):
+        """Flush the buffer for the given item type to the database."""
+        items = self.buffer.get_buffer(item_type)
+        item_count = len(items)
+        affect_count = 0
+        try:
+            affect_count = item_count
+            dedup_items = self._contrast_dup_item(items, filter_key=self._get_dup_key(spider))
+
+            def gen():  # keep only third_id plus an initial state flag for the todo collection
+                for item in dedup_items:
+                    item = {"third_id": item.pop("third_id")}
+                    item['state'] = 0
+                    yield item
+
+            datas = [d for d in gen()]
+            if not datas:  # every buffered id already exists; the DropItem below is caught and logged
+                raise DropItem('⏳ items already exist: \n[%s]' % ', '.join(str(x['third_id']) for x in items))
+            insert_result = self._insert2db(datas, item_type)
+            affect_count = len(insert_result.inserted_ids)
+        except Exception as e:
+            logger.error('❌ insert into %s failed (%s rows): %s' % (item_type, item_count, e))
+        else:
+            logger.info('📝 %s: buffered %s rows, inserted %s new, skipped %s duplicates' % (
+                item_type, item_count, affect_count, item_count - affect_count))
+        finally:
+            # clear the buffer
+            self.buffer.clear_buffer(item_type)
+
+    def _contrast_dup_item(self, items, filter_key) -> list:
+        collection: Collection = self.db.get_collection(filter_key)
+        all_id_item_map = {item["third_id"]: item for item in items}
+        fingerprints = list(all_id_item_map.keys())
+        # query by third_id; exported.da marks that the record was already exported - if it is missing, the exported data is considered absent
+        find_results = collection.find(filter={"third_id": {"$in": fingerprints}},
+                                       projection={"_id": 0, "third_id": 1, "exported.da": 1})
+        is_exist = 0
+        for document in find_results:
+            if document.get('exported') and ((doc_third_id := document.get('third_id')) in all_id_item_map):
+                is_exist += 1
+                all_id_item_map.pop(doc_third_id, None)
+                self.inc_item_dropped_count('is_exist')
+        dedup_results = list(all_id_item_map.values())
+        logger.info('%s already holds %s of them, %s left after filtering' % (filter_key, is_exist, len(dedup_results)))
+        return dedup_results
+
+    def _get_dup_key(self, spider):
+        return 'data_%(source_type)s_article' % {"source_type": spider.source}
+
+    def _get_item_type(self, spider) -> str:
+        """Return the name of the todo collection for this spider's source."""
+        return 'todo_ids_%(source_type)s' % {"source_type": spider.source}
+
+    def inc_item_dropped_count(self, reason):
+        self.stats.inc_value("item_dropped_count")
+        self.stats.inc_value(f"item_dropped_reasons_count/{reason}")
+
diff --git a/science_article_add/science_article_add/pipelines/mongo_pipeline.py b/science_article_add/science_article_add/pipelines/mongo_pipeline.py
new file mode 100644
index 0000000..638979f
--- /dev/null
+++ b/science_article_add/science_article_add/pipelines/mongo_pipeline.py
@@ -0,0 +1,131 @@
+# -*- coding: utf-8 -*-
+# @Time : 2025/10/30 17:30
+# @Author : zhaoxiangpeng
+# @File : mongo_pipeline.py
+from __future__ import annotations
+import logging
+from datetime import datetime
+from typing import TYPE_CHECKING, Tuple, Generator
+
+from pymongo import MongoClient
+from pymongo.errors import BulkWriteError
+
+from science_article_add.db_utils.buffer_component import SimpleBuffer
+from science_article_add.db_utils.mongo import MongoDBUtils, update_document
+if TYPE_CHECKING:
+    pass
+
+mongo_logger = logging.getLogger('pymongo')
+mongo_logger.setLevel(logging.WARNING)
+logger = logging.getLogger(__name__)
+
+
+class MongoPipeline(MongoDBUtils):
+    def __init__(self, mongo_uri, mongo_db, buffer_max_size=None):
+        super().__init__(mongo_uri, mongo_db)
+        self.buffer = SimpleBuffer(buffer_max_size=buffer_max_size, flush_interval=10)
+
+    @classmethod
+    def from_crawler(cls, crawler):
+        return cls(
+            mongo_uri=crawler.settings.get("MONGO_URI"),
+            mongo_db=crawler.settings.get("MONGO_DATABASE", "items"),
+            buffer_max_size=crawler.settings.get("BUFFER_MAX_SIZE", 100),
+        )
+
+    def open_spider(self, spider):
+        self.client = MongoClient(self.mongo_uri)
+        self.db = self.client[self.mongo_db]
+
+    def process_item(self, item, spider):
+        # determine the item type (target collection)
+        item_type = self._get_item_type(item)
+
+        # add the item to the buffer
+        should_flush = self.buffer.add_item(item, item_type)
+
+        # flush to the database once the buffer is due
+        if should_flush:
+            self._flush_buffer(item_type)
+
+        return item
+
+    def close_spider(self, spider):
+        # flush any remaining buffered data when the spider closes
+        for item_type in self.buffer.buffers.keys():
+            if self.buffer.get_buffer_size(item_type) > 0:
+                self._flush_buffer(item_type)
+
+    def _flush_buffer(self, item_type: str):
+        """Flush the buffer for the given item type to the database."""
+        items = self.buffer.get_buffer(item_type)
+        item_count = len(items)
+        if not items:
+            return
+        affect_count = 0
+        try:
+            affect_count = item_count
+            # bulk insert into the collection named by the item type
+            insert_result = self._insert2db(items, tablename=item_type)
+            affect_count = len(insert_result.inserted_ids)
+        except BulkWriteError as bulk_write_e:
+            write_errors = bulk_write_e.details.get('writeErrors')
+            current_time = datetime.now()
+            up_time_requests = []
+            errors = self._build__update(write_errors)
+            collection = self.db.get_collection(item_type)
+            for new_item in errors:
+                filter_query, update_query = new_item
+                up_result = collection.update_one(filter=filter_query, update=update_query)
+                affect_count -= 1
+
+                if up_result.matched_count == up_result.modified_count == 1:
+                    third_id = filter_query.get("third_id")
+                    third_id and up_time_requests.append(third_id)  # collect third_id so the batch timestamp can be updated by third_id
+            if up_time_requests:
+                logger.info("setting batch update time to: %s" % current_time)
+                collection.update_many(
+                    {"third_id": {"$in": up_time_requests}},
+                    {"$set": {"updated_at": current_time}}
+                )
+        except Exception as e:
+            logger.error(f"❌ insert failed: {e}")
+            # note: the finally block below still clears the buffer, so this batch is not retried
+        finally:
+            # clear the buffer
+            self.buffer.clear_buffer(item_type)
+            logger.info('✅ %s: buffered %s rows, inserted %s new, updated %s existing' % (
+                item_type, item_count, affect_count, item_count - affect_count))
+
+    def _build__update(self, write_errors) -> Generator[Tuple[dict, dict], None, None]:
+        for write_error in write_errors:
+            update_one = None, None
+            if write_error.get('code') == 11000:
+                update_one = self._build_dup_error(write_error)
+            if update_one[0] is not None:  # skip non-duplicate-key errors and empty updates
+                yield update_one
+
+    @staticmethod
+    def _build_dup_error(write_error) -> tuple[None, None] | tuple[dict, dict]:
+        """May be overridden to implement custom handling."""
+        original_doc = write_error.get('op')  # the document that failed to insert
+        key_pattern = write_error.get('keyPattern') or {}
+        original_doc.pop("_id", None)  # drop the _id generated by the failed insert
+        filter_query = {}
+        update_query = {key: val for key, val in original_doc.items() if val}
+        update_query.pop('updated_at', None)  # drop the volatile timestamp so it does not affect the update
+
+        for key in key_pattern.keys():
+            filter_query.update({key: update_query.pop(key, None)})
+
+        if not filter_query or not update_query:
+            return None, None
+
+        # update one document; if it actually changed, collect its third_id so update_time can be set for the whole batch
+        filter_query, update_query = update_document(filter_query, update_query, replace=False)
+
+        return filter_query, update_query
+
+    def _get_item_type(self, item) -> str:
+        """Return the target collection name declared on the item class."""
+        return item.__class__.__tablename__
diff --git a/science_article_add/science_article_add/pipelines/wos.py b/science_article_add/science_article_add/pipelines/wos.py
index 557b125..c72c670 100644
--- a/science_article_add/science_article_add/pipelines/wos.py
+++ b/science_article_add/science_article_add/pipelines/wos.py
@@ -1,7 +1,7 @@
 # pipelines.py
 import pymongo
 from itemadapter import ItemAdapter
-from science_article_add.items.wos import WosCitedNumberItem
+from science_article_add.items.wos import WosCitedNumberItem, WosIdRelationItem
 
 
 class MongoDBPipeline:
@@ -27,10 +27,12 @@ class MongoDBPipeline:
         adapter = ItemAdapter(item)
 
         # store to a different collection depending on the item type
-        if isinstance(item, WosCitedNumberItem):
+        if isinstance(item, WosIdRelationItem):
+            collection_name = 'relation_school_wos'
+        elif isinstance(item, WosCitedNumberItem):
             collection_name = 'relation_cited_number_wos'
         else:
-            collection_name = 'relation_cited_number_other'
+            collection_name = 'data_other'
 
         # insert the document
         self.db[collection_name].insert_one(dict(adapter))
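
To try the patch locally: both new pipelines read MONGO_URI, MONGO_DATABASE and BUFFER_MAX_SIZE from the crawler settings (see their from_crawler methods) and are enabled through Scrapy's standard ITEM_PIPELINES setting. The sketch below shows one way the wiring could look; the concrete values and the priority numbers are illustrative assumptions, not something this patch defines.

    # settings.py -- illustrative sketch only; values and priorities are assumptions
    MONGO_URI = "mongodb://localhost:27017"
    MONGO_DATABASE = "science"          # the code falls back to "items" when unset
    BUFFER_MAX_SIZE = 100               # rows buffered before a flush

    ITEM_PIPELINES = {
        # builds the todo_ids_<source> collection from IdRelationItem objects
        "science_article_add.pipelines.duptodo.DupTodoPipeline": 300,
        # bulk-inserts every buffered item into the collection named by the
        # item class's __tablename__, falling back to updates on duplicates
        "science_article_add.pipelines.mongo_pipeline.MongoPipeline": 400,
    }

With this wiring DupTodoPipeline runs first but only touches IdRelationItem instances (it returns every item unchanged), while MongoPipeline sees all items; the relative order is a choice for the integrator rather than something the code enforces.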
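
Both pipelines also depend on science_article_add.db_utils.buffer_component.SimpleBuffer, which is not included in this diff. From the call sites it needs a buffers mapping plus add_item (returning whether a flush is due), get_buffer, get_buffer_size and clear_buffer. A minimal stand-in consistent with that usage is sketched below; the real component, in particular how flush_interval is applied, may behave differently.

    import time
    from collections import defaultdict


    class SimpleBuffer:
        """Hypothetical stand-in for the buffer component used by both pipelines."""

        def __init__(self, buffer_max_size: int = 100, flush_interval: int = 10):
            self.buffers = defaultdict(list)        # item_type -> list of buffered items
            self.buffer_max_size = buffer_max_size
            self.flush_interval = flush_interval    # seconds between forced flushes (assumed meaning)
            self._last_flush = defaultdict(time.time)

        def add_item(self, item, item_type: str) -> bool:
            """Buffer the item and report whether the caller should flush now."""
            self.buffers[item_type].append(item)
            full = len(self.buffers[item_type]) >= self.buffer_max_size
            stale = time.time() - self._last_flush[item_type] >= self.flush_interval
            return full or stale

        def get_buffer(self, item_type: str) -> list:
            return list(self.buffers[item_type])

        def get_buffer_size(self, item_type: str) -> int:
            return len(self.buffers[item_type])

        def clear_buffer(self, item_type: str) -> None:
            self.buffers[item_type].clear()
            self._last_flush[item_type] = time.time()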