add: data ingestion pipelines

main
zhaoxiangpeng 2 months ago
parent 1c2fd3c988
commit 78a76ba9f2

@@ -57,3 +57,13 @@ class ScienceArticleAddPipeline(ScienceAddBufferPipeline):
        if self.buffer_size >= self.buffer_max_size:
            self.buffer.clear()
        return item


class BufferPipeline:
    def __init__(self, buffer_max_size: int = 100):
        self.buffer = {
            "type1": [],
            "type2": [],
        }
        self.buffer_size = 0
        self.buffer_max_size = buffer_max_size
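
The hunk ends before BufferPipeline gains any methods. A minimal sketch of the add/flush pattern the rest of this commit follows is shown below; only __init__ appears in the diff, so the method names and flush semantics here are assumptions inferred from how the buffers are used, not committed code.

# Hypothetical continuation of BufferPipeline -- methods are assumptions, not part of the commit.
class BufferPipeline:
    def __init__(self, buffer_max_size: int = 100):
        self.buffer = {"type1": [], "type2": []}
        self.buffer_size = 0
        self.buffer_max_size = buffer_max_size

    def add_item(self, item, item_type: str) -> bool:
        """Append an item and report whether the caller should flush now."""
        self.buffer.setdefault(item_type, []).append(item)
        self.buffer_size += 1
        return self.buffer_size >= self.buffer_max_size

    def clear(self):
        """Drop all buffered items and reset the counter."""
        for items in self.buffer.values():
            items.clear()
        self.buffer_size = 0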

@@ -0,0 +1,112 @@
# -*- coding: utf-8 -*-
# @Time : 2025/11/3 14:16
# @Author : zhaoxiangpeng
# @File : duptodo_pipeline.py
from __future__ import annotations

import logging
from typing import TYPE_CHECKING

from pymongo import MongoClient

from science_article_add.db_utils.buffer_component import SimpleBuffer
from science_article_add.items import IdRelationItem
from science_article_add.db_utils.mongo import MongoDBUtils

if TYPE_CHECKING:
    from pymongo.collection import Collection

logger = logging.getLogger(__name__)


class DupTodoPipeline(MongoDBUtils):
    def __init__(self, crawler, mongo_uri, mongo_db, buffer_max_size=None):
        super().__init__(mongo_uri, mongo_db)
        self.buffer = SimpleBuffer(buffer_max_size=buffer_max_size, flush_interval=10)
        self.stats = crawler.stats

    @classmethod
    def from_crawler(cls, crawler):
        return cls(
            crawler=crawler,
            mongo_uri=crawler.settings.get("MONGO_URI"),
            mongo_db=crawler.settings.get("MONGO_DATABASE", "items"),
            buffer_max_size=crawler.settings.get("BUFFER_MAX_SIZE", 100),
        )
    def open_spider(self, spider):
        self.client = MongoClient(self.mongo_uri)
        self.db = self.client[self.mongo_db]

    def close_spider(self, spider):
        # flush everything still buffered when the spider finishes
        for item_type in self.buffer.buffers.keys():
            if self.buffer.get_buffer_size(item_type) > 0:
                self._flush_buffer(item_type, spider)

    def process_item(self, item, spider):
        if isinstance(item, IdRelationItem):
            # determine the item type
            item_type = self._get_item_type(spider)
            # add it to the buffer
            should_flush = self.buffer.add_item(item, item_type)
            # flush to the database once the buffer asks for it
            if should_flush:
                self._flush_buffer(item_type, spider)
        return item
    def _flush_buffer(self, item_type: str, spider):
        """Flush the buffer for one item type to the database."""
        items = self.buffer.get_buffer(item_type)
        item_count = len(items)
        affect_count = 0
        try:
            affect_count = item_count
            dedup_items = self._contrast_dup_item(items, filter_key=self._get_dup_key(spider))

            def gen():
                for item in dedup_items:
                    item = {"third_id": item.pop("third_id")}
                    item['state'] = 0
                    yield item

            datas = [d for d in gen()]
            if not datas:
                # every item in this batch already exists, so there is nothing to insert
                affect_count = 0
                logger.info('⏳ Items already exist: [%s]' % ', '.join([x['third_id'] for x in items]))
            else:
                insert_result = self._insert2db(datas, item_type)
                affect_count = len(insert_result.inserted_ids)
        except Exception as e:
            logger.error('❌ Failed to insert %s items into %s: %s' % (item_count, item_type, e))
        else:
            logger.info('📝 Stored into %s: %s rows, %s inserted, %s deduplicated' % (
                item_type, item_count, affect_count, item_count - affect_count))
        finally:
            # clear the buffer either way
            self.buffer.clear_buffer(item_type)
    def _contrast_dup_item(self, items, filter_key) -> list:
        collection: Collection = self.db.get_collection(filter_key)
        all_id_item_map = {item["third_id"]: item for item in items}
        fingerprints = list(all_id_item_map.keys())
        # query by third_id and treat the exported.da field as proof that the record exists;
        # if the field is missing, the exported data is considered absent
        find_results = collection.find(filter={"third_id": {"$in": fingerprints}},
                                       projection={"_id": 0, "third_id": 1, "exported.da": 1})
        is_exist = 0
        for document in find_results:
            if document.get('exported') and ((doc_third_id := document.get('third_id')) in all_id_item_map):
                is_exist += 1
                all_id_item_map.pop(doc_third_id, None)
                self.inc_item_dropped_count('is_exist')
        dedup_results = list(all_id_item_map.values())
        logger.info('%s already contains %s records, %s left after filtering' % (filter_key, is_exist, len(dedup_results)))
        return dedup_results

    def _get_dup_key(self, spider):
        return 'data_%(source_type)s_article' % {"source_type": spider.source}

    def _get_item_type(self, spider) -> str:
        """Get the item type."""
        return 'todo_ids_%(source_type)s' % {"source_type": spider.source}

    def inc_item_dropped_count(self, reason):
        self.stats.inc_value("item_dropped_count")
        self.stats.inc_value(f"item_dropped_reasons_count/{reason}")

@@ -0,0 +1,131 @@
# -*- coding: utf-8 -*-
# @Time : 2025/10/30 17:30
# @Author : zhaoxiangpeng
# @File : mongo_pipeline.py
from __future__ import annotations

import logging
from datetime import datetime
from typing import TYPE_CHECKING, Tuple, Generator

from pymongo import MongoClient
from pymongo.errors import BulkWriteError

from science_article_add.db_utils.buffer_component import SimpleBuffer
from science_article_add.db_utils.mongo import MongoDBUtils, update_document

if TYPE_CHECKING:
    pass

mongo_logger = logging.getLogger('pymongo')
mongo_logger.setLevel(logging.WARNING)
logger = logging.getLogger(__name__)


class MongoPipeline(MongoDBUtils):
    def __init__(self, mongo_uri, mongo_db, buffer_max_size=None):
        super().__init__(mongo_uri, mongo_db)
        self.buffer = SimpleBuffer(buffer_max_size=buffer_max_size, flush_interval=10)

    @classmethod
    def from_crawler(cls, crawler):
        return cls(
            mongo_uri=crawler.settings.get("MONGO_URI"),
            mongo_db=crawler.settings.get("MONGO_DATABASE", "items"),
            buffer_max_size=crawler.settings.get("BUFFER_MAX_SIZE", 100),
        )
    def open_spider(self, spider):
        self.client = MongoClient(self.mongo_uri)
        self.db = self.client[self.mongo_db]

    def process_item(self, item, spider):
        # determine the item type
        item_type = self._get_item_type(item)
        # add it to the buffer
        should_flush = self.buffer.add_item(item, item_type)
        # flush to the database once the buffer asks for it
        if should_flush:
            self._flush_buffer(item_type)
        return item

    def close_spider(self, spider):
        # flush everything still buffered when the spider finishes
        for item_type in self.buffer.buffers.keys():
            if self.buffer.get_buffer_size(item_type) > 0:
                self._flush_buffer(item_type)
    def _flush_buffer(self, item_type: str):
        """Flush the buffer for one item type to the database."""
        items = self.buffer.get_buffer(item_type)
        item_count = len(items)
        if not items:
            return
        affect_count = 0
        try:
            affect_count = item_count
            # bulk insert into the database
            insert_result = self._insert2db(items, tablename=item_type)
            affect_count = len(insert_result.inserted_ids)
        except BulkWriteError as bulk_write_e:
            # duplicate-key (and other) write errors fall back to per-document updates
            write_errors = bulk_write_e.details.get('writeErrors')
            current_time = datetime.now()
            up_time_requests = []
            errors = self._build__update(write_errors)
            collection = self.db.get_collection(item_type)
            for new_item in errors:
                filter_query, update_query = new_item
                up_result = collection.update_one(filter=filter_query, update=update_query)
                affect_count -= 1
                if up_result.matched_count == up_result.modified_count == 1:
                    third_id = filter_query.get("third_id")
                    if third_id:
                        # collect the third-party id so its batch time can be refreshed by third_id
                        up_time_requests.append(third_id)
            if up_time_requests:
                logger.info("Setting batch time to: %s" % current_time)
                collection.update_many(
                    {"third_id": {"$in": up_time_requests}},
                    {"$set": {"updated_at": current_time}}
                )
        except Exception as e:
            affect_count = 0
            # insert failed; log and fall through (the buffer is still cleared in finally)
            logger.error(f"❌ Insert failed: {e}")
        finally:
            # clear the buffer and report the result
            self.buffer.clear_buffer(item_type)
            logger.info('✅ Stored into %s: %s rows, %s inserted, %s updated' % (
                item_type, item_count, affect_count, item_count - affect_count))
    def _build__update(self, write_errors) -> Generator[Tuple[dict, dict], None, None]:
        for write_error in write_errors:
            update_one = (None, None)
            if write_error.get('code') == 11000:
                update_one = self._build_dup_error(write_error)
            if update_one != (None, None):
                yield update_one

    @staticmethod
    def _build_dup_error(write_error) -> tuple[None, None] | tuple[dict, dict]:
        """May be overridden to implement custom logic."""
        original_doc = write_error.get('op')  # the document the failed insert tried to write
        key_pattern = write_error.get('keyPattern')
        original_doc.pop("_id", None)  # drop the _id generated by the failed insert
        filter_query = {}
        update_query = {key: val for key, val in original_doc.items() if val}
        update_query.pop('updated_at', None)  # drop the volatile timestamp so it cannot skew the update
        for key in key_pattern.keys():
            filter_query.update({key: update_query.pop(key, None)})
        if not update_query:
            return None, None
        # update a single document only if it actually changed; changed third_ids are collected
        # by the caller so update_time can be refreshed in one batch
        filter_query, update_query = update_document(filter_query, update_query, replace=False)
        return filter_query, update_query

    def _get_item_type(self, item) -> str:
        """Get the item type."""
        return item.__class__.__tablename__
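
MongoPipeline also leans on two helpers that are not in this diff: MongoDBUtils._insert2db and update_document from science_article_add.db_utils.mongo. Their exact behaviour is unknown here; below is a hedged sketch of what update_document plausibly does, given that its output is passed straight to collection.update_one and that replace=False suggests a partial $set-style update. The committed comment hints it may also guard against no-op updates, which this sketch omits.

# Hypothetical sketch of the update_document helper used by _build_dup_error.
# The real helper in science_article_add/db_utils/mongo.py may behave differently.
from typing import Tuple


def update_document(filter_query: dict, update_query: dict, replace: bool = False) -> Tuple[dict, dict]:
    """Build (filter, update) arguments for Collection.update_one.

    With replace=False the fields are wrapped in $set so untouched fields in the
    stored document are preserved; with replace=True the body is used as a full
    replacement payload.
    """
    if replace:
        return filter_query, update_query
    return filter_query, {"$set": update_query}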

@@ -1,7 +1,7 @@
 # pipelines.py
 import pymongo
 from itemadapter import ItemAdapter
-from science_article_add.items.wos import WosCitedNumberItem
+from science_article_add.items.wos import WosCitedNumberItem, WosIdRelationItem


 class MongoDBPipeline:
@@ -27,10 +27,12 @@ class MongoDBPipeline:
         adapter = ItemAdapter(item)
         # route the item to a collection based on its type
-        if isinstance(item, WosCitedNumberItem):
+        if isinstance(item, WosIdRelationItem):
+            collection_name = 'relation_school_wos'
+        elif isinstance(item, WosCitedNumberItem):
             collection_name = 'relation_cited_number_wos'
         else:
-            collection_name = 'relation_cited_number_other'
+            collection_name = 'data_other'
         # insert the data
         self.db[collection_name].insert_one(dict(adapter))
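
All of the pipelines above read their configuration from the crawler settings shown in from_crawler. A hedged example of wiring them up in settings.py follows; the module paths and priority numbers are assumptions, since the diff only shows the file names, not where they sit inside the science_article_add package.

# settings.py (sketch) -- module paths and priorities are assumptions, not from the commit.
MONGO_URI = "mongodb://localhost:27017"
MONGO_DATABASE = "science"   # falls back to "items" if unset
BUFFER_MAX_SIZE = 100        # batch size before a buffer is flushed

ITEM_PIPELINES = {
    # deduplicate against existing documents and queue new third_ids as todo records
    "science_article_add.pipelines.duptodo_pipeline.DupTodoPipeline": 300,
    # buffered bulk insert with duplicate-key fallback to update_one
    "science_article_add.pipelines.mongo_pipeline.MongoPipeline": 400,
}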
