# -*- coding: utf-8 -*-
# @Time    : 2025/12/2 13:34
# @Author  : zhaoxiangpeng
# @File    : verify_data.py
import logging
from typing import Optional

from itemadapter import ItemAdapter
from pymongo import MongoClient

from science_article_add.items import ArticleItem


class VerifyDataIntegrity:
    """Item pipeline that checks which requested IDs actually produced an article.

    When the spider opens, the pipeline records the ``third_id`` of every
    batch entry returned by ``spider.get_batch_ids()`` whose ``field`` is
    ``"UT"`` (or absent).  While items flow through, the ``third_id`` of each
    ``ArticleItem`` is remembered as successful.  When the spider closes, the
    ``todo_ids_wos`` collection is written back:

    * successful IDs are deleted (if ``successful_delete`` is true) or get
      ``state`` set to ``1``;
    * IDs that were requested but never seen get ``state`` set to ``-1``.
    """

    # Collection whose documents are keyed by ``third_id`` and carry ``state``.
    todo_collection = "todo_ids_wos"

    def __init__(self, mongo_uri, mongo_db):
        """Store connection parameters; no connection is opened here.

        Args:
            mongo_uri: MongoDB connection URI passed to ``MongoClient``.
            mongo_db: Name of the database that holds ``todo_ids_wos``.
        """
        # When true, successful IDs are deleted instead of being marked.
        self.successful_delete = False
        # IDs requested for this run (filled in ``open_spider``).
        self.batch_ids = set()
        # IDs of every ArticleItem seen, in arrival order.
        self.successful = []
        self.logger = logging.getLogger(__name__)

        self.mongo_uri = mongo_uri
        self.mongo_db = mongo_db
        self.client: Optional[MongoClient] = None
        self.db = None

    @classmethod
    def from_crawler(cls, crawler):
        """Build the pipeline from the ``MONGO_URI`` / ``MONGO_DATABASE`` settings."""
        settings = crawler.settings
        return cls(
            mongo_uri=settings.get("MONGO_URI"),
            mongo_db=settings.get("MONGO_DATABASE", "items"),
        )

    def init_db(self):
        """Open the MongoDB client and select the configured database."""
        self.client = MongoClient(self.mongo_uri)
        self.db = self.client[self.mongo_db]

    def open_spider(self, spider):
        """Collect the IDs this run is expected to download, then connect.

        Entries without a ``third_id`` are skipped: a ``None`` ID would later
        end up in an ``$in`` filter, where it also matches documents that have
        no ``third_id`` field at all.
        """
        for batch in spider.get_batch_ids():
            if batch.get("field", "UT") != "UT":
                continue
            third_id = batch.get("third_id")
            if third_id is not None:
                self.batch_ids.add(third_id)
        self.init_db()

    def process_item(self, item, spider):
        """Remember the ``third_id`` of each ``ArticleItem``; pass every item through.

        Returns:
            The unmodified ``item``.
        """
        if isinstance(item, ArticleItem):
            unique_id = ItemAdapter(item).get("third_id")
            # Ignore items lacking an ID so ``None`` never reaches the
            # MongoDB filters built in ``close_spider``.
            if unique_id is not None:
                self.successful.append(unique_id)
                if self.successful_delete:
                    self.batch_ids.discard(unique_id)
        return item

    def close_spider(self, spider):
        """Write the outcome back to ``todo_ids_wos`` and release the client."""
        # ``open_spider`` may have failed before ``init_db`` ran.  PyMongo
        # database objects do not support truth-testing, hence ``is None``.
        if self.db is None:
            self.logger.warning("MongoDB was never initialised; skipping verification")
            return
        try:
            self._write_back_states()
        finally:
            # Always release the connection, even if a write failed.
            self.client.close()
            self.client = None
            self.db = None

    def _write_back_states(self):
        """Mark or delete successful IDs and flag the missing ones."""
        # De-duplicate while keeping arrival order, so the same article
        # yielded twice is counted and queried only once.
        successful_ids = list(dict.fromkeys(self.successful))
        failure = self.batch_ids - set(successful_ids)
        coll = self.db.get_collection(self.todo_collection)

        if successful_ids:
            id_filter = {"third_id": {"$in": successful_ids}}
            if self.successful_delete:
                coll.delete_many(filter=id_filter)
                self.logger.info("Successfully deleted %d articles", len(successful_ids))
            else:
                coll.update_many(filter=id_filter, update={"$set": {"state": 1}})
                self.logger.info("Successfully updated %d articles", len(successful_ids))

        if failure:
            failed_ids = list(failure)
            # Log text means "not downloaded".
            self.logger.warning("未下载到: %s", failed_ids)
            coll.update_many(
                filter={"third_id": {"$in": failed_ids}},
                update={"$set": {"state": -1}},
            )
        else:
            # Log text means "download complete, no anomalies".
            self.logger.info("Successfully verified: %s", "下载完整无异常")