From 5d44e5fe86a084457238b2d2c4805f816a5e60bf Mon Sep 17 00:00:00 2001
From: zhaoxiangpeng <1943364377@qq.com>
Date: Tue, 2 Dec 2025 13:52:21 +0800
Subject: [PATCH] change: add a single-record insert pipeline
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

---
 .../pipelines/{mongo_pipeline.py => mongo.py} | 67 ++++++++++++++++++++-
 .../spiders/wos_latest_increment.py           |  2 +-
 2 files changed, 66 insertions(+), 3 deletions(-)
 rename science_article_add/science_article_add/pipelines/{mongo_pipeline.py => mongo.py} (72%)

diff --git a/science_article_add/science_article_add/pipelines/mongo_pipeline.py b/science_article_add/science_article_add/pipelines/mongo.py
similarity index 72%
rename from science_article_add/science_article_add/pipelines/mongo_pipeline.py
rename to science_article_add/science_article_add/pipelines/mongo.py
index 638979f..d313b56 100644
--- a/science_article_add/science_article_add/pipelines/mongo_pipeline.py
+++ b/science_article_add/science_article_add/pipelines/mongo.py
@@ -8,12 +8,17 @@ from datetime import datetime
 from typing import TYPE_CHECKING, Tuple, Generator
 from pymongo import MongoClient
-from pymongo.errors import BulkWriteError
+from itemadapter import ItemAdapter
+from pymongo.errors import (
+    DuplicateKeyError,
+    BulkWriteError
+)
 
 from science_article_add.db_utils.buffer_component import SimpleBuffer
 from science_article_add.db_utils.mongo import MongoDBUtils, update_document
 
 if TYPE_CHECKING:
-    pass
+    from scrapy.crawler import Crawler
+    from scrapy.statscollectors import StatsCollector
 
 mongo_logger = logging.getLogger('pymongo')
 mongo_logger.setLevel(logging.WARNING)
@@ -21,6 +26,64 @@ logger = logging.getLogger(__name__)
 
 
 class MongoPipeline(MongoDBUtils):
+    def __init__(self, mongo_uri, mongo_db, stats: "StatsCollector"):
+        super().__init__(mongo_uri, mongo_db)
+        # Annotations are quoted: Crawler/StatsCollector are imported
+        # only under TYPE_CHECKING and do not exist at runtime.
+        self.stats: "StatsCollector" = stats
+        self.insert_failure_update_enable = True
+
+    @classmethod
+    def from_crawler(cls, crawler: "Crawler"):
+        return cls(
+            mongo_uri=crawler.settings.get("MONGO_URI"),
+            mongo_db=crawler.settings.get("MONGO_DATABASE", "items"),
+            stats=crawler.stats
+        )
+
+    def open_spider(self, spider):
+        self.client = MongoClient(self.mongo_uri)
+        self.db = self.client[self.mongo_db]
+
+    def process_item(self, item, spider):
+        # Resolve the item type to pick the target collection
+        adapter = ItemAdapter(item)
+        item_type = self._get_item_type(item)
+        collection = self.db.get_collection(item_type)
+        d = adapter.asdict()
+        try:
+            collection.insert_one(d)
+        except DuplicateKeyError as duplicate_error:
+            self.stats.inc_value('mongodb/duplicate_count')
+            # When updating on failure is disabled, duplicates are ignored.
+            if self.insert_failure_update_enable:
+                write_error = duplicate_error.details or {}
+                key_pattern = write_error.get('keyPattern')
+                key_value = write_error.get('keyValue')
+                if not key_pattern or not key_value:
+                    raise
+                logger.debug("dupKey: %s, keyValue: %s", key_pattern, key_value)
+                # Strip the unique-key fields, plus the _id injected by
+                # insert_one, so the update cannot hit immutable fields.
+                for k in key_pattern:
+                    d.pop(k, None)
+                d.pop('_id', None)
+                collection.update_one(filter=key_value, update={"$set": d}, upsert=True)
+
+        return item
+
+    def close_spider(self, spider):
+        self.client.close()
+
+    @staticmethod
+    def _get_item_type(item) -> str:
+        """Return the item type used as the collection name."""
+        if hasattr(item, '__tablename__'):
+            return item.__tablename__
+        return 'items_null_table'
+
+
+class MongoPipelineMulti(MongoDBUtils):
     def __init__(self, mongo_uri, mongo_db, buffer_max_size=None):
         super().__init__(mongo_uri, mongo_db)
         self.buffer = SimpleBuffer(buffer_max_size=buffer_max_size, flush_interval=10)
diff --git a/science_article_add/science_article_add/spiders/wos_latest_increment.py b/science_article_add/science_article_add/spiders/wos_latest_increment.py
index 8900c4c..ca2e969 100644
--- a/science_article_add/science_article_add/spiders/wos_latest_increment.py
+++ b/science_article_add/science_article_add/spiders/wos_latest_increment.py
@@ -25,7 +25,7 @@ class WosLatestIncrementSpider(scrapy.Spider):
             "science_article_add.middlewares.wos.WosStarterApiXkeyDownloaderMiddleware": 500
         },
         ITEM_PIPELINES={
-            "science_article_add.pipelines.mongo_pipeline.MongoPipeline": 300,
+            "science_article_add.pipelines.mongo.MongoPipelineMulti": 300,
             "science_article_add.pipelines.duptodo.DupTodoPipeline": 400,
         },
         LOG_LEVEL="INFO"
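
Notes (reviewer sketch, not part of the patch): the duplicate-key fallback in
MongoPipeline.process_item can be exercised standalone as below, assuming a
local MongoDB and a unique index on a hypothetical "doi" field; keyPattern and
keyValue only appear in the error details on reasonably recent MongoDB servers.

    from pymongo import MongoClient
    from pymongo.errors import DuplicateKeyError

    client = MongoClient("mongodb://localhost:27017")  # hypothetical URI
    col = client["science"]["articles"]                # hypothetical db/collection
    col.create_index("doi", unique=True)               # hypothetical unique field

    doc = {"doi": "10.1000/demo", "title": "revised"}  # hypothetical record
    try:
        col.insert_one(doc)
    except DuplicateKeyError as exc:
        details = exc.details or {}
        key_value = details.get("keyValue") or {}  # e.g. {"doi": "10.1000/demo"}
        for k in details.get("keyPattern") or {}:
            doc.pop(k, None)                       # leave the unique keys untouched
        doc.pop("_id", None)                       # insert_one injects _id into the dict
        if key_value:
            col.update_one(key_value, {"$set": doc}, upsert=True)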