# Define your item pipelines here
#
# Don't forget to add your pipeline to the ITEM_PIPELINES setting
# See: https://docs.scrapy.org/en/latest/topics/item-pipeline.html
from __future__ import annotations

import re
import json
import logging
from typing import TYPE_CHECKING

import scrapy

# useful for handling different item types with a single interface
from itemadapter import ItemAdapter
from kafka import KafkaProducer
from pymongo import MongoClient
from pymongo.errors import DuplicateKeyError

from science_article_cssci.db_utils.mongo import MongoDBUtils, build_update_query
from science_article_cssci.scripts.field_assembly import FieldAssembly

if TYPE_CHECKING:
    from scrapy.crawler import Crawler
    from scrapy.statscollectors import StatsCollector

logging.getLogger('pymongo').setLevel(logging.WARNING)
logging.getLogger('kafka').setLevel(logging.WARNING)
logger = logging.getLogger(__name__)


class ScienceArticleCssciPipeline:

    def process_item(self, item, spider):
        return item


class MongoPipeline(MongoDBUtils):

    def __init__(self, mongo_uri, mongo_db, stats: StatsCollector):
        super().__init__(mongo_uri, mongo_db)
        self.stats: StatsCollector = stats
        self.insert_failure_update_enable = True
        self.duplicate_cover_enable = False  # overwrite duplicates instead of merging into them

    @classmethod
    def from_crawler(cls, crawler: Crawler):
        return cls(
            mongo_uri=crawler.settings.get("MONGO_URI"),
            mongo_db=crawler.settings.get("MONGO_DATABASE", "items"),
            stats=crawler.stats,
        )

    def open_spider(self, spider):
        self.client = MongoClient(self.mongo_uri)
        self.db = self.client[self.mongo_db]

    def process_item(self, item, spider) -> scrapy.Item:
        """Insert the item; duplicates are counted and skipped, not updated."""
        adapter = ItemAdapter(item)
        tablename = self._get_item_table(item)
        collection = self.db.get_collection(tablename)
        d = adapter.asdict()
        try:
            collection.insert_one(d)
            self.stats.inc_value("item2db_inserted/{}".format(tablename))
        except DuplicateKeyError:
            self.stats.inc_value("item2db_duplicate/{}".format(tablename))
            self.stats.inc_value("item_dropped_reasons_count/duplicate")
        return item

    def process_item_update(self, item, spider) -> scrapy.Item:
        """Insert the item; on a duplicate-key error, update the existing document instead."""
        adapter = ItemAdapter(item)
        tablename = self._get_item_table(item)
        collection = self.db.get_collection(tablename)
        d = adapter.asdict()
        try:
            collection.insert_one(d)
            self.stats.inc_value("item2db_inserted/{}".format(tablename))
        except DuplicateKeyError as duplicate_error:
            if self.insert_failure_update_enable:
                write_error = duplicate_error.details
                filter_query, update_query = self._pick_filter_update(write_error, doc=d)
                # Strip the volatile timestamp so it cannot interfere with the update
                # (updates touching fields other than task_id don't need this).
                updated_at_query = None
                key_pattern = write_error.get('keyPattern')
                key_value = write_error.get('keyValue')
                logger.debug("dupKey: %s, keyValue: %s", key_pattern, key_value)
                # Dedicated handling for incremental-crawl tasks.
                task_ids = update_query.pop("task_ids", None)
                if task_ids:
                    # task_id always changes between runs, so apply it first.
                    task_id_query = {'task_ids': task_ids}
                    collection.update_one(filter=filter_query,
                                          update=build_update_query(task_id_query, replace=False))
                    updated_at_query = {"updated_at": update_query.pop('updated_at', None)}
                update_q = build_update_query(update_query, replace=self.duplicate_cover_enable)
                up_result = collection.update_one(filter=key_value, update=update_q, upsert=True)
                if up_result.matched_count == up_result.modified_count == 1:
                    # The document changed, so refresh its timestamp (ideally it is refreshed
                    # even when nothing changed, so we know when the record was last touched).
                    if updated_at_query:
                        collection.update_one(filter=key_value, update={"$set": updated_at_query})
                self.stats.inc_value("item2db_updated/{}".format(tablename))
        return item

    @staticmethod
    def _pick_filter_update(write_error, doc: dict = None):
        original_doc = write_error.get('op', doc)  # the document whose insert failed
        key_pattern = write_error.get('keyPattern')
        original_doc.pop("_id", None)  # drop the _id generated by the failed insert
        filter_query = {}
        update_query = {key: val for key, val in original_doc.items() if val}
        for key in key_pattern.keys():
            filter_query.update({key: update_query.pop(key, None)})
        return filter_query, update_query

    def close_spider(self, spider):
        self.client.close()

    @staticmethod
    def _get_item_table(item) -> str:
        """Resolve the target collection name from the item type."""
        if hasattr(item, '__tablename__'):
            return item.__class__.__tablename__
        return 'items_null_table'
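# A minimal sketch of the duplicate-key recovery above, assuming a unique index on
# ``third_id`` (field names and values are illustrative; the exact ``details`` payload
# attached to pymongo's DuplicateKeyError varies with the MongoDB server version):
#
#   d = {"third_id": "46608940", "lypm": "...", "_id": ObjectId("...")}
#   write_error = {
#       "code": 11000,
#       "keyPattern": {"third_id": 1},         # which unique index collided
#       "keyValue": {"third_id": "46608940"},  # the colliding key values
#   }
#   filter_query, update_query = MongoPipeline._pick_filter_update(write_error, doc=d)
#   # filter_query -> {"third_id": "46608940"}  (the index keys, used to locate the doc)
#   # update_query -> the remaining truthy fields of d, passed to build_update_query()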
class Mongo2SciencePipeline(MongoPipeline):

    def process_item(self, item, spider):
        d = self.parse2science(item)
        table = self._get_item_table(item)
        coll = self.db.get_collection(table)
        return d

    @staticmethod
    def parse2science(item) -> dict:

        def change_qi(tmp_str):
            # Mirrors the JS ``.replace(/^0|0$/g, '')`` applied to the issue number.
            return re.sub(r'^0|0$', '', tmp_str)

        def change_string(tmp_str):
            # Split an "aaa"-delimited field, keeping entries longer than one character.
            return [t for t in tmp_str.split("aaa") if len(t) > 1]

        adapter = ItemAdapter(item)
        third_id = adapter['third_id']
        resp_raw = json.loads(adapter['resp_raw'])
        authors: list = resp_raw['author']
        # author name / institution / department
        authors = [dict(zzmc=au_info['zzmc'], jgmc=au_info['jgmc'], bmmc=au_info['bmmc'])
                   for au_info in authors or []]
        catations: list = resp_raw.get('catation')
        contents: list = resp_raw.get('contents', [])
        body = [c for c in contents if c.get("sno") == adapter['third_id']]
        content = body[0] if body else {}
        d = dict(
            sno=third_id,
            lypm=content['lypm'],                        # title
            blpm=content['blpm'],                        # English title
            zzjg=authors,                                # authors and institutions
            wzlx=content['wzlx'],                        # document type
            xkfl1=content['xkfl1'],                      # subject category 1
            xkfl2=content.get('xkfl2', ''),              # subject category 2
            xkdm1=content['xkdm1'],                      # CLC classification code 1
            xkdm2=content.get('xkdm2', ''),              # CLC classification code 2
            xmlb=content.get('xmlb', ''),                # funding project
            qkmc=content.get('qkmc', ''),                # source journal
            # rendered as: (nian)年(juan)卷第(qi).replace(/^0|0$/g,'')期:(ym)
            nian=content.get('nian', ''),                # year
            juan=content.get('juan', ''),                # volume
            qi=content.get('qi', ''),                    # issue
            ym=content.get('ym', ''),                    # page range
            byc=change_string(content.get('byc', '')),   # keywords
            ckwx=catations,                              # references
        )
        return d


class BuildDetailPipeline:

    def process_item(self, item, spider):
        adapter = ItemAdapter(item)
        item['detailed'] = self.build_detailed(adapter)
        return item

    @staticmethod
    def build_detailed(item):
        resp_raw = item.get("resp_raw")
        return FieldAssembly.parse_detail(
            content=resp_raw.get('content'),
            author=resp_raw.get('author'),
            catation=resp_raw.get('catation'),
        )
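# A minimal ITEM_PIPELINES sketch for settings.py, assuming this module lives at
# ``science_article_cssci.pipelines`` (the path and priorities are illustrative).
# Ordering matters: BuildDetailPipeline must run before KafkaPipeline, because
# ``build2kafka()`` below expands the ``detailed`` dict that BuildDetailPipeline
# attaches to each item.
#
#   ITEM_PIPELINES = {
#       "science_article_cssci.pipelines.BuildDetailPipeline": 300,
#       "science_article_cssci.pipelines.MongoPipeline": 400,
#       "science_article_cssci.pipelines.KafkaPipeline": 500,
#   }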
class KafkaPipeline:

    def __init__(self, kafka_servers, topic):
        self.kafka_servers = kafka_servers
        self.topic = topic
        self.producer: KafkaProducer = None

    @classmethod
    def from_crawler(cls, crawler):
        return cls(
            kafka_servers=crawler.settings.get('KAFKA_SERVERS', 'localhost:9092'),
            topic=crawler.settings.get('KAFKA_TOPIC', 'scrapy_items'),
        )

    def open_spider(self, spider):
        self.producer = KafkaProducer(
            bootstrap_servers=self.kafka_servers,
            value_serializer=lambda v: json.dumps(v, ensure_ascii=False).encode('utf-8'),
            acks='all',
            linger_ms=50,  # wait up to 50 ms to batch records before sending
            compression_type='gzip',
        )
        spider.logger.info(f"Connected to Kafka at {self.kafka_servers}")

    def close_spider(self, spider):
        if self.producer:
            self.producer.flush()
            self.producer.close()
            spider.logger.info("Kafka connection closed")

    def process_item(self, item, spider):
        adapter = ItemAdapter(item)
        d = adapter.asdict()
        d = self.build2kafka(d)
        # Publish to Kafka.
        future = self.producer.send(
            topic=self.topic,
            value=d,
            headers=[('source_type', b'cssci')],
        )
        future.add_callback(self.on_send_success)
        future.add_errback(self.on_send_error)
        return item

    def on_send_success(self, record_metadata):
        """Callback on a successful send."""
        pass

    def on_send_error(self, excp):
        """Callback on a failed send."""
        pass

    def build2kafka(self, item: dict) -> dict:
        dd = dict(
            id=item.get("third_id"),
            school_id="999",
            **item.get('detailed'),
            updated_time="2025-11-01 09:01:56",
        )
        dd.pop("references", None)
        return dd
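# For reference, a sketch of the record that ``KafkaPipeline.process_item`` publishes,
# assuming the item already passed through BuildDetailPipeline (the flattened field
# names come from FieldAssembly.parse_detail and are not shown here; note that
# ``updated_time`` is the constant hard-coded in ``build2kafka``):
#
#   {
#       "id": "<third_id>",
#       "school_id": "999",
#       ...detailed fields, minus "references"...
#       "updated_time": "2025-11-01 09:01:56",
#   }
#
# The value is JSON-encoded with ensure_ascii=False and carries a
# ('source_type', b'cssci') header so consumers can route records by source.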